Merge branch 'new-schema'

This commit is contained in:
Scott Lamb 2017-01-01 22:59:49 -08:00
commit 586902d30f
18 changed files with 1726 additions and 468 deletions

View File

@ -17,7 +17,8 @@ support for motion detection, no authentication, and no config UI.
This is version 0.1, the initial release. Until version 1.0, there will be no
compatibility guarantees: configuration and storage formats may change from
version to version.
version to version. There is an [upgrade procedure](guide/schema.md) but it is
not for the faint of heart.
I hope to add features such as salient motion detection. It's way too early to
make promises, but it seems possible to build a full-featured
@ -211,11 +212,12 @@ each camera you insert using this method.
$ sudo -u moonfire-nvr sqlite3 ~moonfire-nvr/db/db
sqlite3> insert into camera (
...> uuid, short_name, description, host, username, password,
...> main_rtsp_path, sub_rtsp_path, retain_bytes) values (
...> main_rtsp_path, sub_rtsp_path, retain_bytes,
...> next_recording_id) values (
...> X'b47f48706d91414591cd6c931bf836b4', 'driveway',
...> 'Longer description of this camera', '192.168.1.101',
...> 'admin', '12345', '/Streaming/Channels/1',
...> '/Streaming/Channels/2', 104857600);
...> '/Streaming/Channels/2', 104857600, 0);
sqlite3> ^D
### Using automatic camera configuration inclusion with `prep.sh`
@ -228,29 +230,29 @@ for easy reading, and editing, and does not have to be altered in formatting,
but can if you wish and know what you are doing):
insert into camera (
uuid,
short_name, description,
host, username, password,
main_rtsp_path, sub_rtsp_path,
retain_bytes
)
values
(
X'1c944181b8074b8083eb579c8e194451',
'Front Left', 'Front Left Driveway',
'192.168.1.41',
'admin', 'secret',
'/Streaming/Channels/1', '/Streaming/Channels/2',
346870912000
),
(
X'da5921f493ac4279aafe68e69e174026',
'Front Right', 'Front Right Driveway',
'192.168.1.42',
'admin', 'secret',
'/Streaming/Channels/1', '/Streaming/Channels/2',
346870912000
);
uuid,
short_name, description,
host, username, password,
main_rtsp_path, sub_rtsp_path,
retain_bytes, next_recording_id
)
values
(
X'1c944181b8074b8083eb579c8e194451',
'Front Left', 'Front Left Driveway',
'192.168.1.41',
'admin', 'secret',
'/Streaming/Channels/1', '/Streaming/Channels/2',
346870912000, 0
),
(
X'da5921f493ac4279aafe68e69e174026',
'Front Right', 'Front Right Driveway',
'192.168.1.42',
'admin', 'secret',
'/Streaming/Channels/1', '/Streaming/Channels/2',
346870912000, 0
);
You'll still have to find the correct rtsp paths, usernames and passwords, and
set retained byte counts, as explained above.

View File

@ -125,31 +125,28 @@ Valid request parameters:
TODO(slamb): once we support annotations, should they be included in the same
URI or as a separate `/annotations`?
TODO(slamb): There might be some irregularity in the order if there are
overlapping recordings (such as if the server's clock jumped while running)
but I haven't thought about the details. In general, I'm not really sure how
to handle this case, other than ideally to keep recording stuff no matter what
and present some UI to help the user to fix it after the
fact.
In the property `recordings`, returns a list of recordings in arbitrary order.
Each recording object has the following properties:
In the property `recordings`, returns a list of recordings. Each recording
object has the following properties:
* `start_time_90k`
* `end_time_90k`
* `start_id`. The id of this recording, which can be used with `/view.mp4`
to retrieve its content.
* `end_id` (optional). If absent, this object describes a single recording.
If present, this indicates that recordings `start_id-end_id` (inclusive)
together are as described. Adjacent recordings from the same RTSP session
may be coalesced in this fashion to reduce the amount of redundant data
transferred.
* `start_time_90k`: the start time of the given recording. Note this may be
less than the requested `start_time_90k` if this recording was ongoing
at the requested time.
* `end_time_90k`: the end time of the given recording. Note this may be
greater than the requested `end_time_90k` if this recording was ongoing at
the requested time.
* `sample_file_bytes`
* `video_sample_entry_sha1`
* `video_sample_entry_width`
* `video_sample_entry_height`
* `video_samples`: the number of samples (aka frames) of video in this
recording.
* TODO: recording id(s)? interior split points for coalesced recordings?
Recordings may be coalesced if they are adjacent and have the same
`video_sample_entry_*` data. That is, if recording A spans times [t, u) and
recording B spans times [u, v), they may be returned as a single recording
AB spanning times [t, v). Arbitrarily many recordings may be coalesced in this
fashion.
Example request URI (with added whitespace between parameters):
@ -165,8 +162,9 @@ Example response:
{
"recordings": [
{
"end_time_90k": 130985466591817,
"start_id": 1,
"start_time_90k": 130985461191810,
"end_time_90k": 130985466591817,
"sample_file_bytes": 8405564,
"video_sample_entry_sha1": "81710c9c51a02cc95439caa8dd3bc12b77ffe767",
"video_sample_entry_width": 1280,
@ -184,18 +182,38 @@ Example response:
### `/camera/<uuid>/view.mp4`
A GET returns a .mp4 file, with an etag and support for range requests.
A GET returns a `.mp4` file, with an etag and support for range requests.
Expected query parameters:
* `start_time_90k`
* `end_time_90k`
* `ts`: should be set to `true` to request a subtitle track be added with
human-readable recording timestamps.
* TODO(slamb): possibly `overlap` to indicate what to do about segments of
recording with overlapping wall times. Values might include:
* `error` (return an HTTP error)
* `include_all` (include all, in order of the recording ids)
* `include_latest` (include only the latest by recording id for a
particular segment of time)
* TODO(slamb): gaps allowed or not? maybe a parameter for this also?
* `s` (one or more): a string of the form
`START_ID[-END_ID][.[REL_START_TIME]-[REL_END_TIME]]`. This specifies
recording segments to include. The produced `.mp4` file will be a
concatenation of the segments indicated by all `s` parameters. The ids to
retrieve are as returned by the `/recordings` URL. The optional start and
end times are in 90k units and relative to the start of the first specified
id. These can be used to clip the returned segments. Note they can be used
to skip over some ids entirely; this is allowed so that the caller doesn't
need to know the start time of each interior id.
* `ts` (optional): should be set to `true` to request a subtitle track be
added with human-readable recording timestamps.
Example request URI to retrieve all of recording id 1 from the given camera:
```
/camera/fd20f7a2-9d69-4cb3-94ed-d51a20c3edfe/view.mp4?s=1
```
Example request URI to retrieve all of recording ids 15 from the given camera,
with timestamp subtitles:
```
/camera/fd20f7a2-9d69-4cb3-94ed-d51a20c3edfe/view.mp4?s=1-5&ts=true
```
Example request URI to retrieve recording id 1, skipping its first 26
90,000ths of a second:
```
/camera/fd20f7a2-9d69-4cb3-94ed-d51a20c3edfe/view.mp4?s=1.26
```

BIN
design/time-frames.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 21 KiB

240
design/time.md Normal file
View File

@ -0,0 +1,240 @@
# Moonfire NVR Time Handling
Status: **draft**
> A man with a watch knows what time it is. A man with two watches is never
> sure.
>
> — Segal's law
## Objective
Maximize the likelihood Moonfire NVR's timestamps are useful.
The timestamp corresponding to a video frame should roughly match timestamps
from other sources:
* another video stream from the same camera. Given a video frame from the
"main" stream, a video frame from the "sub" stream with a similar
timestamp should have been recorded near the same time, and vice versa.
This minimizes confusion when switching between views of these streams,
and when viewing the "main" stream timestamps corresponding to a motion
event gathered from the less CPU-intensive "sub" stream.
* on-camera motion events from the same camera. If the video frame reflects
the motion event, its timestamp should be roughly within the event's
timespan.
* streams from other cameras. Recorded views from two cameras of the same
event should have similar timestamps.
* events noted by the owner of the system, neighbors, police, etc., for the
purpose of determining chronology, to the extent those persons use
accurate clocks.
Two segments of video recorded from the same stream of the same camera should
not overlap. This would make it impossible for a user interface to present a
simple timeline for accessing all recorded video.
Durations should be useful over short timescales:
* If an object's motion is recorded, distance travelled divided by the
duration of the frames over which this motion occurred should reflect the
object's average speed.
* Motion should appear smooth. There shouldn't be excessive frame-to-frame
jitter due to such factors as differences in encoding time or network
transmission.
This document describes an approach to achieving these goals when the
following statements are true:
* the NVR's system clock is within a second of correct on startup. (True
when NTP is functioning or when the system has a real-time clock battery
to preserve a previous correct time.)
* the NVR's system time does not experience forward or backward "step"
corrections (as opposed to frequency correction) during operation.
* the NVR's system time advances at roughly the correct frequency. (NTP
achieves this through frequency correction when operating correctly.)
* the cameras' clock frequencies are off by no more than 500 parts per
million (roughly 43 seconds per day).
* the cameras are geographically close to the NVR, so in most cases network
transmission time is under 50 ms. (Occasional delays are to be expected,
however.)
* the cameras issue at least one RTCP sender report per recording.
* the cameras are occasionally synchronized via NTP.
When one or more of those statements are false, the system should degrade
gracefully: preserve what properties it can, gather video anyway, and when
possible include sufficient metadata to assess trustworthiness.
Additionally, the system should not require manual configuration of camera
frequency corrections.
## Background
Time in a distributed system is notoriously tricky. [Falsehoods programmers
believe about
time](http://infiniteundo.com/post/25326999628/falsehoods-programmers-believe-about-time)
and [More falsehoods programmers believe about time; "wisdom of the crowd"
edition](http://infiniteundo.com/post/25509354022/more-falsehoods-programmers-believe-about-time)
give a taste of the problems encountered. These problems are found even in
datacenters with expensive, well-tested hardware and relatively reliable
network connections. Moonfire NVR is meant to run on an inexpensive
single-board computer and record video from budget, closed-source cameras,
so such problems are to be expected.
Moonfire NVR typically has access to the following sources of time
information:
* the local `CLOCK_REALTIME`. Ideally this is maintained by `ntpd`:
synchronized on startup, and frequency-corrected during operation. A
hardware real-time clock and battery keep accurate time across restarts
if the network is unavailable on startup. In the worst case, the system
has no real-time clock or no battery and a network connection is
unavailable. The time is far in the past on startup and is never
corrected or is corrected via a step while Moonfire NVR is running.
* the local `CLOCK_MONOTONIC`. This should be frequency-corrected by `ntpd`
and guaranteed to never experience "steps", though its reference point is
unspecified.
* the local `ntpd`, which can be used to determine if the system is
synchronized to NTP and quantify the precision of synchronization.
* each camera's clock. The ONVIF specification mandates cameras must
support synchronizing clocks via NTP, but in practice cameras appear to
use SNTP clients which simply step time periodically and provide no
interface to determine if the clock is currently synchronized. This
document's author owns several cameras with clocks that run roughly 100
ppm fast (9 seconds per day) and are adjusted via steps.
* the RTP timestamps from each of a camera's streams. As described in [RFC
3550 section 5.1](https://tools.ietf.org/html/rfc3550#section-5.1), these
are monotonically increasing with an unspecified reference point. They
can't be directly compared to other cameras or other streams from the
same camera. Emperically, budget cameras don't appear to do any frequency
correction on these timestamps.
* in some cases, RTCP sender reports, as described in [RFC 3550 section
6.4](https://tools.ietf.org/html/rfc3550#section-6.4). These correlate
RTP timestamps with the camera's real time clock. However, these are only
sent periodically, not necessarily at the beginning of the session.
Some cameras omit them entirely depending on firmware version, as noted
in [this forum post](http://www.cctvforum.com/viewtopic.php). Additionally,
Moonfire NVR currently uses ffmpeg's libavformat for RTSP protocol
handling; this library exposes these reports in a limited fashion.
The camera records video frames as in the diagram below:
![Video frame timeline](time-frames.png)
Each frame has an associated RTP timestamp. It's unclear from skimming RFC
3550 exactly what time this represents, but it must be some time after the
last frame and before the next frame. At a typical rate of 30 frames per
second, this timespan is short enough that this uncertainty won't be the
largest source of time error in the system. We'll assume arbitrarily that the
timestamp refers to the start of exposure.
RTP doesn't transmit the duration of each video frame; it must be calculated
from the timestamp of the following frame. This means that if a stream is
terminated, the final frame has unknown duration.
As described in [schema.md](schema.md), Moonfire NVR saves RTSP video streams
into roughly one-minute "recordings", with a fixed rotation offset after the
minute in the NVR's wall time.
## Overview
Moonfire NVR will use the RTP timestamps to calculate video frames' durations.
For the first segment of video, it will trust these completely. It will use
them and the NVR's wall clock time to establish the start time of the
recording. For following segments, it will slightly adjust durations to
compensate for difference between the frequencies of the camera and NVR
clock, trusting the latter to be accurate.
## Detailed design
On every frame of video, Moonfire NVR will get a timestamp from
`CLOCK_MONOTONIC`. On the first frame, it will additionally get a timestamp
from `CLOCK_REALTIME` and compute the difference. It uses these to compute a
monotonically increasing real time of receipt for every frame, called the
_local frame time_. Assuming the local clock is accurate, this time is an
upper bound on when the frame was generated. The difference is the sum of the
following items:
* H.264 encoding
* buffering on the camera (particularly when starting the stream—some
cameras apparently send frames that were captured before the RTSP session
was established)
* network transmission time
These values may produce some jitter, so the local frame time is not directly
used to calculate frame durations. Instead, they are primarily taken from
differences in RTP timestamps from one frame to the next. During the first
segment of video, these RTP timestamp differences are used directly, without
correcting for incorrect camera frequency. At the design limit of 500 ppm
camera frequency error, and an upper bound of two minutes of recording for the
initial segment (explained below), this causes a maximum of 60 milliseconds of
error.
The _local start time_ of a segment is calculated when ending it. It's defined
as the minimum for all frames of the local frame time minus the duration of
all previous frames. If there are many frames, this means neither initial
buffering nor spikes of delay in H.264 encoding or network transmission cause
the local start time to become inaccurate. The least delayed frame wins.
The first segment either ends with the RTSP session (due to error/shutdown) or
on rotation. In the former case, there may not be many samples to use in
calculating the local start time; accuracy may suffer but the system degrades
gracefully. Rotation doesn't happen until the second time the rotation offset
is passed, so rotation happens after 12 minutes rather than 01 minutes to
maximize accuracy.
The _start time_ of the first segment is its local start time. The start time
of following segments is the end time of the previous segment.
The duration of following segments is adjusted to compensate for camera
frequency error, assuming the NVR clock's frequency is more trustworthy. This
is done as follows. The _local duration_ of segment _i_ is calculated as the
local start time of segment _i+1_ minus the local start time of segment _i_.
The _cumulative error_ as of segment _i_ is defined as the local duration of
all previous segments minus the duration of all previous segments. The
duration of segment _i_ should be adjusted by up to 500 ppm to eliminate
cumulative error. (For a one-minute segment, this is 0.3 ms, or 27 90kHz units.)
This correction should be spread out across the segment to minimize jitter.
Each segment's local start time is also stored in the database as a delta to
the segment's start time. These stored values aren't for normal system
operation but may be handy in understanding and correcting errors.
## Caveats
There's no particular reason to believe this will produce perfectly matched
streams between cameras or even of main and sub streams within a camera.
If this is insufficient, there's an alternate calculation of start time that
could be used in some circumstances: the _camera start time_. The first RTCP
sender report could be used to correlate a RTP timestamp with the camera's
wall clock, and thus calculate the camera's time as of the first frame.
The _start time_ of the first segment could be either its local start time or
its camera start time, determined via the following rules:
1. if there is no camera start time (due to the lack of a RTCP sender
report), the local start time wins by default.
2. if the camera start time is before 2016-01-01 00:00:00 UTC, the local
start time wins.
3. if the local start time is before 2016-01-01 00:00:00 UTC, the camera
start time wins.
4. if the times differ by more than 5 seconds, the local start time wins.
5. otherwise, the camera start time wins.
These rules are a compromise. When a system starts up without NTP or a clock
battery, it typically reverts to a time in the distant past. Therefore times
before Moonfire NVR was written should be checked for and avoided. When both
systems have a believably recent timestamp, the local time is typically more
accurate, but the camera time allows a closer match between two streams of
the same camera.
This still doesn't completely solve the problem, and it's unclear it is even
better. When using camera start times, different cameras' streams may be
mismatched by up twice the 5-second threshold described above. This could even
happen for two streams within the same camera if a significant step happens
between their establishment. More frequent SNTP adjustments may help, so that
individual steps are less frequent. Or Moonfire NVR could attempt to address
this with more complexity: use sender reports of established RTSP sessions to
detect and compensate for these clock splits.
It's unclear if these additional mechanisms are desirable or worthwhile. The
simplest approach will be adopted initially and adapted as necessary.

View File

@ -15,33 +15,133 @@ The database schema includes a version number to quickly identify if a
the database is compatible with a particular version of the software. Some
software upgrades will require you to upgrade the database.
### Unversioned to version 0
Note that in general upgrades are one-way and backward-incompatible. That is,
you can't downgrade the database to the old version, and you can't run the old
software on the new database. To minimize the corresponding risk, you should
save a backup of the old SQLite database and verify the new software works in
read-only mode prior to deleting the old database.
Early versions of Moonfire NVR did not include the version information in the
schema. You can manually add this information to your schema using the
`sqlite3` commandline. This process is backward compatible, meaning that
software versions that accept an unversioned database will also accept a
version 0 database.
### Procedure
Version 0 makes two changes:
First ensure there is sufficient space available for four copies of the
SQLite database:
* schema versioning, as described above.
* adding a column (`video_sync_samples`) to a database index to speed up
certain operations.
# the primary copy, which will be upgraded
# a copy you create manually as a backup so that you can restore if you
discover a problem while running the new software against the upgraded
database in read-only mode. If disk space is tight, you can save this
to a different filesystem than the primary copy.
# internal copies made and destroyed by Moonfire NVR and SQLite during the
upgrade:
* during earlier steps, possibly duplicate copies of tables, which
may occupy space both in the main database and the journal
* during the final vacuum step, a complete database copy
If disk space is tight, and you are _very careful_, you can skip these
copies with the `--preset-journal=off --no-vacuum` arguments to
the updater. If you aren't confident in your ability to do this, *don't
do it*. If you are confident, take additional safety precautions anyway:
* double-check you have the full backup described above. Without the
journal any problems during the upgrade will corrupt your database
and you will need to restore.
* ensure you re-enable journalling via `pragma journal_mode = wal;`
before using the upgraded database, or any problems after the
upgrade will corrupt your database. The upgrade procedure should do
this automatically, but you will want to verify by hand that you are
no longer in the dangerous mode.
First ensure Moonfire NVR is not running; if you are using systemd with the
Next ensure Moonfire NVR is not running and does not automatically restart if
the system is rebooted during the upgrade. If you are using systemd with the
service name `moonfire-nvr`, you can do this as follows:
$ sudo systemctl stop moonfire-nvr
$ sudo systemctl disable moonfire-nvr
The service takes a moment to shut down; wait until the following command
reports that it is not running:
$ sudo systemctl status moonfire-nvr
Then use `sqlite3` to manually edit the database. The default path is
`/var/lib/moonfire-nvr/db/db`; if you've specified a different `--db_dir`,
use that directory with a suffix of `/db`.
Then back up your SQLite database. If you are using the default path, you can
do so as follows:
$ sudo -u moonfire-nvr cp /var/lib/moonfire-nvr/db/db{,.pre-upgrade}
By default, the upgrade command will reset the SQLite `journal_mode` to
`delete` prior to the upgrade. This works around a problem with
`journal_mode = wal` in older SQLite versions, as documented in [the SQLite
manual for write-ahead logging](https://www.sqlite.org/wal.html):
> WAL works best with smaller transactions. WAL does not work well for very
> large transactions. For transactions larger than about 100 megabytes,
> traditional rollback journal modes will likely be faster. For transactions
> in excess of a gigabyte, WAL mode may fail with an I/O or disk-full error.
> It is recommended that one of the rollback journal modes be used for
> transactions larger than a few dozen megabytes. Beginning with version
> 3.11.0 (2016-02-15), WAL mode works as efficiently with large transactions
> as does rollback mode.
Run the upgrade procedure using the new software binary (here referred to as
`new-moonfire-nvr`; if you are installing from source, you may find it as
`target/release/moonfire-nvr`).
$ sudo -u moonfire-nvr RUST_LOG=info new-moonfire-nvr --upgrade
Then run the system in read-only mode to verify correct operation:
$ sudo -u moonfire-nvr new-moonfire-nvr --read-only
Go to the web interface and ensure the system is operating correctly. If
you detect a problem now, you can copy the old database back over the new one.
If you detect a problem after enabling read-write operation, a restore will be
more complicated.
Then install the new software to the path expected by your systemd
configuration and start it up:
$ sudo install -m 755 new-moonfire-nvr /usr/local/bin/moonfire-nvr
$ sudo systemctl enable moonfire-nvr
$ sudo systemctl start moonfire-nvr
Hopefully your system is functioning correctly. If not, there are two options
for restore; neither are easy:
* go back to your old database. There will be two classes of problems:
* If the new system deleted any recordings, the old system will
incorrectly believe they are still present. You could wait until all
existing files are rotated away, or you could try to delete them
manually from the database.
* if the new system created any recordings, the old system will not
know about them and will not delete them. Your disk may become full.
You should find some way to discover these files and manually delete
them.
Once you're confident of correct operation, delete the unneeded backup:
$ sudo systemctl rm /var/lib/moonfire-nvr/db/db.pre-upgrade
### Unversioned to version 0
Early versions of Moonfire NVR (prior to 2016-12-20) did not include the
version information in the schema. You can manually add this information to
your schema using the `sqlite3` commandline. This process is backward
compatible, meaning that software versions that accept an unversioned database
will also accept a version 0 database.
Version 0 makes two changes:
* it adds schema versioning, as described above.
* it adds a column (`video_sync_samples`) to a database index to speed up
certain operations.
There's a special procedure for this upgrade. The good news is that a backup
is unnecessary; there's no risk with this procedure.
First ensure Moonfire NVR is not running as described in the general procedure
above.
Then use `sqlite3` to manually edit the database. The default
path is `/var/lib/moonfire-nvr/db/db`; if you've specified a different
`--db_dir`, use that directory with a suffix of `/db`.
$ sudo -u moonfire-nvr sqlite3 /var/lib/moonfire-nvr/db/db
sqlite3>
@ -74,6 +174,15 @@ create index recording_cover on recording (
commit transaction;
```
When you are done, you can restart the service:
When you are done, you can restart the service via `systemctl` and continue
using it with your existing or new version of Moonfire NVR.
$ sudo systemctl start moonfire-nvr
### Version 0 to version 1
Version 1 makes several changes to the recording tables and indices. These
changes allow overlapping recordings to be unambiguously listed and viewed.
They also reduce the amount of I/O; in one test of retrieving playback
indexes, the number of (mostly 1024-byte) read syscalls on the database
dropped from 605 to 39.
The general upgrade procedure applies to this upgrade.

View File

@ -30,29 +30,45 @@
//! Clock interface and implementations for testability.
use libc;
#[cfg(test)] use std::sync::Mutex;
use std::mem;
use std::thread;
use time;
use time::{Duration, Timespec};
/// Abstract interface to the system clock. This is for testability.
pub trait Clock : Sync {
/// Gets the current time.
fn get_time(&self) -> time::Timespec;
/// Abstract interface to the system clocks. This is for testability.
pub trait Clocks : Sync {
/// Gets the current time from `CLOCK_REALTIME`.
fn realtime(&self) -> Timespec;
/// Gets the current time from `CLOCK_MONOTONIC`.
fn monotonic(&self) -> Timespec;
/// Causes the current thread to sleep for the specified time.
fn sleep(&self, how_long: time::Duration);
fn sleep(&self, how_long: Duration);
}
/// Singleton "real" clock.
pub static REAL: RealClock = RealClock {};
/// Singleton "real" clocks.
pub static REAL: RealClocks = RealClocks {};
/// Real clock; see static `REAL` instance.
pub struct RealClock {}
/// Real clocks; see static `REAL` instance.
pub struct RealClocks {}
impl Clock for RealClock {
fn get_time(&self) -> time::Timespec { time::get_time() }
impl RealClocks {
fn get(&self, clock: libc::clockid_t) -> Timespec {
unsafe {
let mut ts = mem::uninitialized();
assert_eq!(0, libc::clock_gettime(clock, &mut ts));
Timespec::new(ts.tv_sec as i64, ts.tv_nsec as i32)
}
}
}
fn sleep(&self, how_long: time::Duration) {
impl Clocks for RealClocks {
fn realtime(&self) -> Timespec { self.get(libc::CLOCK_REALTIME) }
fn monotonic(&self) -> Timespec { self.get(libc::CLOCK_MONOTONIC) }
fn sleep(&self, how_long: Duration) {
match how_long.to_std() {
Ok(d) => thread::sleep(d),
Err(e) => warn!("Invalid duration {:?}: {}", how_long, e),
@ -62,20 +78,29 @@ impl Clock for RealClock {
/// Simulated clock for testing.
#[cfg(test)]
pub struct SimulatedClock(Mutex<time::Timespec>);
#[cfg(test)]
impl SimulatedClock {
pub fn new() -> SimulatedClock { SimulatedClock(Mutex::new(time::Timespec::new(0, 0))) }
pub struct SimulatedClocks {
boot: Timespec,
uptime: Mutex<Duration>,
}
#[cfg(test)]
impl Clock for SimulatedClock {
fn get_time(&self) -> time::Timespec { *self.0.lock().unwrap() }
impl SimulatedClocks {
pub fn new(boot: Timespec) -> SimulatedClocks {
SimulatedClocks {
boot: boot,
uptime: Mutex::new(Duration::seconds(0)),
}
}
}
#[cfg(test)]
impl Clocks for SimulatedClocks {
fn realtime(&self) -> Timespec { self.boot + *self.uptime.lock().unwrap() }
fn monotonic(&self) -> Timespec { Timespec::new(0, 0) + *self.uptime.lock().unwrap() }
/// Advances the clock by the specified amount without actually sleeping.
fn sleep(&self, how_long: time::Duration) {
let mut l = self.0.lock().unwrap();
fn sleep(&self, how_long: Duration) {
let mut l = self.uptime.lock().unwrap();
*l = *l + how_long;
}
}

394
src/db.rs
View File

@ -49,7 +49,7 @@
//! and such to avoid database operations in these paths.
//!
//! * the `Transaction` interface allows callers to batch write operations to reduce latency and
//! SSD write samples.
//! SSD write cycles.
use error::{Error, ResultExt};
use fnv;
@ -70,17 +70,24 @@ use time;
use uuid::Uuid;
/// Expected schema version. See `guide/schema.md` for more information.
pub const EXPECTED_VERSION: i32 = 0;
pub const EXPECTED_VERSION: i32 = 1;
const GET_RECORDING_SQL: &'static str =
"select sample_file_uuid, video_index from recording where id = :id";
const GET_RECORDING_PLAYBACK_SQL: &'static str = r#"
select
sample_file_uuid,
video_index
from
recording_playback
where
composite_id = :composite_id
"#;
const DELETE_RESERVATION_SQL: &'static str =
"delete from reserved_sample_files where uuid = :uuid";
const INSERT_RESERVATION_SQL: &'static str = r#"
insert into reserved_sample_files (uuid, state)
values (:uuid, :state);
values (:uuid, :state)
"#;
/// Valid values for the `state` column in the `reserved_sample_files` table.
@ -96,38 +103,50 @@ enum ReservationState {
const INSERT_VIDEO_SAMPLE_ENTRY_SQL: &'static str = r#"
insert into video_sample_entry (sha1, width, height, data)
values (:sha1, :width, :height, :data);
values (:sha1, :width, :height, :data)
"#;
const INSERT_RECORDING_SQL: &'static str = r#"
insert into recording (camera_id, sample_file_bytes, start_time_90k,
duration_90k, local_time_delta_90k, video_samples,
video_sync_samples, video_sample_entry_id,
sample_file_uuid, sample_file_sha1, video_index)
values (:camera_id, :sample_file_bytes, :start_time_90k,
:duration_90k, :local_time_delta_90k,
:video_samples, :video_sync_samples,
:video_sample_entry_id, :sample_file_uuid,
:sample_file_sha1, :video_index);
insert into recording (composite_id, camera_id, run_offset, flags, sample_file_bytes,
start_time_90k, duration_90k, local_time_delta_90k, video_samples,
video_sync_samples, video_sample_entry_id)
values (:composite_id, :camera_id, :run_offset, :flags, :sample_file_bytes,
:start_time_90k, :duration_90k, :local_time_delta_90k,
:video_samples, :video_sync_samples, :video_sample_entry_id)
"#;
const INSERT_RECORDING_PLAYBACK_SQL: &'static str = r#"
insert into recording_playback (composite_id, sample_file_uuid, sample_file_sha1, video_index)
values (:composite_id, :sample_file_uuid, :sample_file_sha1,
:video_index)
"#;
const UPDATE_NEXT_RECORDING_ID_SQL: &'static str =
"update camera set next_recording_id = :next_recording_id where id = :camera_id";
const LIST_OLDEST_SAMPLE_FILES_SQL: &'static str = r#"
select
id,
sample_file_uuid,
start_time_90k,
duration_90k,
sample_file_bytes
recording.composite_id,
recording_playback.sample_file_uuid,
recording.start_time_90k,
recording.duration_90k,
recording.sample_file_bytes
from
recording
join recording_playback on (recording.composite_id = recording_playback.composite_id)
where
camera_id = :camera_id
:start <= recording.composite_id and
recording.composite_id < :end
order by
start_time_90k
recording.composite_id
"#;
const DELETE_RECORDING_SQL: &'static str = r#"
delete from recording where id = :recording_id;
delete from recording where composite_id = :composite_id
"#;
const DELETE_RECORDING_PLAYBACK_SQL: &'static str = r#"
delete from recording_playback where composite_id = :composite_id
"#;
const CAMERA_MIN_START_SQL: &'static str = r#"
@ -137,7 +156,7 @@ const CAMERA_MIN_START_SQL: &'static str = r#"
recording
where
camera_id = :camera_id
order by start_time_90k limit 1;
order by start_time_90k limit 1
"#;
const CAMERA_MAX_START_SQL: &'static str = r#"
@ -151,6 +170,26 @@ const CAMERA_MAX_START_SQL: &'static str = r#"
order by start_time_90k desc;
"#;
const LIST_RECORDINGS_BY_ID_SQL: &'static str = r#"
select
recording.composite_id,
recording.run_offset,
recording.flags,
recording.start_time_90k,
recording.duration_90k,
recording.sample_file_bytes,
recording.video_samples,
recording.video_sync_samples,
recording.video_sample_entry_id
from
recording
where
:start <= composite_id and
composite_id < :end
order by
recording.composite_id
"#;
/// A concrete box derived from a ISO/IEC 14496-12 section 8.5.2 VisualSampleEntry box. Describes
/// the codec, width, height, etc.
#[derive(Debug)]
@ -162,45 +201,59 @@ pub struct VideoSampleEntry {
pub data: Vec<u8>,
}
/// A row used in `list_recordings`.
/// A row used in `list_recordings_by_time` and `list_recordings_by_id`.
#[derive(Debug)]
pub struct ListCameraRecordingsRow {
pub id: i64,
pub struct ListRecordingsRow {
pub start: recording::Time,
pub video_sample_entry: Arc<VideoSampleEntry>,
pub camera_id: i32,
pub id: i32,
/// This is a recording::Duration, but a single recording's duration fits into an i32.
pub duration_90k: i32,
pub video_samples: i32,
pub video_sync_samples: i32,
pub sample_file_bytes: i32,
pub video_sample_entry: Arc<VideoSampleEntry>,
pub run_offset: i32,
pub flags: i32,
}
/// A row used in `list_aggregated_recordings`.
#[derive(Debug)]
#[derive(Clone, Debug)]
pub struct ListAggregatedRecordingsRow {
pub range: Range<recording::Time>,
pub time: Range<recording::Time>,
pub ids: Range<i32>,
pub video_samples: i64,
pub video_sync_samples: i64,
pub sample_file_bytes: i64,
pub video_sample_entry: Arc<VideoSampleEntry>,
pub camera_id: i32,
pub flags: i32,
pub run_start_id: i32,
}
/// Extra data about a recording, beyond what is returned by ListCameraRecordingsRow.
/// Retrieve with `get_recording`.
/// Select fields from the `recordings_playback` table. Retrieve with `get_recording_playback`.
#[derive(Debug)]
pub struct ExtraRecording {
pub struct RecordingPlayback {
pub sample_file_uuid: Uuid,
pub video_index: Vec<u8>
}
/// Bitmask in the `flags` field in the `recordings` table; see `schema.sql`.
pub enum RecordingFlags {
TrailingZero = 1,
}
/// A recording to pass to `insert_recording`.
#[derive(Debug)]
pub struct RecordingToInsert {
pub camera_id: i32,
pub run_offset: i32,
pub flags: i32,
pub sample_file_bytes: i32,
pub time: Range<recording::Time>,
pub local_time: recording::Time,
pub local_time_delta: recording::Duration,
pub video_samples: i32,
pub video_sync_samples: i32,
pub video_sample_entry_id: i32,
@ -214,7 +267,7 @@ pub struct RecordingToInsert {
pub struct ListOldestSampleFilesRow {
pub uuid: Uuid,
pub camera_id: i32,
pub recording_id: i64,
pub recording_id: i32,
pub time: Range<recording::Time>,
pub sample_file_bytes: i32,
}
@ -285,6 +338,8 @@ pub struct Camera {
/// Mapping of calendar day (in the server's time zone) to a summary of recordings on that day.
pub days: BTreeMap<CameraDayKey, CameraDayValue>,
next_recording_id: i32,
}
/// Adds `delta` to the day represented by `day` in the map `m`.
@ -431,8 +486,8 @@ struct State {
cameras_by_id: BTreeMap<i32, Camera>,
cameras_by_uuid: BTreeMap<Uuid, i32>,
video_sample_entries: BTreeMap<i32, Arc<VideoSampleEntry>>,
list_recordings_sql: String,
recording_cache: RefCell<LruCache<i64, Arc<ExtraRecording>, fnv::FnvBuildHasher>>,
list_recordings_by_time_sql: String,
playback_cache: RefCell<LruCache<i64, Arc<RecordingPlayback>, fnv::FnvBuildHasher>>,
}
/// A high-level transaction. This manages the SQLite transaction and the matching modification to
@ -443,10 +498,9 @@ pub struct Transaction<'a> {
tx: rusqlite::Transaction<'a>,
/// True if due to an earlier error the transaction must be rolled back rather than committed.
/// Insert and delete are two-part, requiring a delete from the `reserve_sample_files` table
/// and an insert to the `recording` table (or vice versa). If the latter half fails, the
/// former should be aborted as well. We could use savepoints (nested transactions) for this,
/// but for simplicity we just require the entire transaction be rolled back.
/// Insert and delete are multi-part. If later parts fail, earlier parts should be aborted as
/// well. We could use savepoints (nested transactions) for this, but for simplicity we just
/// require the entire transaction be rolled back.
must_rollback: bool,
/// Normally sample file uuids must be reserved prior to a recording being inserted.
@ -470,6 +524,13 @@ struct CameraModification {
/// Reset the Camera range to this value. This should be populated immediately prior to the
/// commit.
range: Option<Range<recording::Time>>,
/// Reset the next_recording_id to the specified value.
new_next_recording_id: Option<i32>,
}
fn composite_id(camera_id: i32, recording_id: i32) -> i64 {
(camera_id as i64) << 32 | recording_id as i64
}
impl<'a> Transaction<'a> {
@ -486,21 +547,28 @@ impl<'a> Transaction<'a> {
Ok(uuid)
}
/// Deletes the given recordings from the `recording` table.
/// Deletes the given recordings from the `recording` and `recording_playback` tables.
/// Note they are not fully removed from the database; the uuids are transferred to the
/// `reserved_sample_files` table. The caller should `unlink` the files, then remove the
/// reservation.
pub fn delete_recordings(&mut self, rows: &[ListOldestSampleFilesRow]) -> Result<(), Error> {
let mut del = self.tx.prepare_cached(DELETE_RECORDING_SQL)?;
let mut del1 = self.tx.prepare_cached(DELETE_RECORDING_SQL)?;
let mut del2 = self.tx.prepare_cached(DELETE_RECORDING_PLAYBACK_SQL)?;
let mut insert = self.tx.prepare_cached(INSERT_RESERVATION_SQL)?;
self.check_must_rollback()?;
self.must_rollback = true;
for row in rows {
let changes = del.execute_named(&[(":recording_id", &row.recording_id)])?;
let composite_id = &composite_id(row.camera_id, row.recording_id);
let changes = del1.execute_named(&[(":composite_id", composite_id)])?;
if changes != 1 {
return Err(Error::new(format!("no such recording {} (camera {}, uuid {})",
row.recording_id, row.camera_id, row.uuid)));
return Err(Error::new(format!("no such recording {}/{} (uuid {})",
row.camera_id, row.recording_id, row.uuid)));
}
let changes = del2.execute_named(&[(":composite_id", composite_id)])?;
if changes != 1 {
return Err(Error::new(format!("no such recording_playback {}/{} (uuid {})",
row.camera_id, row.recording_id, row.uuid)));
}
let uuid = &row.uuid.as_bytes()[..];
insert.execute_named(&[
@ -546,9 +614,10 @@ impl<'a> Transaction<'a> {
}
// Unreserve the sample file uuid and insert the recording row.
if self.state.cameras_by_id.get_mut(&r.camera_id).is_none() {
return Err(Error::new(format!("no such camera id {}", r.camera_id)));
}
let cam = match self.state.cameras_by_id.get_mut(&r.camera_id) {
None => return Err(Error::new(format!("no such camera id {}", r.camera_id))),
Some(c) => c,
};
let uuid = &r.sample_file_uuid.as_bytes()[..];
{
let mut stmt = self.tx.prepare_cached(DELETE_RESERVATION_SQL)?;
@ -558,25 +627,40 @@ impl<'a> Transaction<'a> {
}
}
self.must_rollback = true;
let mut m = Transaction::get_mods_by_camera(&mut self.mods_by_camera, r.camera_id);
{
let recording_id = m.new_next_recording_id.unwrap_or(cam.next_recording_id);
let composite_id = composite_id(r.camera_id, recording_id);
let mut stmt = self.tx.prepare_cached(INSERT_RECORDING_SQL)?;
let sha1 = &r.sample_file_sha1[..];
stmt.execute_named(&[
(":composite_id", &composite_id),
(":camera_id", &(r.camera_id as i64)),
(":run_offset", &r.run_offset),
(":flags", &r.flags),
(":sample_file_bytes", &r.sample_file_bytes),
(":start_time_90k", &r.time.start.0),
(":duration_90k", &(r.time.end.0 - r.time.start.0)),
(":local_time_delta_90k", &(r.local_time.0 - r.time.start.0)),
(":local_time_delta_90k", &r.local_time_delta.0),
(":video_samples", &r.video_samples),
(":video_sync_samples", &r.video_sync_samples),
(":video_sample_entry_id", &r.video_sample_entry_id),
])?;
m.new_next_recording_id = Some(recording_id + 1);
let mut stmt = self.tx.prepare_cached(INSERT_RECORDING_PLAYBACK_SQL)?;
let sha1 = &r.sample_file_sha1[..];
stmt.execute_named(&[
(":composite_id", &composite_id),
(":sample_file_uuid", &uuid),
(":sample_file_sha1", &sha1),
(":video_index", &r.video_index),
])?;
let mut stmt = self.tx.prepare_cached(UPDATE_NEXT_RECORDING_ID_SQL)?;
stmt.execute_named(&[
(":camera_id", &(r.camera_id as i64)),
(":next_recording_id", &m.new_next_recording_id),
])?;
}
self.must_rollback = false;
let mut m = Transaction::get_mods_by_camera(&mut self.mods_by_camera, r.camera_id);
m.duration += r.time.end - r.time.start;
m.sample_file_bytes += r.sample_file_bytes as i64;
adjust_days(r.time.clone(), 1, &mut m.days);
@ -597,6 +681,9 @@ impl<'a> Transaction<'a> {
adjust_day(*k, *v, &mut camera.days);
}
camera.range = m.range.clone();
if let Some(id) = m.new_next_recording_id {
camera.next_recording_id = id;
}
}
Ok(())
}
@ -618,6 +705,7 @@ impl<'a> Transaction<'a> {
sample_file_bytes: 0,
range: None,
days: BTreeMap::new(),
new_next_recording_id: None,
}
})
}
@ -697,32 +785,53 @@ impl LockedDatabase {
/// Lists the specified recordings in ascending order, passing them to a supplied function.
/// Given that the function is called with the database lock held, it should be quick.
pub fn list_recordings<F>(&self, camera_id: i32, desired_time: &Range<recording::Time>,
mut f: F) -> Result<(), Error>
where F: FnMut(ListCameraRecordingsRow) -> Result<(), Error> {
let mut stmt = self.conn.prepare_cached(&self.state.list_recordings_sql)?;
let mut rows = stmt.query_named(&[
pub fn list_recordings_by_time<F>(&self, camera_id: i32, desired_time: Range<recording::Time>,
f: F) -> Result<(), Error>
where F: FnMut(ListRecordingsRow) -> Result<(), Error> {
let mut stmt = self.conn.prepare_cached(&self.state.list_recordings_by_time_sql)?;
let rows = stmt.query_named(&[
(":camera_id", &camera_id),
(":start_time_90k", &desired_time.start.0),
(":end_time_90k", &desired_time.end.0)])?;
self.list_recordings_inner(camera_id, rows, f)
}
pub fn list_recordings_by_id<F>(&self, camera_id: i32, desired_ids: Range<i32>, f: F)
-> Result<(), Error>
where F: FnMut(ListRecordingsRow) -> Result<(), Error> {
let mut stmt = self.conn.prepare_cached(LIST_RECORDINGS_BY_ID_SQL)?;
let rows = stmt.query_named(&[
(":start", &composite_id(camera_id, desired_ids.start)),
(":end", &composite_id(camera_id, desired_ids.end)),
])?;
self.list_recordings_inner(camera_id, rows, f)
}
fn list_recordings_inner<F>(&self, camera_id: i32, mut rows: rusqlite::Rows, mut f: F)
-> Result<(), Error>
where F: FnMut(ListRecordingsRow) -> Result<(), Error> {
while let Some(row) = rows.next() {
let row = row?;
let id = row.get_checked(0)?;
let vse_id = row.get_checked(6)?;
let id = row.get_checked::<_, i64>(0)? as i32; // drop top bits of composite_id.
let vse_id = row.get_checked(8)?;
let video_sample_entry = match self.state.video_sample_entries.get(&vse_id) {
Some(v) => v,
None => {
return Err(Error::new(format!(
"recording {} references nonexistent video_sample_entry {}", id, vse_id)));
"recording {}/{} references nonexistent video_sample_entry {}",
camera_id, id, vse_id)));
},
};
let out = ListCameraRecordingsRow{
let out = ListRecordingsRow{
camera_id: camera_id,
id: id,
start: recording::Time(row.get_checked(1)?),
duration_90k: row.get_checked(2)?,
sample_file_bytes: row.get_checked(3)?,
video_samples: row.get_checked(4)?,
video_sync_samples: row.get_checked(5)?,
run_offset: row.get_checked(1)?,
flags: row.get_checked(2)?,
start: recording::Time(row.get_checked(3)?),
duration_90k: row.get_checked(4)?,
sample_file_bytes: row.get_checked(5)?,
video_samples: row.get_checked(6)?,
video_sync_samples: row.get_checked(7)?,
video_sample_entry: video_sample_entry.clone(),
};
f(out)?;
@ -730,73 +839,101 @@ impl LockedDatabase {
Ok(())
}
/// Convenience method which calls `list_recordings` and aggregates consecutive recordings.
/// Calls `list_recordings_by_time` and aggregates consecutive recordings.
/// Rows are given to the callback in arbitrary order. Callers which care about ordering
/// should do their own sorting.
pub fn list_aggregated_recordings<F>(&self, camera_id: i32,
desired_time: &Range<recording::Time>,
desired_time: Range<recording::Time>,
forced_split: recording::Duration,
mut f: F) -> Result<(), Error>
where F: FnMut(ListAggregatedRecordingsRow) -> Result<(), Error> {
let mut agg: Option<ListAggregatedRecordingsRow> = None;
self.list_recordings(camera_id, desired_time, |row| {
let needs_flush = if let Some(ref a) = agg {
let new_dur = a.range.end - a.range.start +
where F: FnMut(&ListAggregatedRecordingsRow) -> Result<(), Error> {
// Iterate, maintaining a map from a recording_id to the aggregated row for the latest
// batch of recordings from the run starting at that id. Runs can be split into multiple
// batches for a few reasons:
//
// * forced split (when exceeding a duration limit)
// * a missing id (one that was deleted out of order)
// * video_sample_entry mismatch (if the parameters changed during a RTSP session)
//
// This iteration works because in a run, the start_time+duration of recording id r
// is equal to the start_time of recording id r+1. Thus ascending times guarantees
// ascending ids within a run. (Different runs, however, can be arbitrarily interleaved if
// their timestamps overlap. Tracking all active runs prevents that interleaving from
// causing problems.)
let mut aggs: BTreeMap<i32, ListAggregatedRecordingsRow> = BTreeMap::new();
self.list_recordings_by_time(camera_id, desired_time, |row| {
let run_start_id = row.id - row.run_offset;
let needs_flush = if let Some(a) = aggs.get(&run_start_id) {
let new_dur = a.time.end - a.time.start +
recording::Duration(row.duration_90k as i64);
a.range.end != row.start ||
row.video_sample_entry.id != a.video_sample_entry.id || new_dur >= forced_split
a.ids.end != row.id || row.video_sample_entry.id != a.video_sample_entry.id ||
new_dur >= forced_split
} else {
false
};
if needs_flush {
let a = agg.take().expect("needs_flush when agg is none");
f(a)?;
let a = aggs.remove(&run_start_id).expect("needs_flush when agg is None");
f(&a)?;
}
match agg {
None => {
agg = Some(ListAggregatedRecordingsRow{
range: row.start .. recording::Time(row.start.0 + row.duration_90k as i64),
let need_insert = if let Some(ref mut a) = aggs.get_mut(&run_start_id) {
if a.time.end != row.start {
return Err(Error::new(format!(
"camera {} recording {} ends at {}; {} starts at {}; expected same",
camera_id, a.ids.end - 1, a.time.end, row.id, row.start)));
}
a.time.end.0 += row.duration_90k as i64;
a.ids.end = row.id + 1;
a.video_samples += row.video_samples as i64;
a.video_sync_samples += row.video_sync_samples as i64;
a.sample_file_bytes += row.sample_file_bytes as i64;
false
} else {
true
};
if need_insert {
aggs.insert(run_start_id, ListAggregatedRecordingsRow{
time: row.start .. recording::Time(row.start.0 + row.duration_90k as i64),
ids: row.id .. row.id+1,
video_samples: row.video_samples as i64,
video_sync_samples: row.video_sync_samples as i64,
sample_file_bytes: row.sample_file_bytes as i64,
video_sample_entry: row.video_sample_entry,
});
},
Some(ref mut a) => {
a.range.end.0 += row.duration_90k as i64;
a.video_samples += row.video_samples as i64;
a.video_sync_samples += row.video_sync_samples as i64;
a.sample_file_bytes += row.sample_file_bytes as i64;
}
camera_id: camera_id,
run_start_id: row.id - row.run_offset,
flags: row.flags,
});
};
Ok(())
})?;
if let Some(a) = agg {
for a in aggs.values() {
f(a)?;
}
Ok(())
}
/// Gets extra data about a single recording.
/// Gets a single `recording_playback` row.
/// This uses a LRU cache to reduce the number of retrievals from the database.
pub fn get_recording(&self, recording_id: i64)
-> Result<Arc<ExtraRecording>, Error> {
let mut cache = self.state.recording_cache.borrow_mut();
if let Some(r) = cache.get_mut(&recording_id) {
debug!("cache hit for recording {}", recording_id);
pub fn get_recording_playback(&self, camera_id: i32, recording_id: i32)
-> Result<Arc<RecordingPlayback>, Error> {
let composite_id = composite_id(camera_id, recording_id);
let mut cache = self.state.playback_cache.borrow_mut();
if let Some(r) = cache.get_mut(&composite_id) {
trace!("cache hit for recording {}/{}", camera_id, recording_id);
return Ok(r.clone());
}
debug!("cache miss for recording {}", recording_id);
let mut stmt = self.conn.prepare_cached(GET_RECORDING_SQL)?;
let mut rows = stmt.query_named(&[(":id", &recording_id)])?;
trace!("cache miss for recording {}/{}", camera_id, recording_id);
let mut stmt = self.conn.prepare_cached(GET_RECORDING_PLAYBACK_SQL)?;
let mut rows = stmt.query_named(&[(":composite_id", &composite_id)])?;
if let Some(row) = rows.next() {
let row = row?;
let r = Arc::new(ExtraRecording{
let r = Arc::new(RecordingPlayback{
sample_file_uuid: get_uuid(&row, 0)?,
video_index: row.get_checked(1)?,
});
cache.insert(recording_id, r.clone());
cache.insert(composite_id, r.clone());
return Ok(r);
}
Err(Error::new(format!("no such recording {}", recording_id)))
Err(Error::new(format!("no such recording {}/{}", camera_id, recording_id)))
}
/// Lists all reserved sample files.
@ -816,15 +953,19 @@ impl LockedDatabase {
pub fn list_oldest_sample_files<F>(&self, camera_id: i32, mut f: F) -> Result<(), Error>
where F: FnMut(ListOldestSampleFilesRow) -> bool {
let mut stmt = self.conn.prepare_cached(LIST_OLDEST_SAMPLE_FILES_SQL)?;
let mut rows = stmt.query_named(&[(":camera_id", &(camera_id as i64))])?;
let mut rows = stmt.query_named(&[
(":start", &composite_id(camera_id, 0)),
(":end", &composite_id(camera_id + 1, 0)),
])?;
while let Some(row) = rows.next() {
let row = row?;
let start = recording::Time(row.get_checked(2)?);
let duration = recording::Duration(row.get_checked(3)?);
let composite_id: i64 = row.get_checked(0)?;
let should_continue = f(ListOldestSampleFilesRow{
recording_id: row.get_checked(0)?,
recording_id: composite_id as i32,
camera_id: (composite_id >> 32) as i32,
uuid: get_uuid(&row, 1)?,
camera_id: camera_id,
time: start .. start + duration,
sample_file_bytes: row.get_checked(4)?,
});
@ -888,7 +1029,8 @@ impl LockedDatabase {
camera.password,
camera.main_rtsp_path,
camera.sub_rtsp_path,
camera.retain_bytes
camera.retain_bytes,
next_recording_id
from
camera;
"#)?;
@ -912,6 +1054,7 @@ impl LockedDatabase {
sample_file_bytes: 0,
duration: recording::Duration(0),
days: BTreeMap::new(),
next_recording_id: row.get_checked(10)?,
});
self.state.cameras_by_uuid.insert(uuid, id);
}
@ -970,9 +1113,11 @@ pub struct Database(Mutex<LockedDatabase>);
impl Database {
/// Creates the database from a caller-supplied SQLite connection.
pub fn new(conn: rusqlite::Connection) -> Result<Database, Error> {
let list_recordings_sql = format!(r#"
let list_recordings_by_time_sql = format!(r#"
select
recording.id,
recording.composite_id,
recording.run_offset,
recording.flags,
recording.start_time_90k,
recording.duration_90k,
recording.sample_file_bytes,
@ -987,7 +1132,7 @@ impl Database {
recording.start_time_90k < :end_time_90k and
recording.start_time_90k + recording.duration_90k > :start_time_90k
order by
recording.start_time_90k
recording.composite_id
"#, recording::MAX_RECORDING_DURATION);
{
use std::error::Error as E;
@ -999,8 +1144,7 @@ impl Database {
\
If you are starting from an \
empty database, see README.md to complete the \
installation. If you are starting from
complete the schema. If you are starting from a database \
installation. If you are starting from a database \
that predates schema versioning, see guide/schema.md."
.to_owned()));
},
@ -1025,8 +1169,8 @@ impl Database {
cameras_by_id: BTreeMap::new(),
cameras_by_uuid: BTreeMap::new(),
video_sample_entries: BTreeMap::new(),
recording_cache: RefCell::new(LruCache::with_hasher(1024, Default::default())),
list_recordings_sql: list_recordings_sql,
playback_cache: RefCell::new(LruCache::with_hasher(1024, Default::default())),
list_recordings_by_time_sql: list_recordings_by_time_sql,
},
}));
{
@ -1078,9 +1222,9 @@ mod tests {
let uuid_bytes = &uuid.as_bytes()[..];
conn.execute_named(r#"
insert into camera (uuid, short_name, description, host, username, password,
main_rtsp_path, sub_rtsp_path, retain_bytes)
main_rtsp_path, sub_rtsp_path, retain_bytes, next_recording_id)
values (:uuid, :short_name, :description, :host, :username, :password,
:main_rtsp_path, :sub_rtsp_path, :retain_bytes)
:main_rtsp_path, :sub_rtsp_path, :retain_bytes, :next_recording_id)
"#, &[
(":uuid", &uuid_bytes),
(":short_name", &short_name),
@ -1091,6 +1235,7 @@ mod tests {
(":main_rtsp_path", &"/main"),
(":sub_rtsp_path", &"/sub"),
(":retain_bytes", &42i64),
(":next_recording_id", &0i64),
]).unwrap();
conn.last_insert_rowid() as i32
}
@ -1121,7 +1266,7 @@ mod tests {
{
let db = db.lock();
let all_time = recording::Time(i64::min_value()) .. recording::Time(i64::max_value());
db.list_recordings(camera_id, &all_time, |_row| {
db.list_recordings_by_time(camera_id, all_time, |_row| {
rows += 1;
Ok(())
}).unwrap();
@ -1152,7 +1297,7 @@ mod tests {
{
let db = db.lock();
let all_time = recording::Time(i64::min_value()) .. recording::Time(i64::max_value());
db.list_recordings(camera_id, &all_time, |row| {
db.list_recordings_by_time(camera_id, all_time, |row| {
rows += 1;
recording_id = row.id;
assert_eq!(r.time,
@ -1176,7 +1321,8 @@ mod tests {
}).unwrap();
assert_eq!(1, rows);
// TODO: get_recording.
// TODO: list_aggregated_recordings.
// TODO: get_recording_playback.
}
fn assert_unsorted_eq<T>(mut a: Vec<T>, mut b: Vec<T>)
@ -1251,10 +1397,10 @@ mod tests {
fn test_version_too_old() {
testutil::init();
let c = setup_conn();
c.execute_batch("delete from version; insert into version values (-1, 0, '');").unwrap();
c.execute_batch("delete from version; insert into version values (0, 0, '');").unwrap();
let e = Database::new(c).unwrap_err();
assert!(e.description().starts_with(
"Database schema version -1 is too old (expected 0)"), "got: {:?}",
"Database schema version 0 is too old (expected 1)"), "got: {:?}",
e.description());
}
@ -1262,10 +1408,10 @@ mod tests {
fn test_version_too_new() {
testutil::init();
let c = setup_conn();
c.execute_batch("delete from version; insert into version values (1, 0, '');").unwrap();
c.execute_batch("delete from version; insert into version values (2, 0, '');").unwrap();
let e = Database::new(c).unwrap_err();
assert!(e.description().starts_with(
"Database schema version 1 is too new (expected 0)"), "got: {:?}", e.description());
"Database schema version 2 is too new (expected 1)"), "got: {:?}", e.description());
}
/// Basic test of running some queries on a fresh database.
@ -1310,8 +1456,10 @@ mod tests {
let recording = RecordingToInsert{
camera_id: camera_id,
sample_file_bytes: 42,
run_offset: 0,
flags: 0,
time: start .. start + recording::Duration(TIME_UNITS_PER_SEC),
local_time: start,
local_time_delta: recording::Duration(0),
video_samples: 1,
video_sync_samples: 1,
video_sample_entry_id: vse_id,

View File

@ -33,10 +33,11 @@
//! This includes opening files for serving, rotating away old files, and saving new files.
use db;
use error::Error;
use libc;
use recording;
use error::Error;
use openssl::crypto::hash;
use std::cmp;
use std::ffi;
use std::fs;
use std::io::{self, Write};
@ -120,9 +121,12 @@ impl SampleFileDir {
/// Note this doesn't wait for previous rotation to complete; it's assumed the sample file
/// directory has sufficient space for a couple recordings per camera in addition to the
/// cameras' total `retain_bytes`.
pub fn create_writer<'a>(&self, channel: &'a SyncerChannel, start: recording::Time,
local_start: recording::Time, camera_id: i32,
video_sample_entry_id: i32) -> Result<Writer<'a>, Error> {
///
/// The new recording will continue from `prev` if specified; this should be as returned from
/// a previous `close` call.
pub fn create_writer<'a>(&self, channel: &'a SyncerChannel, prev: Option<PreviousWriter>,
camera_id: i32, video_sample_entry_id: i32)
-> Result<Writer<'a>, Error> {
// Grab the next uuid. Typically one is cached—a sync has usually completed since the last
// writer was created, and syncs ensure `next_uuid` is filled while performing their
// transaction. But if not, perform an extra database transaction to reserve a new one.
@ -145,7 +149,7 @@ impl SampleFileDir {
return Err(e.into());
},
};
Writer::open(f, uuid, start, local_start, camera_id, video_sample_entry_id, channel)
Writer::open(f, uuid, prev, camera_id, video_sample_entry_id, channel)
}
/// Opens a sample file within this directory with the given flags and (if creating) mode.
@ -424,10 +428,22 @@ struct InnerWriter<'a> {
uuid: Uuid,
corrupt: bool,
hasher: hash::Hasher,
start_time: recording::Time,
local_time: recording::Time,
/// The end time of the previous segment in this run, if any.
prev_end: Option<recording::Time>,
/// The start time of this segment, based solely on examining the local clock after frames in
/// this segment were received. Frames can suffer from various kinds of delay (initial
/// buffering, encoding, and network transmission), so this time is set to far in the future on
/// construction, given a real value on the first packet, and decreased as less-delayed packets
/// are discovered. See design/time.md for details.
local_start: recording::Time,
adjuster: ClockAdjuster,
camera_id: i32,
video_sample_entry_id: i32,
run_offset: i32,
/// A sample which has been written to disk but not added to `index`. Index writes are one
/// sample behind disk writes because the duration of a sample is the difference between its
@ -437,17 +453,72 @@ struct InnerWriter<'a> {
unflushed_sample: Option<UnflushedSample>,
}
/// Adjusts durations given by the camera to correct its clock frequency error.
struct ClockAdjuster {
/// Every `every_minus_1 + 1` units, add `-ndir`.
/// Note i32::max_value() disables adjustment.
every_minus_1: i32,
/// Should be 1 or -1 (unless disabled).
ndir: i32,
/// Keeps accumulated difference from previous values.
cur: i32,
}
impl ClockAdjuster {
fn new(local_time_delta: Option<i64>) -> Self {
// Pick an adjustment rate to correct local_time_delta over the next minute (the
// desired duration of a single recording). Cap the rate at 500 ppm (which corrects
// 2,700/90,000ths of a second over a minute) to prevent noticeably speeding up or slowing
// down playback.
let (every, ndir) = match local_time_delta {
None | Some(0) => (i32::max_value(), 0),
Some(d) if d <= -2700 => (2000, 1),
Some(d) if d >= 2700 => (2000, -1),
Some(d) if d < -60 => ((60 * 90000) / -(d as i32), 1),
Some(d) => ((60 * 90000) / (d as i32), -1),
};
ClockAdjuster{
every_minus_1: every - 1,
ndir: ndir,
cur: 0,
}
}
fn adjust(&mut self, mut val: i32) -> i32 {
self.cur += val;
// The "val > self.ndir" here is so that if decreasing durations (ndir == 1), we don't
// cause a duration of 1 to become a duration of 0. It has no effect when increasing
// durations. (There's no danger of a duration of 0 becoming a duration of 1; cur wouldn't
// be newly > self.every_minus_1.)
while self.cur > self.every_minus_1 && val > self.ndir {
val -= self.ndir;
self.cur -= self.every_minus_1 + 1;
}
val
}
}
struct UnflushedSample {
local_time: recording::Time,
pts_90k: i64,
len: i32,
is_key: bool,
}
#[derive(Copy, Clone)]
pub struct PreviousWriter {
end_time: recording::Time,
local_time_delta: recording::Duration,
run_offset: i32,
}
impl<'a> Writer<'a> {
/// Opens the writer; for use by `SampleFileDir` (which should supply `f`).
fn open(f: fs::File, uuid: Uuid, start_time: recording::Time, local_time: recording::Time,
camera_id: i32, video_sample_entry_id: i32, syncer_channel: &'a SyncerChannel)
-> Result<Self, Error> {
fn open(f: fs::File, uuid: Uuid, prev: Option<PreviousWriter>, camera_id: i32,
video_sample_entry_id: i32, syncer_channel: &'a SyncerChannel) -> Result<Self, Error> {
Ok(Writer(Some(InnerWriter{
syncer_channel: syncer_channel,
f: f,
@ -455,19 +526,25 @@ impl<'a> Writer<'a> {
uuid: uuid,
corrupt: false,
hasher: hash::Hasher::new(hash::Type::SHA1)?,
start_time: start_time,
local_time: local_time,
prev_end: prev.map(|p| p.end_time),
local_start: recording::Time(i64::max_value()),
adjuster: ClockAdjuster::new(prev.map(|p| p.local_time_delta.0)),
camera_id: camera_id,
video_sample_entry_id: video_sample_entry_id,
run_offset: prev.map(|p| p.run_offset + 1).unwrap_or(0),
unflushed_sample: None,
})))
}
pub fn write(&mut self, pkt: &[u8], pts_90k: i64, is_key: bool) -> Result<(), Error> {
/// Writes a new frame to this segment.
/// `local_time` should be the local clock's time as of when this packet was received.
pub fn write(&mut self, pkt: &[u8], local_time: recording::Time, pts_90k: i64,
is_key: bool) -> Result<(), Error> {
let w = self.0.as_mut().unwrap();
if let Some(unflushed) = w.unflushed_sample.take() {
let duration = (pts_90k - unflushed.pts_90k) as i32;
let duration = w.adjuster.adjust((pts_90k - unflushed.pts_90k) as i32);
w.index.add_sample(duration, unflushed.len, unflushed.is_key);
w.extend_local_start(unflushed.local_time);
}
let mut remaining = pkt;
while !remaining.is_empty() {
@ -488,6 +565,7 @@ impl<'a> Writer<'a> {
remaining = &remaining[written..];
}
w.unflushed_sample = Some(UnflushedSample{
local_time: local_time,
pts_90k: pts_90k,
len: pkt.len() as i32,
is_key: is_key});
@ -498,41 +576,57 @@ impl<'a> Writer<'a> {
/// Cleanly closes the writer, using a supplied pts of the next sample for the last sample's
/// duration (if known). If `close` is not called, the `Drop` trait impl will close the trait,
/// swallowing errors and using a zero duration for the last sample.
pub fn close(mut self, next_pts: Option<i64>) -> Result<recording::Time, Error> {
pub fn close(mut self, next_pts: Option<i64>) -> Result<PreviousWriter, Error> {
self.0.take().unwrap().close(next_pts)
}
}
impl<'a> InnerWriter<'a> {
fn close(mut self, next_pts: Option<i64>) -> Result<recording::Time, Error> {
fn extend_local_start(&mut self, pkt_local_time: recording::Time) {
let new = pkt_local_time - recording::Duration(self.index.total_duration_90k as i64);
self.local_start = cmp::min(self.local_start, new);
}
fn close(mut self, next_pts: Option<i64>) -> Result<PreviousWriter, Error> {
if self.corrupt {
self.syncer_channel.async_abandon_recording(self.uuid);
return Err(Error::new(format!("recording {} is corrupt", self.uuid)));
}
if let Some(unflushed) = self.unflushed_sample.take() {
let duration = match next_pts {
None => 0,
Some(p) => (p - unflushed.pts_90k) as i32,
};
self.index.add_sample(duration, unflushed.len, unflushed.is_key);
}
let unflushed =
self.unflushed_sample.take().ok_or_else(|| Error::new("no packets!".to_owned()))?;
let duration = self.adjuster.adjust(match next_pts {
None => 0,
Some(p) => (p - unflushed.pts_90k) as i32,
});
self.index.add_sample(duration, unflushed.len, unflushed.is_key);
self.extend_local_start(unflushed.local_time);
let mut sha1_bytes = [0u8; 20];
sha1_bytes.copy_from_slice(&self.hasher.finish()?[..]);
let end = self.start_time + recording::Duration(self.index.total_duration_90k as i64);
let start = self.prev_end.unwrap_or(self.local_start);
let end = start + recording::Duration(self.index.total_duration_90k as i64);
let flags = if self.index.has_trailing_zero() { db::RecordingFlags::TrailingZero as i32 }
else { 0 };
let local_start_delta = self.local_start - start;
let recording = db::RecordingToInsert{
camera_id: self.camera_id,
sample_file_bytes: self.index.sample_file_bytes,
time: self.start_time .. end,
local_time: self.local_time,
time: start .. end,
local_time_delta: local_start_delta,
video_samples: self.index.video_samples,
video_sync_samples: self.index.video_sync_samples,
video_sample_entry_id: self.video_sample_entry_id,
sample_file_uuid: self.uuid,
video_index: self.index.video_index,
sample_file_sha1: sha1_bytes,
run_offset: self.run_offset,
flags: flags,
};
self.syncer_channel.async_save_recording(recording, self.f);
Ok(end)
Ok(PreviousWriter{
end_time: end,
local_time_delta: local_start_delta,
run_offset: self.run_offset,
})
}
}
@ -546,3 +640,48 @@ impl<'a> Drop for Writer<'a> {
}
}
}
#[cfg(test)]
mod tests {
use super::ClockAdjuster;
use testutil;
#[test]
fn adjust() {
testutil::init();
// no-ops.
let mut a = ClockAdjuster::new(None);
for _ in 0..1800 {
assert_eq!(3000, a.adjust(3000));
}
a = ClockAdjuster::new(Some(0));
for _ in 0..1800 {
assert_eq!(3000, a.adjust(3000));
}
// typical, 100 ppm adjustment.
a = ClockAdjuster::new(Some(-540));
let mut total = 0;
for _ in 0..1800 {
let new = a.adjust(3000);
assert!(new == 2999 || new == 3000);
total += new;
}
let expected = 1800*3000 - 540;
assert!(total == expected || total == expected + 1, "total={} vs expected={}",
total, expected);
// capped at 500 ppm (change of 2,700/90,000ths over 1 minute).
a = ClockAdjuster::new(Some(-1_000_000));
total = 0;
for _ in 0..1800 {
let new = a.adjust(3000);
assert!(new == 2998 || new == 2999, "new={}", new);
total += new;
}
let expected = 1800*3000 - 2700;
assert!(total == expected || total == expected + 1, "total={} vs expected={}",
total, expected);
}
}

View File

@ -40,6 +40,7 @@ use serde_json;
use std::boxed::Box;
use std::convert::From;
use std::error;
use std::error::Error as E;
use std::fmt;
use std::io;
use std::result;
@ -92,56 +93,49 @@ impl fmt::Display for Error {
impl From<rusqlite::Error> for Error {
fn from(err: rusqlite::Error) -> Self {
use std::error::{Error as E};
Error{description: String::from(err.description()),
cause: Some(Box::new(err))}
Error{description: String::from(err.description()), cause: Some(Box::new(err))}
}
}
impl From<fmt::Error> for Error {
fn from(err: fmt::Error) -> Self {
Error{description: String::from(err.description()), cause: Some(Box::new(err))}
}
}
impl From<io::Error> for Error {
fn from(err: io::Error) -> Self {
use std::error::{Error as E};
Error{description: String::from(err.description()),
cause: Some(Box::new(err))}
Error{description: String::from(err.description()), cause: Some(Box::new(err))}
}
}
impl From<time::ParseError> for Error {
fn from(err: time::ParseError) -> Self {
use std::error::{Error as E};
Error{description: String::from(err.description()),
cause: Some(Box::new(err))}
Error{description: String::from(err.description()), cause: Some(Box::new(err))}
}
}
impl From<num::ParseIntError> for Error {
fn from(err: num::ParseIntError) -> Self {
use std::error::{Error as E};
Error{description: err.description().to_owned(),
cause: Some(Box::new(err))}
Error{description: err.description().to_owned(), cause: Some(Box::new(err))}
}
}
impl From<serde_json::Error> for Error {
fn from(err: serde_json::Error) -> Self {
use std::error::{Error as E};
Error{description: format!("{} ({})", err.description(), err),
cause: Some(Box::new(err))}
Error{description: format!("{} ({})", err.description(), err), cause: Some(Box::new(err))}
}
}
impl From<ffmpeg::Error> for Error {
fn from(err: ffmpeg::Error) -> Self {
use std::error::{Error as E};
Error{description: format!("{} ({})", err.description(), err),
cause: Some(Box::new(err))}
Error{description: format!("{} ({})", err.description(), err), cause: Some(Box::new(err))}
}
}
impl From<uuid::ParseError> for Error {
fn from(_: uuid::ParseError) -> Self {
Error{description: String::from("UUID parse error"),
cause: None}
Error{description: String::from("UUID parse error"), cause: None}
}
}

View File

@ -81,6 +81,7 @@ mod stream;
mod streamer;
mod strutil;
#[cfg(test)] mod testutil;
mod upgrade;
mod web;
/// Commandline usage string. This is in the particular format expected by the `docopt` crate.
@ -88,20 +89,30 @@ mod web;
/// allowed commandline arguments and their defaults.
const USAGE: &'static str = "
Usage: moonfire-nvr [options]
moonfire-nvr --upgrade [options]
moonfire-nvr (--help | --version)
Options:
-h, --help Show this message.
--version Show the version of moonfire-nvr.
--db-dir DIR Set the directory holding the SQLite3 index database.
--db-dir=DIR Set the directory holding the SQLite3 index database.
This is typically on a flash device.
[default: /var/lib/moonfire-nvr/db]
--sample-file-dir DIR Set the directory holding video data.
--sample-file-dir=DIR Set the directory holding video data.
This is typically on a hard drive.
[default: /var/lib/moonfire-nvr/sample]
--http-addr ADDR Set the bind address for the unencrypted HTTP server.
--http-addr=ADDR Set the bind address for the unencrypted HTTP server.
[default: 0.0.0.0:8080]
--read-only Forces read-only mode / disables recording.
--preset-journal=MODE With --upgrade, resets the SQLite journal_mode to
the specified mode prior to the upgrade. The default,
delete, is recommended. off is very dangerous but
may be desirable in some circumstances. See
guide/schema.md for more information. The journal
mode will be reset to wal after the upgrade.
[default: delete]
--no-vacuum With --upgrade, skips the normal post-upgrade vacuum
operation.
";
/// Commandline arguments corresponding to `USAGE`; automatically filled by the `docopt` crate.
@ -111,9 +122,18 @@ struct Args {
flag_sample_file_dir: String,
flag_http_addr: String,
flag_read_only: bool,
flag_upgrade: bool,
flag_no_vacuum: bool,
flag_preset_journal: String,
}
fn main() {
// Parse commandline arguments.
let version = "Moonfire NVR 0.1.0".to_owned();
let args: Args = docopt::Docopt::new(USAGE)
.and_then(|d| d.version(Some(version)).decode())
.unwrap_or_else(|e| e.exit());
// Watch for termination signals.
// This must be started before any threads are spawned (such as the async logger thread) so
// that signals will be blocked in all threads.
@ -124,12 +144,6 @@ fn main() {
let drain = slog_envlogger::new(drain);
slog_stdlog::set_logger(slog::Logger::root(drain.ignore_err(), None)).unwrap();
// Parse commandline arguments.
let version = "Moonfire NVR 0.1.0".to_owned();
let args: Args = docopt::Docopt::new(USAGE)
.and_then(|d| d.version(Some(version)).decode())
.unwrap_or_else(|e| e.exit());
// Open the database and populate cached state.
let db_dir = dir::Fd::open(&args.flag_db_dir).unwrap();
db_dir.lock(if args.flag_read_only { libc::LOCK_SH } else { libc::LOCK_EX } | libc::LOCK_NB)
@ -144,6 +158,15 @@ fn main() {
// rusqlite::Connection is not Sync, so there's no reason to tell SQLite3 to use the
// serialized threading mode.
rusqlite::SQLITE_OPEN_NO_MUTEX).unwrap();
if args.flag_upgrade {
upgrade::run(conn, &args.flag_preset_journal, args.flag_no_vacuum).unwrap();
} else {
run(args, conn, &signal);
}
}
fn run(args: Args, conn: rusqlite::Connection, signal: &chan::Receiver<chan_signal::Signal>) {
let db = Arc::new(db::Database::new(conn).unwrap());
let dir = dir::SampleFileDir::new(&args.flag_sample_file_dir, db.clone()).unwrap();
info!("Database is loaded.");
@ -158,7 +181,7 @@ fn main() {
let env = streamer::Environment{
db: &db,
dir: &dir,
clock: &clock::REAL,
clocks: &clock::REAL,
opener: &*stream::FFMPEG,
shutdown: &shutdown,
};

View File

@ -543,11 +543,16 @@ impl Mp4FileBuilder {
self.segments.reserve(additional);
}
pub fn len(&self) -> usize { self.segments.len() }
/// Appends a segment for (a subset of) the given recording.
pub fn append(&mut self, db: &MutexGuard<db::LockedDatabase>, row: db::ListCameraRecordingsRow,
pub fn append(&mut self, db: &MutexGuard<db::LockedDatabase>, row: db::ListRecordingsRow,
rel_range_90k: Range<i32>) -> Result<()> {
if let Some(prev) = self.segments.last() {
if prev.s.have_trailing_zero {
return Err(Error::new(format!(
"unable to append recording {}/{} after recording {}/{} with trailing zero",
row.camera_id, row.id, prev.s.camera_id, prev.s.recording_id)));
}
}
self.segments.push(Mp4Segment{
s: recording::Segment::new(db, &row, rel_range_90k)?,
index: RefCell::new(None),
@ -591,7 +596,8 @@ impl Mp4FileBuilder {
// Update the etag to reflect this segment.
let mut data = [0_u8; 24];
let mut cursor = io::Cursor::new(&mut data[..]);
cursor.write_i64::<BigEndian>(s.s.id)?;
cursor.write_i32::<BigEndian>(s.s.camera_id)?;
cursor.write_i32::<BigEndian>(s.s.recording_id)?;
cursor.write_i64::<BigEndian>(s.s.start.0)?;
cursor.write_i32::<BigEndian>(d.start)?;
cursor.write_i32::<BigEndian>(d.end)?;
@ -1129,7 +1135,8 @@ impl Mp4File {
fn write_video_sample_data(&self, i: usize, r: Range<u64>, out: &mut io::Write) -> Result<()> {
let s = &self.segments[i];
let f = self.dir.open_sample_file(self.db.lock().get_recording(s.s.id)?.sample_file_uuid)?;
let rec = self.db.lock().get_recording_playback(s.s.camera_id, s.s.recording_id)?;
let f = self.dir.open_sample_file(rec.sample_file_uuid)?;
mmapfile::MmapFileSlice::new(f, s.s.sample_file_range()).write_to(r, out)
}
@ -1180,8 +1187,8 @@ mod tests {
use byteorder::{BigEndian, ByteOrder};
use db;
use dir;
use ffmpeg;
use error::Error;
use ffmpeg;
#[cfg(nightly)] use hyper;
use hyper::header;
use openssl::crypto::hash;
@ -1217,10 +1224,10 @@ mod tests {
fn flush(&mut self) -> io::Result<()> { Ok(()) }
}
/// Returns the SHA-1 digest of the given `Resource`.
fn digest(r: &http_entity::Entity<Error>) -> Vec<u8> {
/// Returns the SHA-1 digest of the given `Entity`.
fn digest(e: &http_entity::Entity<Error>) -> Vec<u8> {
let mut sha1 = Sha1::new();
r.write_to(0 .. r.len(), &mut sha1).unwrap();
e.write_to(0 .. e.len(), &mut sha1).unwrap();
sha1.finish()
}
@ -1401,7 +1408,7 @@ mod tests {
let extra_data = input.get_extra_data().unwrap();
let video_sample_entry_id = db.db.lock().insert_video_sample_entry(
extra_data.width, extra_data.height, &extra_data.sample_entry).unwrap();
let mut output = db.dir.create_writer(&db.syncer_channel, START_TIME, START_TIME,
let mut output = db.dir.create_writer(&db.syncer_channel, None,
TEST_CAMERA_ID, video_sample_entry_id).unwrap();
// end_pts is the pts of the end of the most recent frame (start + duration).
@ -1410,15 +1417,20 @@ mod tests {
// To write the final packet of this sample .mp4 with a full duration, we need to fake a
// next packet's pts from the ffmpeg-supplied duration.
let mut end_pts = None;
let mut frame_time = START_TIME;
loop {
let pkt = match input.get_next() {
Ok(p) => p,
Err(ffmpeg::Error::Eof) => { break; },
Err(e) => { panic!("unexpected input error: {}", e); },
};
output.write(pkt.data().expect("packet without data"), pkt.pts().unwrap(),
let pts = pkt.pts().unwrap();
frame_time += recording::Duration(pkt.duration());
output.write(pkt.data().expect("packet without data"), frame_time, pts,
pkt.is_key()).unwrap();
end_pts = Some(pkt.pts().unwrap() + pkt.duration());
end_pts = Some(pts + pkt.duration());
}
output.close(end_pts).unwrap();
db.syncer_channel.flush();
@ -1435,6 +1447,7 @@ mod tests {
let mut recording = db::RecordingToInsert{
camera_id: TEST_CAMERA_ID,
sample_file_bytes: 30104460,
flags: 0,
time: START_TIME .. (START_TIME + DURATION),
local_time: START_TIME,
video_samples: 1800,
@ -1443,6 +1456,7 @@ mod tests {
sample_file_uuid: Uuid::nil(),
video_index: data,
sample_file_sha1: [0; 20],
run_index: 0,
};
let mut tx = db.tx().unwrap();
tx.bypass_reservation_for_testing = true;
@ -1451,6 +1465,7 @@ mod tests {
recording.time.start += DURATION;
recording.local_time += DURATION;
recording.time.end += DURATION;
recording.run_index += 1;
}
tx.commit().unwrap();
}
@ -1462,7 +1477,7 @@ mod tests {
let all_time = recording::Time(i64::min_value()) .. recording::Time(i64::max_value());
{
let db = db.lock();
db.list_recordings(TEST_CAMERA_ID, &all_time, |r| {
db.list_recordings_by_time(TEST_CAMERA_ID, all_time, |r| {
let d = r.duration_90k;
assert!(skip_90k + shorten_90k < d);
builder.append(&db, r, skip_90k .. d - shorten_90k).unwrap();
@ -1658,7 +1673,7 @@ mod tests {
// combine ranges from the new format with ranges from the old format.
let sha1 = digest(&mp4);
assert_eq!("1e5331e8371bd97ac3158b3a86494abc87cdc70e", strutil::hex(&sha1[..]));
const EXPECTED_ETAG: &'static str = "3c48af4dbce2024db07f27a00789b6af774a8c89";
const EXPECTED_ETAG: &'static str = "908ae8ac303f66f2f4a1f8f52dba8f6ea9fdb442";
assert_eq!(Some(&header::EntityTag::strong(EXPECTED_ETAG.to_owned())), mp4.etag());
drop(db.syncer_channel);
db.syncer_join.join().unwrap();
@ -1678,7 +1693,7 @@ mod tests {
// combine ranges from the new format with ranges from the old format.
let sha1 = digest(&mp4);
assert_eq!("de382684a471f178e4e3a163762711b0653bfd83", strutil::hex(&sha1[..]));
const EXPECTED_ETAG: &'static str = "c24d7af372e5d8f66f4feb6e3a5cd43828392371";
const EXPECTED_ETAG: &'static str = "e21c6a6dfede1081db3701cc595ec267c43c2bff";
assert_eq!(Some(&header::EntityTag::strong(EXPECTED_ETAG.to_owned())), mp4.etag());
drop(db.syncer_channel);
db.syncer_join.join().unwrap();
@ -1698,7 +1713,7 @@ mod tests {
// combine ranges from the new format with ranges from the old format.
let sha1 = digest(&mp4);
assert_eq!("685e026af44204bc9cc52115c5e17058e9fb7c70", strutil::hex(&sha1[..]));
const EXPECTED_ETAG: &'static str = "870e2b3cfef4a988951344b32e53af0d4496894d";
const EXPECTED_ETAG: &'static str = "1d5c5980f6ba08a4dd52dfd785667d42cdb16992";
assert_eq!(Some(&header::EntityTag::strong(EXPECTED_ETAG.to_owned())), mp4.etag());
drop(db.syncer_channel);
db.syncer_join.join().unwrap();
@ -1718,7 +1733,7 @@ mod tests {
// combine ranges from the new format with ranges from the old format.
let sha1 = digest(&mp4);
assert_eq!("e0d28ddf08e24575a82657b1ce0b2da73f32fd88", strutil::hex(&sha1[..]));
const EXPECTED_ETAG: &'static str = "71c329188a2cd175c8d61492a9789e242af06c05";
const EXPECTED_ETAG: &'static str = "555de64b39615e1a1cbe5bdd565ff197f5f126c5";
assert_eq!(Some(&header::EntityTag::strong(EXPECTED_ETAG.to_owned())), mp4.etag());
drop(db.syncer_channel);
db.syncer_join.join().unwrap();

View File

@ -335,12 +335,15 @@ impl SampleIndexEncoder {
append_varint32((zigzag32(duration_delta) << 1) | (is_key as u32), &mut self.video_index);
append_varint32(zigzag32(bytes_delta), &mut self.video_index);
}
pub fn has_trailing_zero(&self) -> bool { self.prev_duration_90k == 0 }
}
/// A segment represents a view of some or all of a single recording, starting from a key frame.
/// Used by the `Mp4FileBuilder` class to splice together recordings into a single virtual .mp4.
pub struct Segment {
pub id: i64,
pub camera_id: i32,
pub recording_id: i32,
pub start: Time,
begin: SampleIndexIterator,
pub file_end: i32,
@ -349,6 +352,7 @@ pub struct Segment {
pub frames: i32,
pub key_frames: i32,
pub video_sample_entry_id: i32,
pub have_trailing_zero: bool,
}
impl Segment {
@ -360,10 +364,11 @@ impl Segment {
/// undesired portion.) It will end at the first frame after the desired range (unless the
/// desired range extends beyond the recording).
pub fn new(db: &MutexGuard<db::LockedDatabase>,
recording: &db::ListCameraRecordingsRow,
recording: &db::ListRecordingsRow,
desired_range_90k: Range<i32>) -> Result<Segment, Error> {
let mut self_ = Segment{
id: recording.id,
camera_id: recording.camera_id,
recording_id: recording.id,
start: recording.start,
begin: SampleIndexIterator::new(),
file_end: recording.sample_file_bytes,
@ -372,6 +377,7 @@ impl Segment {
frames: recording.video_samples,
key_frames: recording.video_sync_samples,
video_sample_entry_id: recording.video_sample_entry.id,
have_trailing_zero: (recording.flags & db::RecordingFlags::TrailingZero as i32) != 0,
};
if self_.desired_range_90k.start > self_.desired_range_90k.end ||
@ -388,8 +394,8 @@ impl Segment {
}
// Slow path. Need to iterate through the index.
let extra = db.get_recording(self_.id)?;
let data = &(&extra).video_index;
let playback = db.get_recording_playback(self_.camera_id, self_.recording_id)?;
let data = &(&playback).video_index;
let mut it = SampleIndexIterator::new();
if !it.next(data)? {
return Err(Error{description: String::from("no index"),
@ -429,6 +435,7 @@ impl Segment {
}
self_.file_end = it.pos;
self_.actual_end_90k = it.start_90k;
self_.have_trailing_zero = it.duration_90k == 0;
Ok(self_)
}
@ -443,38 +450,44 @@ impl Segment {
pub fn foreach<F>(&self, db: &db::Database, mut f: F) -> Result<(), Error>
where F: FnMut(&SampleIndexIterator) -> Result<(), Error>
{
let extra = db.lock().get_recording(self.id)?;
let data = &(&extra).video_index;
trace!("foreach on recording {}/{}: {} frames, actual_time_90k: {:?}",
self.camera_id, self.recording_id, self.frames, self.actual_time_90k());
let playback = db.lock().get_recording_playback(self.camera_id, self.recording_id)?;
let data = &(&playback).video_index;
let mut it = self.begin;
if it.i == 0 {
if !it.next(data)? {
return Err(Error::new(format!("recording {}: no frames", self.id)));
return Err(Error::new(format!("recording {}/{}: no frames",
self.camera_id, self.recording_id)));
}
if !it.is_key {
return Err(Error::new(format!("recording {}: doesn't start with key frame",
self.id)));
return Err(Error::new(format!("recording {}/{}: doesn't start with key frame",
self.camera_id, self.recording_id)));
}
}
let mut have_frame = true;
let mut key_frame = 0;
for i in 0 .. self.frames {
if !have_frame {
return Err(Error::new(format!("recording {}: expected {} frames, found only {}",
self.id, self.frames, i+1)));
return Err(Error::new(format!("recording {}/{}: expected {} frames, found only {}",
self.camera_id, self.recording_id, self.frames,
i+1)));
}
if it.is_key {
key_frame += 1;
if key_frame > self.key_frames {
return Err(Error::new(format!("recording {}: more than expected {} key frames",
self.id, self.key_frames)));
return Err(Error::new(format!(
"recording {}/{}: more than expected {} key frames",
self.camera_id, self.recording_id, self.key_frames)));
}
}
f(&it)?;
have_frame = it.next(data)?;
}
if key_frame < self.key_frames {
return Err(Error::new(format!("recording {}: expected {} key frames, found only {}",
self.id, self.key_frames, key_frame)));
return Err(Error::new(format!("recording {}/{}: expected {} key frames, found only {}",
self.camera_id, self.recording_id, self.key_frames,
key_frame)));
}
Ok(())
}

View File

@ -31,8 +31,6 @@
-- schema.sql: SQLite3 database schema for Moonfire NVR.
-- See also design/schema.md.
--pragma journal_mode = wal;
-- This table tracks the schema version.
-- There is one row for the initial database creation (inserted below, after the
-- create statements) and one for each upgrade procedure (if any).
@ -49,10 +47,10 @@ create table version (
create table camera (
id integer primary key,
uuid blob unique,-- not null check (length(uuid) = 16),
uuid blob unique not null check (length(uuid) = 16),
-- A short name of the camera, used in log messages.
short_name text,-- not null,
short_name text not null,
-- A short description of the camera.
description text,
@ -77,14 +75,42 @@ create table camera (
-- The number of bytes of video to retain, excluding the currently-recording
-- file. Older files will be deleted as necessary to stay within this limit.
retain_bytes integer not null check (retain_bytes >= 0)
retain_bytes integer not null check (retain_bytes >= 0),
-- The low 32 bits of the next recording id to assign for this camera.
-- Typically this is the maximum current recording + 1, but it does
-- not decrease if that recording is deleted.
next_recording_id integer not null check (next_recording_id >= 0)
);
-- Each row represents a single completed recorded segment of video.
-- Recordings are typically ~60 seconds; never more than 5 minutes.
create table recording (
id integer primary key,
camera_id integer references camera (id) not null,
-- The high 32 bits of composite_id are taken from the camera's id, which
-- improves locality. The low 32 bits are taken from the camera's
-- next_recording_id (which should be post-incremented in the same
-- transaction). It'd be simpler to use a "without rowid" table and separate
-- fields to make up the primary key, but
-- <https://www.sqlite.org/withoutrowid.html> points out that "without rowid"
-- is not appropriate when the average row size is in excess of 50 bytes.
-- These rows are typically 1--5 KiB.
composite_id integer primary key,
-- This field is redundant with id above, but used to enforce the reference
-- constraint and to structure the recording_start_time index.
camera_id integer not null references camera (id),
-- The offset of this recording within a run. 0 means this was the first
-- recording made from a RTSP session. The start of the run has id
-- (id-run_offset).
run_offset integer not null,
-- flags is a bitmask:
--
-- * 1, or "trailing zero", indicates that this recording is the last in a
-- stream. As the duration of a sample is not known until the next sample
-- is received, the final sample in this recording will have duration 0.
flags integer not null,
sample_file_bytes integer not null check (sample_file_bytes > 0),
@ -100,18 +126,16 @@ create table recording (
-- The number of 90 kHz units the local system time is ahead of the
-- recording; negative numbers indicate the local system time is behind
-- the recording. Large values would indicate that the local time has jumped
-- during recording or that the local time and camera time frequencies do
-- not match.
-- the recording. Large absolute values would indicate that the local time
-- has jumped during recording or that the local time and camera time
-- frequencies do not match.
local_time_delta_90k integer not null,
video_samples integer not null check (video_samples > 0),
video_sync_samples integer not null check (video_samples > 0),
video_sample_entry_id integer references video_sample_entry (id),
sample_file_uuid blob not null check (length(sample_file_uuid) = 16),
sample_file_sha1 blob not null check (length(sample_file_sha1) = 20),
video_index blob not null check (length(video_index) > 0)
check (composite_id >> 32 = camera_id)
);
create index recording_cover on recording (
@ -124,11 +148,17 @@ create index recording_cover on recording (
-- to consult the underlying row.
duration_90k,
video_samples,
video_sync_samples,
video_sample_entry_id,
sample_file_bytes
);
create table recording_playback (
composite_id integer primary key references recording (composite_id),
sample_file_uuid blob not null check (length(sample_file_uuid) = 16),
sample_file_sha1 blob not null check (length(sample_file_sha1) = 20),
video_index blob not null check (length(video_index) > 0)
);
-- Files in the sample file directory which may be present but should simply be
-- discarded on startup. (Recordings which were never completed or have been
-- marked for completion.)
@ -156,4 +186,4 @@ create table video_sample_entry (
);
insert into version (id, unix_time, notes)
values (0, cast(strftime('%s', 'now') as int), 'db creation');
values (1, cast(strftime('%s', 'now') as int), 'db creation');

View File

@ -28,7 +28,7 @@
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
use clock::Clock;
use clock::Clocks;
use db::{Camera, Database};
use dir;
use error::Error;
@ -43,15 +43,15 @@ use time;
pub static ROTATE_INTERVAL_SEC: i64 = 60;
/// Common state that can be used by multiple `Streamer` instances.
pub struct Environment<'a, 'b, C, S> where C: 'a + Clock, S: 'a + stream::Stream {
pub clock: &'a C,
pub struct Environment<'a, 'b, C, S> where C: 'a + Clocks, S: 'a + stream::Stream {
pub clocks: &'a C,
pub opener: &'a stream::Opener<S>,
pub db: &'b Arc<Database>,
pub dir: &'b Arc<dir::SampleFileDir>,
pub shutdown: &'b Arc<AtomicBool>,
}
pub struct Streamer<'a, C, S> where C: 'a + Clock, S: 'a + stream::Stream {
pub struct Streamer<'a, C, S> where C: 'a + Clocks, S: 'a + stream::Stream {
shutdown: Arc<AtomicBool>,
// State below is only used by the thread in Run.
@ -60,7 +60,7 @@ pub struct Streamer<'a, C, S> where C: 'a + Clock, S: 'a + stream::Stream {
db: Arc<Database>,
dir: Arc<dir::SampleFileDir>,
syncer_channel: dir::SyncerChannel,
clock: &'a C,
clocks: &'a C,
opener: &'a stream::Opener<S>,
camera_id: i32,
short_name: String,
@ -68,7 +68,14 @@ pub struct Streamer<'a, C, S> where C: 'a + Clock, S: 'a + stream::Stream {
redacted_url: String,
}
impl<'a, C, S> Streamer<'a, C, S> where C: 'a + Clock, S: 'a + stream::Stream {
struct WriterState<'a> {
writer: dir::Writer<'a>,
/// Seconds since epoch at which to next rotate.
rotate: i64,
}
impl<'a, C, S> Streamer<'a, C, S> where C: 'a + Clocks, S: 'a + stream::Stream {
pub fn new<'b>(env: &Environment<'a, 'b, C, S>, syncer_channel: dir::SyncerChannel,
camera_id: i32, c: &Camera, rotate_offset_sec: i64,
rotate_interval_sec: i64) -> Self {
@ -79,7 +86,7 @@ impl<'a, C, S> Streamer<'a, C, S> where C: 'a + Clock, S: 'a + stream::Stream {
db: env.db.clone(),
dir: env.dir.clone(),
syncer_channel: syncer_channel,
clock: env.clock,
clocks: env.clocks,
opener: env.opener,
camera_id: camera_id,
short_name: c.short_name.to_owned(),
@ -95,7 +102,7 @@ impl<'a, C, S> Streamer<'a, C, S> where C: 'a + Clock, S: 'a + stream::Stream {
if let Err(e) = self.run_once() {
let sleep_time = time::Duration::seconds(1);
warn!("{}: sleeping for {:?} after error: {}", self.short_name, sleep_time, e);
self.clock.sleep(sleep_time);
self.clocks.sleep(sleep_time);
}
}
info!("{}: shutting down", self.short_name);
@ -105,6 +112,7 @@ impl<'a, C, S> Streamer<'a, C, S> where C: 'a + Clock, S: 'a + stream::Stream {
info!("{}: Opening input: {}", self.short_name, self.redacted_url);
let mut stream = self.opener.open(stream::Source::Rtsp(&self.url))?;
let realtime_offset = self.clocks.realtime() - self.clocks.monotonic();
// TODO: verify time base.
// TODO: verify width/height.
let extra_data = stream.get_extra_data()?;
@ -113,10 +121,9 @@ impl<'a, C, S> Streamer<'a, C, S> where C: 'a + Clock, S: 'a + stream::Stream {
&extra_data.sample_entry)?;
debug!("{}: video_sample_entry_id={}", self.short_name, video_sample_entry_id);
let mut seen_key_frame = false;
let mut rotate = None;
let mut writer: Option<dir::Writer> = None;
let mut state: Option<WriterState> = None;
let mut transformed = Vec::new();
let mut next_start = None;
let mut prev = None;
while !self.shutdown.load(Ordering::SeqCst) {
let pkt = stream.get_next()?;
let pts = pkt.pts().ok_or_else(|| Error::new("packet with no pts".to_owned()))?;
@ -126,27 +133,36 @@ impl<'a, C, S> Streamer<'a, C, S> where C: 'a + Clock, S: 'a + stream::Stream {
debug!("{}: have first key frame", self.short_name);
seen_key_frame = true;
}
let frame_realtime = self.clock.get_time();
if let Some(r) = rotate {
if frame_realtime.sec > r && pkt.is_key() {
let w = writer.take().expect("rotate set implies writer is set");
let frame_realtime = self.clocks.monotonic() + realtime_offset;
let local_time = recording::Time::new(frame_realtime);
state = if let Some(s) = state {
if frame_realtime.sec > s.rotate && pkt.is_key() {
trace!("{}: write on normal rotation", self.short_name);
next_start = Some(w.close(Some(pts))?);
prev = Some(s.writer.close(Some(pts))?);
None
} else {
Some(s)
}
};
let mut w = match writer {
Some(w) => w,
} else { None };
let mut s = match state {
Some(s) => s,
None => {
let r = frame_realtime.sec -
(frame_realtime.sec % self.rotate_interval_sec) +
self.rotate_offset_sec;
rotate = Some(
if r <= frame_realtime.sec { r + self.rotate_interval_sec } else { r });
let local_realtime = recording::Time::new(frame_realtime);
let sec = frame_realtime.sec;
let r = sec - (sec % self.rotate_interval_sec) + self.rotate_offset_sec;
let r = r + if r <= sec { self.rotate_interval_sec } else { 0 };
self.dir.create_writer(&self.syncer_channel,
next_start.unwrap_or(local_realtime), local_realtime,
self.camera_id, video_sample_entry_id)?
// On the first recording, set rotate time to not the next rotate offset, but
// the one after, so that it's longer than usual rather than shorter than
// usual. This ensures there's plenty of frame times to use when calculating
// the start time.
let r = r + if prev.is_none() { self.rotate_interval_sec } else { 0 };
let w = self.dir.create_writer(&self.syncer_channel, prev, self.camera_id,
video_sample_entry_id)?;
WriterState{
writer: w,
rotate: r,
}
},
};
let orig_data = match pkt.data() {
@ -159,11 +175,11 @@ impl<'a, C, S> Streamer<'a, C, S> where C: 'a + Clock, S: 'a + stream::Stream {
} else {
orig_data
};
w.write(transformed_data, pts, pkt.is_key())?;
writer = Some(w);
s.writer.write(transformed_data, local_time, pts, pkt.is_key())?;
state = Some(s);
}
if let Some(w) = writer {
w.close(None)?;
if let Some(s) = state {
s.writer.close(None)?;
}
Ok(())
}
@ -171,13 +187,14 @@ impl<'a, C, S> Streamer<'a, C, S> where C: 'a + Clock, S: 'a + stream::Stream {
#[cfg(test)]
mod tests {
use clock::{self, Clock};
use clock::{self, Clocks};
use db;
use error::Error;
use ffmpeg;
use ffmpeg::packet::Mut;
use h264;
use recording;
use std::cmp;
use std::sync::{Arc, Mutex, MutexGuard};
use std::sync::atomic::{AtomicBool, Ordering};
use stream::{self, Opener, Stream};
@ -185,20 +202,24 @@ mod tests {
use time;
struct ProxyingStream<'a> {
clock: &'a clock::SimulatedClock,
clocks: &'a clock::SimulatedClocks,
inner: stream::FfmpegStream,
last_duration: time::Duration,
buffered: time::Duration,
slept: time::Duration,
ts_offset: i64,
ts_offset_pkts_left: u32,
pkts_left: u32,
}
impl<'a> ProxyingStream<'a> {
fn new(clock: &'a clock::SimulatedClock, inner: stream::FfmpegStream) -> ProxyingStream {
fn new(clocks: &'a clock::SimulatedClocks, buffered: time::Duration,
inner: stream::FfmpegStream) -> ProxyingStream {
clocks.sleep(buffered);
ProxyingStream {
clock: clock,
clocks: clocks,
inner: inner,
last_duration: time::Duration::seconds(0),
buffered: buffered,
slept: time::Duration::seconds(0),
ts_offset: 0,
ts_offset_pkts_left: 0,
pkts_left: 0,
@ -213,13 +234,21 @@ mod tests {
}
self.pkts_left -= 1;
// Advance clock to when this packet starts.
self.clock.sleep(self.last_duration);
let mut pkt = self.inner.get_next()?;
self.last_duration = time::Duration::nanoseconds(
pkt.duration() * 1_000_000_000 / recording::TIME_UNITS_PER_SEC);
// Advance clock to the end of this frame.
// Avoid accumulating conversion error by tracking the total amount to sleep and how
// much we've already slept, rather than considering each frame in isolation.
{
let goal = pkt.pts().unwrap() + pkt.duration();
let goal = time::Duration::nanoseconds(
goal * 1_000_000_000 / recording::TIME_UNITS_PER_SEC);
let duration = goal - self.slept;
let buf_part = cmp::min(self.buffered, duration);
self.buffered = self.buffered - buf_part;
self.clocks.sleep(duration - buf_part);
self.slept = goal;
}
if self.ts_offset_pkts_left > 0 {
self.ts_offset_pkts_left -= 1;
@ -276,8 +305,9 @@ mod tests {
is_key: bool,
}
fn get_frames(db: &MutexGuard<db::LockedDatabase>, recording_id: i64) -> Vec<Frame> {
let rec = db.get_recording(recording_id).unwrap();
fn get_frames(db: &MutexGuard<db::LockedDatabase>, camera_id: i32, recording_id: i32)
-> Vec<Frame> {
let rec = db.get_recording_playback(camera_id, recording_id).unwrap();
let mut it = recording::SampleIndexIterator::new();
let mut frames = Vec::new();
while it.next(&rec.video_index).unwrap() {
@ -293,10 +323,12 @@ mod tests {
#[test]
fn basic() {
testutil::init();
let clock = clock::SimulatedClock::new();
clock.sleep(time::Duration::seconds(1430006400)); // 2015-04-26 00:00:00 UTC
// 2015-04-25 00:00:00 UTC
let clocks = clock::SimulatedClocks::new(time::Timespec::new(1429920000, 0));
clocks.sleep(time::Duration::seconds(86400)); // to 2015-04-26 00:00:00 UTC
let stream = stream::FFMPEG.open(stream::Source::File("src/testdata/clip.mp4")).unwrap();
let mut stream = ProxyingStream::new(&clock, stream);
let mut stream = ProxyingStream::new(&clocks, time::Duration::seconds(2), stream);
stream.ts_offset = 180000; // starting pts of the input should be irrelevant
stream.ts_offset_pkts_left = u32::max_value();
stream.pkts_left = u32::max_value();
@ -307,7 +339,7 @@ mod tests {
};
let db = testutil::TestDb::new();
let env = super::Environment{
clock: &clock,
clocks: &clocks,
opener: &opener,
db: &db.db,
dir: &db.dir,
@ -318,7 +350,7 @@ mod tests {
let l = db.db.lock();
let camera = l.cameras_by_id().get(&testutil::TEST_CAMERA_ID).unwrap();
stream = super::Streamer::new(&env, db.syncer_channel.clone(), testutil::TEST_CAMERA_ID,
camera, 0, 5);
camera, 0, 3);
}
stream.run();
assert!(opener.streams.lock().unwrap().is_empty());
@ -326,21 +358,34 @@ mod tests {
let db = db.db.lock();
// Compare frame-by-frame. Note below that while the rotation is scheduled to happen near
// 5-second boundaries (such as 2016-04-26 00:00:05), it gets deferred until the next key
// frame, which in this case is 00:00:07.
assert_eq!(get_frames(&db, 1), &[
// 3-second boundaries (such as 2016-04-26 00:00:03), rotation happens somewhat later:
// * the first rotation is always skipped
// * the second rotation is deferred until a key frame.
assert_eq!(get_frames(&db, testutil::TEST_CAMERA_ID, 1), &[
Frame{start_90k: 0, duration_90k: 90379, is_key: true},
Frame{start_90k: 90379, duration_90k: 89884, is_key: false},
Frame{start_90k: 180263, duration_90k: 89749, is_key: false},
Frame{start_90k: 270012, duration_90k: 89981, is_key: false},
Frame{start_90k: 270012, duration_90k: 89981, is_key: false}, // pts_time 3.0001...
Frame{start_90k: 359993, duration_90k: 90055, is_key: true},
Frame{start_90k: 450048, duration_90k: 89967, is_key: false}, // pts_time 5.0005333...
Frame{start_90k: 540015, duration_90k: 90021, is_key: false},
Frame{start_90k: 450048, duration_90k: 89967, is_key: false},
Frame{start_90k: 540015, duration_90k: 90021, is_key: false}, // pts_time 6.0001...
Frame{start_90k: 630036, duration_90k: 89958, is_key: false},
]);
assert_eq!(get_frames(&db, 2), &[
assert_eq!(get_frames(&db, testutil::TEST_CAMERA_ID, 2), &[
Frame{start_90k: 0, duration_90k: 90011, is_key: true},
Frame{start_90k: 90011, duration_90k: 0, is_key: false},
]);
let mut recordings = Vec::new();
db.list_recordings_by_id(testutil::TEST_CAMERA_ID, 1..3, |r| {
recordings.push(r);
Ok(())
}).unwrap();
assert_eq!(2, recordings.len());
assert_eq!(1, recordings[0].id);
assert_eq!(recording::Time(128700575999999), recordings[0].start);
assert_eq!(0, recordings[0].flags);
assert_eq!(2, recordings[1].id);
assert_eq!(recording::Time(128700576719993), recordings[1].start);
assert_eq!(db::RecordingFlags::TrailingZero as i32, recordings[1].flags);
}
}

View File

@ -88,9 +88,9 @@ impl TestDb {
let uuid_bytes = &TEST_CAMERA_UUID.as_bytes()[..];
conn.execute_named(r#"
insert into camera (uuid, short_name, description, host, username, password,
main_rtsp_path, sub_rtsp_path, retain_bytes)
main_rtsp_path, sub_rtsp_path, retain_bytes, next_recording_id)
values (:uuid, :short_name, :description, :host, :username, :password,
:main_rtsp_path, :sub_rtsp_path, :retain_bytes)
:main_rtsp_path, :sub_rtsp_path, :retain_bytes, :next_recording_id)
"#, &[
(":uuid", &uuid_bytes),
(":short_name", &"test camera"),
@ -101,6 +101,7 @@ impl TestDb {
(":main_rtsp_path", &"/main"),
(":sub_rtsp_path", &"/sub"),
(":retain_bytes", &1048576i64),
(":next_recording_id", &1i64),
]).unwrap();
assert_eq!(TEST_CAMERA_ID as i64, conn.last_insert_rowid());
let db = sync::Arc::new(db::Database::new(conn).unwrap());
@ -117,7 +118,7 @@ impl TestDb {
}
pub fn create_recording_from_encoder(&self, encoder: recording::SampleIndexEncoder)
-> db::ListCameraRecordingsRow {
-> db::ListRecordingsRow {
let mut db = self.db.lock();
let video_sample_entry_id =
db.insert_video_sample_entry(1920, 1080, &[0u8; 100]).unwrap();
@ -130,19 +131,22 @@ impl TestDb {
sample_file_bytes: encoder.sample_file_bytes,
time: START_TIME ..
START_TIME + recording::Duration(encoder.total_duration_90k as i64),
local_time: START_TIME,
local_time_delta: recording::Duration(0),
video_samples: encoder.video_samples,
video_sync_samples: encoder.video_sync_samples,
video_sample_entry_id: video_sample_entry_id,
sample_file_uuid: Uuid::nil(),
video_index: encoder.video_index,
sample_file_sha1: [0u8; 20],
run_offset: 0, // TODO
flags: 0, // TODO
}).unwrap();
tx.commit().unwrap();
}
let mut row = None;
let all_time = recording::Time(i64::min_value()) .. recording::Time(i64::max_value());
db.list_recordings(TEST_CAMERA_ID, &all_time, |r| { row = Some(r); Ok(()) }).unwrap();
db.list_recordings_by_time(TEST_CAMERA_ID, all_time,
|r| { row = Some(r); Ok(()) }).unwrap();
row.unwrap()
}
}

94
src/upgrade/mod.rs Normal file
View File

@ -0,0 +1,94 @@
// This file is part of Moonfire NVR, a security camera digital video recorder.
// Copyright (C) 2016 Scott Lamb <slamb@slamb.org>
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// In addition, as a special exception, the copyright holders give
// permission to link the code of portions of this program with the
// OpenSSL library under certain conditions as described in each
// individual source file, and distribute linked combinations including
// the two.
//
// You must obey the GNU General Public License in all respects for all
// of the code used other than OpenSSL. If you modify file(s) with this
// exception, you may extend this exception to your version of the
// file(s), but you are not obligated to do so. If you do not wish to do
// so, delete this exception statement from your version. If you delete
// this exception statement from all source files in the program, then
// also delete it here.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
/// Upgrades the database schema.
///
/// See `guide/schema.md` for more information.
use db;
use error::Error;
use rusqlite;
mod v0_to_v1;
const UPGRADE_NOTES: &'static str =
concat!("upgraded using moonfire-nvr ", env!("CARGO_PKG_VERSION"));
const UPGRADERS: [fn(&rusqlite::Transaction) -> Result<(), Error>; 1] = [
v0_to_v1::run,
];
fn set_journal_mode(conn: &rusqlite::Connection, requested: &str) -> Result<(), Error> {
assert!(!requested.contains(';')); // quick check for accidental sql injection.
let actual = conn.query_row(&format!("pragma journal_mode = {}", requested), &[],
|row| row.get_checked::<_, String>(0))??;
info!("...database now in journal_mode {} (requested {}).", actual, requested);
Ok(())
}
pub fn run(mut conn: rusqlite::Connection, preset_journal: &str,
no_vacuum: bool) -> Result<(), Error> {
{
assert_eq!(UPGRADERS.len(), db::EXPECTED_VERSION as usize);
let old_ver =
conn.query_row("select max(id) from version", &[], |row| row.get_checked(0))??;
if old_ver > db::EXPECTED_VERSION {
return Err(Error::new(format!("Database is at version {}, later than expected {}",
old_ver, db::EXPECTED_VERSION)))?;
} else if old_ver < 0 {
return Err(Error::new(format!("Database is at negative version {}!", old_ver)));
}
info!("Upgrading database from version {} to version {}...", old_ver, db::EXPECTED_VERSION);
set_journal_mode(&conn, preset_journal).unwrap();
for ver in old_ver .. db::EXPECTED_VERSION {
info!("...from version {} to version {}", ver, ver + 1);
let tx = conn.transaction()?;
UPGRADERS[ver as usize](&tx)?;
tx.execute(r#"
insert into version (id, unix_time, notes)
values (?, cast(strftime('%s', 'now') as int32), ?)
"#, &[&(ver + 1), &UPGRADE_NOTES])?;
tx.commit()?;
}
}
// WAL is the preferred journal mode for normal operation; it reduces the number of syncs
// without compromising safety.
set_journal_mode(&conn, "wal").unwrap();
if !no_vacuum {
info!("...vacuuming database after upgrade.");
conn.execute_batch(r#"
pragma page_size = 16384;
vacuum;
"#).unwrap();
}
info!("...done.");
Ok(())
}

235
src/upgrade/v0_to_v1.rs Normal file
View File

@ -0,0 +1,235 @@
// This file is part of Moonfire NVR, a security camera digital video recorder.
// Copyright (C) 2016 Scott Lamb <slamb@slamb.org>
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// In addition, as a special exception, the copyright holders give
// permission to link the code of portions of this program with the
// OpenSSL library under certain conditions as described in each
// individual source file, and distribute linked combinations including
// the two.
//
// You must obey the GNU General Public License in all respects for all
// of the code used other than OpenSSL. If you modify file(s) with this
// exception, you may extend this exception to your version of the
// file(s), but you are not obligated to do so. If you do not wish to do
// so, delete this exception statement from your version. If you delete
// this exception statement from all source files in the program, then
// also delete it here.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
/// Upgrades a version 0 schema to a version 1 schema.
use db;
use error::Error;
use recording;
use rusqlite;
use std::collections::HashMap;
pub fn run(tx: &rusqlite::Transaction) -> Result<(), Error> {
// These create statements match the schema.sql when version 1 was the latest.
tx.execute_batch(r#"
alter table camera rename to old_camera;
create table camera (
id integer primary key,
uuid blob unique,
short_name text not null,
description text,
host text,
username text,
password text,
main_rtsp_path text,
sub_rtsp_path text,
retain_bytes integer not null check (retain_bytes >= 0),
next_recording_id integer not null check (next_recording_id >= 0)
);
alter table recording rename to old_recording;
drop index recording_cover;
create table recording (
composite_id integer primary key,
camera_id integer not null references camera (id),
run_offset integer not null,
flags integer not null,
sample_file_bytes integer not null check (sample_file_bytes > 0),
start_time_90k integer not null check (start_time_90k > 0),
duration_90k integer not null
check (duration_90k >= 0 and duration_90k < 5*60*90000),
local_time_delta_90k integer not null,
video_samples integer not null check (video_samples > 0),
video_sync_samples integer not null check (video_samples > 0),
video_sample_entry_id integer references video_sample_entry (id),
check (composite_id >> 32 = camera_id)
);
create index recording_cover on recording (
start_time_90k,
duration_90k,
video_samples,
video_sample_entry_id,
sample_file_bytes
);
create table recording_playback (
composite_id integer primary key references recording (composite_id),
sample_file_uuid blob not null check (length(sample_file_uuid) = 16),
sample_file_sha1 blob not null check (length(sample_file_sha1) = 20),
video_index blob not null check (length(video_index) > 0)
);
"#)?;
let camera_state = fill_recording(tx).unwrap();
fill_camera(tx, camera_state).unwrap();
tx.execute_batch(r#"
drop table old_camera;
drop table old_recording;
"#)?;
Ok(())
}
struct CameraState {
/// tuple of (run_start_id, next_start_90k).
current_run: Option<(i64, i64)>,
/// As in the `next_recording_id` field of the `camera` table.
next_recording_id: i32,
}
/// Fills the `recording` and `recording_playback` tables from `old_recording`, returning
/// the `camera_state` map for use by a following call to `fill_cameras`.
fn fill_recording(tx: &rusqlite::Transaction) -> Result<HashMap<i32, CameraState>, Error> {
let mut select = tx.prepare(r#"
select
camera_id,
sample_file_bytes,
start_time_90k,
duration_90k,
local_time_delta_90k,
video_samples,
video_sync_samples,
video_sample_entry_id,
sample_file_uuid,
sample_file_sha1,
video_index
from
old_recording
"#)?;
let mut insert1 = tx.prepare(r#"
insert into recording values (:composite_id, :camera_id, :run_offset, :flags,
:sample_file_bytes, :start_time_90k, :duration_90k,
:local_time_delta_90k, :video_samples, :video_sync_samples,
:video_sample_entry_id)
"#)?;
let mut insert2 = tx.prepare(r#"
insert into recording_playback values (:composite_id, :sample_file_uuid, :sample_file_sha1,
:video_index)
"#)?;
let mut rows = select.query(&[])?;
let mut camera_state: HashMap<i32, CameraState> = HashMap::new();
while let Some(row) = rows.next() {
let row = row?;
let camera_id: i32 = row.get_checked(0)?;
let camera_state = camera_state.entry(camera_id).or_insert_with(|| {
CameraState{
current_run: None,
next_recording_id: 1,
}
});
let composite_id = ((camera_id as i64) << 32) | (camera_state.next_recording_id as i64);
camera_state.next_recording_id += 1;
let sample_file_bytes: i32 = row.get_checked(1)?;
let start_time_90k: i64 = row.get_checked(2)?;
let duration_90k: i32 = row.get_checked(3)?;
let local_time_delta_90k: i64 = row.get_checked(4)?;
let video_samples: i32 = row.get_checked(5)?;
let video_sync_samples: i32 = row.get_checked(6)?;
let video_sample_entry_id: i32 = row.get_checked(7)?;
let sample_file_uuid: Vec<u8> = row.get_checked(8)?;
let sample_file_sha1: Vec<u8> = row.get_checked(9)?;
let video_index: Vec<u8> = row.get_checked(10)?;
let trailing_zero = {
let mut it = recording::SampleIndexIterator::new();
while it.next(&video_index)? {}
it.duration_90k == 0
};
let run_id = match camera_state.current_run {
Some((run_id, expected_start)) if expected_start == start_time_90k => run_id,
_ => composite_id,
};
insert1.execute_named(&[
(":composite_id", &composite_id),
(":camera_id", &camera_id),
(":run_offset", &(composite_id - run_id)),
(":flags", &(if trailing_zero { db::RecordingFlags::TrailingZero as i32 } else { 0 })),
(":sample_file_bytes", &sample_file_bytes),
(":start_time_90k", &start_time_90k),
(":duration_90k", &duration_90k),
(":local_time_delta_90k", &local_time_delta_90k),
(":video_samples", &video_samples),
(":video_sync_samples", &video_sync_samples),
(":video_sample_entry_id", &video_sample_entry_id),
])?;
insert2.execute_named(&[
(":composite_id", &composite_id),
(":sample_file_uuid", &sample_file_uuid),
(":sample_file_sha1", &sample_file_sha1),
(":video_index", &video_index),
])?;
camera_state.current_run = if trailing_zero {
None
} else {
Some((run_id, start_time_90k + duration_90k as i64))
};
}
Ok(camera_state)
}
fn fill_camera(tx: &rusqlite::Transaction, camera_state: HashMap<i32, CameraState>)
-> Result<(), Error> {
let mut select = tx.prepare(r#"
select
id, uuid, short_name, description, host, username, password, main_rtsp_path,
sub_rtsp_path, retain_bytes
from
old_camera
"#)?;
let mut insert = tx.prepare(r#"
insert into camera values (:id, :uuid, :short_name, :description, :host, :username, :password,
:main_rtsp_path, :sub_rtsp_path, :retain_bytes, :next_recording_id)
"#)?;
let mut rows = select.query(&[])?;
while let Some(row) = rows.next() {
let row = row?;
let id: i32 = row.get_checked(0)?;
let uuid: Vec<u8> = row.get_checked(1)?;
let short_name: String = row.get_checked(2)?;
let description: String = row.get_checked(3)?;
let host: String = row.get_checked(4)?;
let username: String = row.get_checked(5)?;
let password: String = row.get_checked(6)?;
let main_rtsp_path: String = row.get_checked(7)?;
let sub_rtsp_path: String = row.get_checked(8)?;
let retain_bytes: i64 = row.get_checked(9)?;
insert.execute_named(&[
(":id", &id),
(":uuid", &uuid),
(":short_name", &short_name),
(":description", &description),
(":host", &host),
(":username", &username),
(":password", &password),
(":main_rtsp_path", &main_rtsp_path),
(":sub_rtsp_path", &sub_rtsp_path),
(":retain_bytes", &retain_bytes),
(":next_recording_id",
&camera_state.get(&id).map(|s| s.next_recording_id).unwrap_or(1)),
])?;
}
Ok(())
}

View File

@ -34,14 +34,16 @@ use core::borrow::Borrow;
use core::str::FromStr;
use db;
use dir::SampleFileDir;
use error::{Error, Result};
use error::Error;
use http_entity;
use hyper::{header,server,status};
use hyper::uri::RequestUri;
use mime;
use mp4;
use recording;
use regex::Regex;
use serde_json;
use std::cmp;
use std::fmt;
use std::io::Write;
use std::ops::Range;
@ -57,6 +59,11 @@ const DECIMAL_PREFIXES: &'static [&'static str] =&[" ", " k", " M", " G", " T",
lazy_static! {
static ref JSON: mime::Mime = mime!(Application/Json);
static ref HTML: mime::Mime = mime!(Text/Html);
/// Regex used to parse the `s` query parameter to `view.mp4`.
/// As described in `design/api.md`, this is of the form
/// `START_ID[-END_ID][.[REL_START_TIME]-[REL_END_TIME]]`.
static ref SEGMENTS_RE: Regex = Regex::new(r"^(\d+)(-\d+)?(?:\.(\d+)?-(\d+)?)?$").unwrap();
}
mod json { include!(concat!(env!("OUT_DIR"), "/serde_types.rs")); }
@ -181,18 +188,58 @@ pub struct Handler {
dir: Arc<SampleFileDir>,
}
#[derive(Debug, Eq, PartialEq)]
struct Segments {
ids: Range<i32>,
start_time: i64,
end_time: Option<i64>,
}
impl Segments {
pub fn parse(input: &str) -> Result<Segments, ()> {
let caps = SEGMENTS_RE.captures(input).ok_or(())?;
let ids_start = i32::from_str(caps.at(1).unwrap()).map_err(|_| ())?;
let ids_end = match caps.at(2) {
Some(e) => i32::from_str(&e[1..]).map_err(|_| ())?,
None => ids_start,
} + 1;
if ids_start < 0 || ids_end <= ids_start {
return Err(());
}
let start_time = caps.at(3).map_or(Ok(0), i64::from_str).map_err(|_| ())?;
if start_time < 0 {
return Err(());
}
let end_time = match caps.at(4) {
Some(v) => {
let e = i64::from_str(v).map_err(|_| ())?;
if e <= start_time {
return Err(());
}
Some(e)
},
None => None
};
Ok(Segments{
ids: ids_start .. ids_end,
start_time: start_time,
end_time: end_time,
})
}
}
impl Handler {
pub fn new(db: Arc<db::Database>, dir: Arc<SampleFileDir>) -> Self {
Handler{db: db, dir: dir}
}
fn not_found(&self, mut res: server::Response) -> Result<()> {
fn not_found(&self, mut res: server::Response) -> Result<(), Error> {
*res.status_mut() = status::StatusCode::NotFound;
res.send(b"not found")?;
Ok(())
}
fn list_cameras(&self, req: &server::Request, mut res: server::Response) -> Result<()> {
fn list_cameras(&self, req: &server::Request, mut res: server::Response) -> Result<(), Error> {
let json = is_json(req);
let buf = {
let db = self.db.lock();
@ -207,7 +254,7 @@ impl Handler {
Ok(())
}
fn list_cameras_html(&self, db: MutexGuard<db::LockedDatabase>) -> Result<Vec<u8>> {
fn list_cameras_html(&self, db: MutexGuard<db::LockedDatabase>) -> Result<Vec<u8>, Error> {
let mut buf = Vec::new();
buf.extend_from_slice(b"\
<!DOCTYPE html>\n\
@ -242,7 +289,7 @@ impl Handler {
}
fn camera(&self, uuid: Uuid, query: &str, req: &server::Request, mut res: server::Response)
-> Result<()> {
-> Result<(), Error> {
let json = is_json(req);
let buf = {
let db = self.db.lock();
@ -260,8 +307,22 @@ impl Handler {
}
fn camera_html(&self, db: MutexGuard<db::LockedDatabase>, query: &str,
uuid: Uuid) -> Result<Vec<u8>> {
let r = Handler::get_optional_range(query)?;
uuid: Uuid) -> Result<Vec<u8>, Error> {
let (r, trim) = {
let mut start = i64::min_value();
let mut end = i64::max_value();
let mut trim = false;
for (key, value) in form_urlencoded::parse(query.as_bytes()) {
let (key, value) = (key.borrow(), value.borrow());
match key {
"start_time_90k" => start = i64::from_str(value)?,
"end_time_90k" => end = i64::from_str(value)?,
"trim" if value == "true" => trim = true,
_ => {},
}
};
(recording::Time(start) .. recording::Time(end), trim)
};
let camera = db.get_camera(uuid)
.ok_or_else(|| Error::new("no such camera".to_owned()))?;
let mut buf = Vec::new();
@ -290,26 +351,54 @@ impl Handler {
// parameters between recordings.
static FORCE_SPLIT_DURATION: recording::Duration =
recording::Duration(60 * 60 * recording::TIME_UNITS_PER_SEC);
db.list_aggregated_recordings(camera.id, &r, FORCE_SPLIT_DURATION, |row| {
let seconds = (row.range.end.0 - row.range.start.0) / recording::TIME_UNITS_PER_SEC;
let mut rows = Vec::new();
db.list_aggregated_recordings(camera.id, r.clone(), FORCE_SPLIT_DURATION, |row| {
rows.push(row.clone());
Ok(())
})?;
// Display newest recording first.
rows.sort_by(|r1, r2| r2.ids.start.cmp(&r1.ids.start));
for row in &rows {
let seconds = (row.time.end.0 - row.time.start.0) / recording::TIME_UNITS_PER_SEC;
let url = {
let mut url = String::with_capacity(64);
use std::fmt::Write;
write!(&mut url, "view.mp4?s={}", row.ids.start)?;
if row.ids.end != row.ids.start + 1 {
write!(&mut url, "-{}", row.ids.end - 1)?;
}
if trim {
let rel_start = if row.time.start < r.start { Some(r.start - row.time.start) }
else { None };
let rel_end = if row.time.end > r.end { Some(r.end - row.time.start) }
else { None };
if rel_start.is_some() || rel_end.is_some() {
url.push('.');
if let Some(s) = rel_start { write!(&mut url, "{}", s.0)?; }
url.push('-');
if let Some(e) = rel_end { write!(&mut url, "{}", e.0)?; }
}
}
url
};
write!(&mut buf, "\
<tr><td><a href=\"view.mp4?start_time_90k={}&end_time_90k={}\">{}</a></td>\
<tr><td><a href=\"{}\">{}</a></td>\
<td>{}</td><td>{}x{}</td><td>{:.0}</td><td>{:b}B</td><td>{}bps</td></tr>\n",
row.range.start.0, row.range.end.0,
HumanizedTimestamp(Some(row.range.start)),
HumanizedTimestamp(Some(row.range.end)), row.video_sample_entry.width,
url, HumanizedTimestamp(Some(row.time.start)),
HumanizedTimestamp(Some(row.time.end)), row.video_sample_entry.width,
row.video_sample_entry.height,
if seconds == 0 { 0. } else { row.video_samples as f32 / seconds as f32 },
Humanized(row.sample_file_bytes),
Humanized(if seconds == 0 { 0 } else { row.sample_file_bytes * 8 / seconds }))?;
Ok(())
})?;
};
buf.extend_from_slice(b"</table>\n</html>\n");
Ok(buf)
}
fn camera_recordings(&self, uuid: Uuid, query: &str, req: &server::Request,
mut res: server::Response) -> Result<()> {
mut res: server::Response) -> Result<(), Error> {
let r = Handler::get_optional_range(query)?;
if !is_json(req) {
*res.status_mut() = status::StatusCode::NotAcceptable;
@ -321,11 +410,11 @@ impl Handler {
let db = self.db.lock();
let camera = db.get_camera(uuid)
.ok_or_else(|| Error::new("no such camera".to_owned()))?;
db.list_aggregated_recordings(camera.id, &r, recording::Duration(i64::max_value()),
db.list_aggregated_recordings(camera.id, r, recording::Duration(i64::max_value()),
|row| {
out.recordings.push(json::Recording{
start_time_90k: row.range.start.0,
end_time_90k: row.range.end.0,
start_time_90k: row.time.start.0,
end_time_90k: row.time.end.0,
sample_file_bytes: row.sample_file_bytes,
video_samples: row.video_samples,
video_sample_entry_width: row.video_sample_entry.width,
@ -342,79 +431,90 @@ impl Handler {
}
fn camera_view_mp4(&self, uuid: Uuid, query: &str, req: &server::Request,
res: server::Response) -> Result<()> {
res: server::Response) -> Result<(), Error> {
let camera_id = {
let db = self.db.lock();
let camera = db.get_camera(uuid)
.ok_or_else(|| Error::new("no such camera".to_owned()))?;
camera.id
};
let mut start = None;
let mut end = None;
let mut include_ts = false;
let mut builder = mp4::Mp4FileBuilder::new();
for (key, value) in form_urlencoded::parse(query.as_bytes()) {
let (key, value) = (key.borrow(), value.borrow());
match key {
"start_time_90k" => start = Some(recording::Time(i64::from_str(value)?)),
"end_time_90k" => end = Some(recording::Time(i64::from_str(value)?)),
"ts" => { include_ts = value == "true"; },
_ => {},
"s" => {
let s = Segments::parse(value).map_err(
|_| Error::new(format!("invalid s parameter: {}", value)))?;
debug!("camera_view_mp4: appending s={:?}", s);
let mut est_segments = (s.ids.end - s.ids.start) as usize;
if let Some(end) = s.end_time {
// There should be roughly ceil((end - start) / desired_recording_duration)
// recordings in the desired timespan if there are no gaps or overlap,
// possibly another for misalignment of the requested timespan with the
// rotate offset and another because rotation only happens at key frames.
let ceil_durations = (end - s.start_time +
recording::DESIRED_RECORDING_DURATION - 1) /
recording::DESIRED_RECORDING_DURATION;
est_segments = cmp::min(est_segments, (ceil_durations + 2) as usize);
}
builder.reserve(est_segments);
let db = self.db.lock();
let mut prev = None;
let mut cur_off = 0;
db.list_recordings_by_id(camera_id, s.ids.clone(), |r| {
// Check for missing recordings.
match prev {
None if r.id == s.ids.start => {},
None => return Err(Error::new(format!("no such recording {}/{}",
camera_id, s.ids.start))),
Some(id) if r.id != id + 1 => {
return Err(Error::new(format!("no such recording {}/{}",
camera_id, id + 1)));
},
_ => {},
};
prev = Some(r.id);
// Add a segment for the relevant part of the recording, if any.
let end_time = s.end_time.unwrap_or(i64::max_value());
let d = r.duration_90k as i64;
if s.start_time <= cur_off + d && cur_off < end_time {
let start = cmp::max(0, s.start_time - cur_off);
let end = cmp::min(d, end_time - cur_off);
let times = start as i32 .. end as i32;
debug!("...appending recording {}/{} with times {:?} (out of dur {})",
r.camera_id, r.id, times, d);
builder.append(&db, r, start as i32 .. end as i32)?;
} else {
debug!("...skipping recording {}/{} dur {}", r.camera_id, r.id, d);
}
cur_off += d;
Ok(())
})?;
// Check for missing recordings.
match prev {
Some(id) if s.ids.end != id + 1 => {
return Err(Error::new(format!("no such recording {}/{}",
camera_id, s.ids.end - 1)));
},
None => {
return Err(Error::new(format!("no such recording {}/{}",
camera_id, s.ids.start)));
},
_ => {},
};
if let Some(end) = s.end_time {
if end > cur_off {
return Err(Error::new(
format!("end time {} is beyond specified recordings", end)));
}
}
},
"ts" => builder.include_timestamp_subtitle_track(value == "true"),
_ => return Err(Error::new(format!("parameter {} not understood", key))),
}
};
let start = start.ok_or_else(|| Error::new("start_time_90k missing".to_owned()))?;
let end = end.ok_or_else(|| Error::new("end_time_90k missing".to_owned()))?;
let desired_range = start .. end;
let mut builder = mp4::Mp4FileBuilder::new();
// There should be roughly ceil((end - start) / desired_recording_duration) recordings
// in the desired timespan if there are no gaps or overlap. Add a couple more to be safe:
// one for misalignment of the requested timespan with the rotate offset, another because
// rotation only happens at key frames.
let ceil_durations = ((end - start).0 + recording::DESIRED_RECORDING_DURATION - 1) /
recording::DESIRED_RECORDING_DURATION;
let est_records = (ceil_durations + 2) as usize;
let mut next_start = start;
builder.reserve(est_records);
{
let db = self.db.lock();
db.list_recordings(camera_id, &desired_range, |r| {
if builder.len() == 0 && r.start > next_start {
return Err(Error::new(format!("recording started late ({} vs requested {})",
r.start, start)));
} else if builder.len() != 0 && r.start != next_start {
return Err(Error::new(format!("gap/overlap in recording: {} to {} after row {}",
next_start, r.start, builder.len())));
}
next_start = r.start + recording::Duration(r.duration_90k as i64);
// TODO: check for inconsistent video sample entries.
let rel_start = if r.start < start {
(start - r.start).0 as i32
} else {
0
};
let rel_end = if r.start + recording::Duration(r.duration_90k as i64) > end {
(end - r.start).0 as i32
} else {
r.duration_90k
};
builder.append(&db, r, rel_start .. rel_end)?;
Ok(())
})?;
}
if next_start < end {
return Err(Error::new(format!(
"recording ends early: {}, not requested: {} after {} rows.",
next_start, end, builder.len())))
}
if builder.len() > est_records {
warn!("Estimated {} records for time [{}, {}); actually were {}",
est_records, start, end, builder.len());
} else {
debug!("Estimated {} records for time [{}, {}); actually were {}",
est_records, start, end, builder.len());
}
builder.include_timestamp_subtitle_track(include_ts);
let mp4 = builder.build(self.db.clone(), self.dir.clone())?;
http_entity::serve(&mp4, req, res)?;
Ok(())
@ -422,7 +522,7 @@ impl Handler {
/// Parses optional `start_time_90k` and `end_time_90k` query parameters, defaulting to the
/// full range of possible values.
fn get_optional_range(query: &str) -> Result<Range<recording::Time>> {
fn get_optional_range(query: &str) -> Result<Range<recording::Time>, Error> {
let mut start = i64::min_value();
let mut end = i64::max_value();
for (key, value) in form_urlencoded::parse(query.as_bytes()) {
@ -455,10 +555,12 @@ impl server::Handler for Handler {
#[cfg(test)]
mod tests {
use super::{HtmlEscaped, Humanized};
use super::{HtmlEscaped, Humanized, Segments};
use testutil;
#[test]
fn test_humanize() {
testutil::init();
assert_eq!("1.0 B", format!("{:b}B", Humanized(1)));
assert_eq!("1.0 EiB", format!("{:b}B", Humanized(1i64 << 60)));
assert_eq!("1.5 EiB", format!("{:b}B", Humanized((1i64 << 60) + (1i64 << 59))));
@ -468,8 +570,30 @@ mod tests {
#[test]
fn test_html_escaped() {
testutil::init();
assert_eq!("", format!("{}", HtmlEscaped("")));
assert_eq!("no special chars", format!("{}", HtmlEscaped("no special chars")));
assert_eq!("a &lt;tag> &amp; text", format!("{}", HtmlEscaped("a <tag> & text")));
}
#[test]
fn test_segments() {
testutil::init();
assert_eq!(Segments{ids: 1..2, start_time: 0, end_time: None},
Segments::parse("1").unwrap());
assert_eq!(Segments{ids: 1..2, start_time: 26, end_time: None},
Segments::parse("1.26-").unwrap());
assert_eq!(Segments{ids: 1..2, start_time: 0, end_time: Some(42)},
Segments::parse("1.-42").unwrap());
assert_eq!(Segments{ids: 1..2, start_time: 26, end_time: Some(42)},
Segments::parse("1.26-42").unwrap());
assert_eq!(Segments{ids: 1..6, start_time: 0, end_time: None},
Segments::parse("1-5").unwrap());
assert_eq!(Segments{ids: 1..6, start_time: 26, end_time: None},
Segments::parse("1-5.26-").unwrap());
assert_eq!(Segments{ids: 1..6, start_time: 0, end_time: Some(42)},
Segments::parse("1-5.-42").unwrap());
assert_eq!(Segments{ids: 1..6, start_time: 26, end_time: Some(42)},
Segments::parse("1-5.26-42").unwrap());
}
}