// This file is part of Moonfire NVR, a security camera network video recorder.
// Copyright (C) 2016-2020 The Moonfire NVR Authors
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// In addition, as a special exception, the copyright holders give
// permission to link the code of portions of this program with the
// OpenSSL library under certain conditions as described in each
// individual source file, and distribute linked combinations including
// the two.
//
// You must obey the GNU General Public License in all respects for all
// of the code used other than OpenSSL. If you modify file(s) with this
// exception, you may extend this exception to your version of the
// file(s), but you are not obligated to do so. If you do not wish to do
// so, delete this exception statement from your version. If you delete
// this exception statement from all source files in the program, then
// also delete it here.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see .
//! Database access logic for the Moonfire NVR SQLite schema.
//!
//! The SQLite schema includes everything except the actual video samples (see the `dir` module
//! for management of those). See `schema.sql` for a more detailed description.
//!
//! The `Database` struct caches data in RAM, making the assumption that only one process is
//! accessing the database at a time. Performance and efficiency notes:
//!
//! * several query operations here feature row callbacks. The callback is invoked with
//! the database lock. Thus, the callback shouldn't perform long-running operations.
//!
//! * startup may be slow, as it scans the entire index for the recording table. This seems
//! acceptable.
//!
//! * the operations used for web file serving should return results with acceptable latency.
//!
//! * however, the database lock may be held for longer than is acceptable for
//! the critical path of recording frames. The caller should preallocate sample file uuids
//! and such to avoid database operations in these paths.
//!
//! * adding and removing recordings done during normal operations use a batch interface.
//! A list of mutations is built up in-memory and occasionally flushed to reduce SSD write
//! cycles.
use base::clock::{self, Clocks};
use base::strutil::encode_size;
use crate::auth;
use crate::dir;
use crate::raw;
use crate::recording::{self, TIME_UNITS_PER_SEC};
use crate::schema;
use crate::signal;
use failure::{Error, bail, format_err};
use fnv::{FnvHashMap, FnvHashSet};
use itertools::Itertools;
use log::{error, info, trace};
use lru_cache::LruCache;
use parking_lot::{Mutex,MutexGuard};
use protobuf::prelude::MessageField;
use rusqlite::{named_params, params};
use smallvec::SmallVec;
use std::cell::RefCell;
use std::collections::{BTreeMap, VecDeque};
use std::convert::TryInto;
use std::cmp;
use std::fmt::Write as _;
use std::io::Write;
use std::ops::Range;
use std::mem;
use std::str;
use std::string::String;
use std::sync::Arc;
use std::vec::Vec;
use time;
use uuid::Uuid;
/// Expected schema version. See `guide/schema.md` for more information.
pub const EXPECTED_VERSION: i32 = 6;
const GET_RECORDING_PLAYBACK_SQL: &'static str = r#"
select
video_index
from
recording_playback
where
composite_id = :composite_id
"#;
const INSERT_VIDEO_SAMPLE_ENTRY_SQL: &'static str = r#"
insert into video_sample_entry (width, height, pasp_h_spacing, pasp_v_spacing,
rfc6381_codec, data)
values (:width, :height, :pasp_h_spacing, :pasp_v_spacing,
:rfc6381_codec, :data)
"#;
const UPDATE_STREAM_COUNTERS_SQL: &'static str = r#"
update stream
set cum_recordings = :cum_recordings,
cum_media_duration_90k = :cum_media_duration_90k,
cum_runs = :cum_runs
where id = :stream_id
"#;
/// The size of a filesystem block, to use in disk space accounting.
/// This should really be obtained by a stat call on the sample file directory in question,
/// but that requires some refactoring. See
/// [#89](https://github.com/scottlamb/moonfire-nvr/issues/89). We might be able to get away with
/// this hardcoded value for a while.
const ASSUMED_BLOCK_SIZE_BYTES: i64 = 4096;
/// Rounds a file size up to the next multiple of the block size.
/// This is useful in representing the actual amount of filesystem space used.
pub(crate) fn round_up(bytes: i64) -> i64 {
let blk = ASSUMED_BLOCK_SIZE_BYTES;
(bytes + blk - 1) / blk * blk
}
pub struct FromSqlUuid(pub Uuid);
impl rusqlite::types::FromSql for FromSqlUuid {
fn column_result(value: rusqlite::types::ValueRef) -> rusqlite::types::FromSqlResult {
let uuid = Uuid::from_slice(value.as_blob()?)
.map_err(|e| rusqlite::types::FromSqlError::Other(Box::new(e)))?;
Ok(FromSqlUuid(uuid))
}
}
struct VideoIndex(Box<[u8]>);
impl rusqlite::types::FromSql for VideoIndex {
fn column_result(value: rusqlite::types::ValueRef) -> rusqlite::types::FromSqlResult {
Ok(VideoIndex(value.as_blob()?.to_vec().into_boxed_slice()))
}
}
/// A concrete box derived from a ISO/IEC 14496-12 section 8.5.2 VisualSampleEntry box. Describes
/// the codec, width, height, etc.
#[derive(Debug)]
pub struct VideoSampleEntry {
pub id: i32,
// Fields matching VideoSampleEntryToInsert below.
pub data: Vec,
pub rfc6381_codec: String,
pub width: u16,
pub height: u16,
pub pasp_h_spacing: u16,
pub pasp_v_spacing: u16,
}
#[derive(Debug, PartialEq, Eq)]
pub struct VideoSampleEntryToInsert {
pub data: Vec,
pub rfc6381_codec: String,
pub width: u16,
pub height: u16,
pub pasp_h_spacing: u16,
pub pasp_v_spacing: u16,
}
/// A row used in `list_recordings_by_time` and `list_recordings_by_id`.
#[derive(Copy, Clone, Debug)]
pub struct ListRecordingsRow {
pub start: recording::Time,
pub video_sample_entry_id: i32,
pub id: CompositeId,
/// This is a recording::Duration, but a single recording's duration fits into an i32.
pub wall_duration_90k: i32,
pub media_duration_90k: i32,
pub video_samples: i32,
pub video_sync_samples: i32,
pub sample_file_bytes: i32,
pub run_offset: i32,
pub open_id: u32,
pub flags: i32,
/// This is populated by `list_recordings_by_id` but not `list_recordings_by_time`.
/// (It's not included in the `recording_cover` index, so adding it to
/// `list_recordings_by_time` would be inefficient.)
pub prev_media_duration_and_runs: Option<(recording::Duration, i32)>,
}
/// A row used in `list_aggregated_recordings`.
#[derive(Clone, Debug)]
pub struct ListAggregatedRecordingsRow {
pub time: Range,
pub ids: Range,
pub video_samples: i64,
pub video_sync_samples: i64,
pub sample_file_bytes: i64,
pub video_sample_entry_id: i32,
pub stream_id: i32,
pub run_start_id: i32,
pub open_id: u32,
pub first_uncommitted: Option,
pub growing: bool,
}
impl ListAggregatedRecordingsRow { fn from(row: ListRecordingsRow) -> Self {
let recording_id = row.id.recording();
let uncommitted = (row.flags & RecordingFlags::Uncommitted as i32) != 0;
let growing = (row.flags & RecordingFlags::Growing as i32) != 0;
ListAggregatedRecordingsRow {
time: row.start .. recording::Time(row.start.0 + row.wall_duration_90k as i64),
ids: recording_id .. recording_id+1,
video_samples: row.video_samples as i64,
video_sync_samples: row.video_sync_samples as i64,
sample_file_bytes: row.sample_file_bytes as i64,
video_sample_entry_id: row.video_sample_entry_id,
stream_id: row.id.stream(),
run_start_id: recording_id - row.run_offset,
open_id: row.open_id,
first_uncommitted: if uncommitted { Some(recording_id) } else { None },
growing,
}
}
}
/// Select fields from the `recordings_playback` table. Retrieve with `with_recording_playback`.
#[derive(Debug)]
pub struct RecordingPlayback<'a> {
pub video_index: &'a [u8],
}
/// Bitmask in the `flags` field in the `recordings` table; see `schema.sql`.
pub enum RecordingFlags {
TrailingZero = 1,
// These values (starting from high bit on down) are never written to the database.
Growing = 1 << 30,
Uncommitted = 1 << 31,
}
/// A recording to pass to `LockedDatabase::add_recording` and `raw::insert_recording`.
#[derive(Clone, Debug, Default)]
pub struct RecordingToInsert {
pub run_offset: i32,
pub flags: i32,
pub sample_file_bytes: i32,
pub start: recording::Time,
/// Filled in by `add_recording`.
pub prev_media_duration: recording::Duration,
/// Filled in by `add_recording`.
pub prev_runs: i32,
pub wall_duration_90k: i32, // a recording::Duration, but guaranteed to fit in i32.
pub media_duration_90k: i32,
pub local_time_delta: recording::Duration,
pub video_samples: i32,
pub video_sync_samples: i32,
pub video_sample_entry_id: i32,
pub video_index: Vec,
pub sample_file_blake3: Option<[u8; 32]>,
}
impl RecordingToInsert {
fn to_list_row(&self, id: CompositeId, open_id: u32) -> ListRecordingsRow {
ListRecordingsRow {
start: self.start,
video_sample_entry_id: self.video_sample_entry_id,
id,
wall_duration_90k: self.wall_duration_90k,
media_duration_90k: self.media_duration_90k,
video_samples: self.video_samples,
video_sync_samples: self.video_sync_samples,
sample_file_bytes: self.sample_file_bytes,
run_offset: self.run_offset,
open_id,
flags: self.flags | RecordingFlags::Uncommitted as i32,
prev_media_duration_and_runs: Some((self.prev_media_duration, self.prev_runs)),
}
}
}
/// A row used in `raw::list_oldest_recordings` and `db::delete_oldest_recordings`.
#[derive(Copy, Clone, Debug)]
pub(crate) struct ListOldestRecordingsRow {
pub id: CompositeId,
pub start: recording::Time,
pub wall_duration_90k: i32,
pub sample_file_bytes: i32,
}
/// A calendar day in `YYYY-mm-dd` format.
#[derive(Copy, Clone, Debug, Eq, Ord, PartialEq, PartialOrd)]
pub struct StreamDayKey([u8; 10]);
impl StreamDayKey {
fn new(tm: time::Tm) -> Result {
let mut s = StreamDayKey([0u8; 10]);
write!(&mut s.0[..], "{}", tm.strftime("%Y-%m-%d")?)?;
Ok(s)
}
pub fn bounds(&self) -> Range {
let mut my_tm = time::strptime(self.as_ref(), "%Y-%m-%d").expect("days must be parseable");
my_tm.tm_utcoff = 1; // to the time crate, values != 0 mean local time.
my_tm.tm_isdst = -1;
let start = recording::Time(my_tm.to_timespec().sec * recording::TIME_UNITS_PER_SEC);
my_tm.tm_hour = 0;
my_tm.tm_min = 0;
my_tm.tm_sec = 0;
my_tm.tm_mday += 1;
let end = recording::Time(my_tm.to_timespec().sec * recording::TIME_UNITS_PER_SEC);
start .. end
}
}
impl AsRef for StreamDayKey {
fn as_ref(&self) -> &str { str::from_utf8(&self.0[..]).expect("days are always UTF-8") }
}
/// In-memory state about a particular camera on a particular day.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub struct StreamDayValue {
/// The number of recordings that overlap with this day. Note that `adjust_day` automatically
/// prunes days with 0 recordings.
pub recordings: i64,
/// The total duration recorded on this day. This can be 0; because frames' durations are taken
/// from the time of the next frame, a recording that ends unexpectedly after a single frame
/// will have 0 duration of that frame and thus the whole recording.
pub duration: recording::Duration,
}
#[derive(Debug)]
pub struct SampleFileDir {
pub id: i32,
pub path: String,
pub uuid: Uuid,
dir: Option>,
last_complete_open: Option,
/// ids which are in the `garbage` database table (rather than `recording`) as of last commit
/// but may still exist on disk. These can't be safely removed from the database yet.
pub(crate) garbage_needs_unlink: FnvHashSet,
/// ids which are in the `garbage` database table and are guaranteed to no longer exist on
/// disk (have been unlinked and the dir has been synced). These may be removed from the
/// database on next flush. Mutually exclusive with `garbage_needs_unlink`.
pub(crate) garbage_unlinked: Vec,
}
impl SampleFileDir {
/// Returns a cloned copy of the directory, or Err if closed.
///
/// Use `LockedDatabase::open_sample_file_dirs` prior to calling this method.
pub fn get(&self) -> Result, Error> {
Ok(self.dir
.as_ref()
.ok_or_else(|| format_err!("sample file dir {} is closed", self.id))?
.clone())
}
/// Returns expected existing metadata when opening this directory.
fn meta(&self, db_uuid: &Uuid) -> schema::DirMeta {
let mut meta = schema::DirMeta::default();
meta.db_uuid.extend_from_slice(&db_uuid.as_bytes()[..]);
meta.dir_uuid.extend_from_slice(&self.uuid.as_bytes()[..]);
if let Some(o) = self.last_complete_open {
let open = meta.last_complete_open.mut_message();
open.id = o.id;
open.uuid.extend_from_slice(&o.uuid.as_bytes()[..]);
}
meta
}
}
pub use crate::auth::Request;
pub use crate::auth::RawSessionId;
pub use crate::auth::Session;
pub use crate::auth::User;
pub use crate::auth::UserChange;
/// In-memory state about a camera.
#[derive(Debug)]
pub struct Camera {
pub id: i32,
pub uuid: Uuid,
pub short_name: String,
pub description: String,
pub onvif_host: String,
pub username: String,
pub password: String,
pub streams: [Option; 2],
}
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub enum StreamType { MAIN, SUB }
impl StreamType {
pub fn from_index(i: usize) -> Option {
match i {
0 => Some(StreamType::MAIN),
1 => Some(StreamType::SUB),
_ => None,
}
}
pub fn index(self) -> usize {
match self {
StreamType::MAIN => 0,
StreamType::SUB => 1,
}
}
pub fn as_str(self) -> &'static str {
match self {
StreamType::MAIN => "main",
StreamType::SUB => "sub",
}
}
pub fn parse(type_: &str) -> Option {
match type_ {
"main" => Some(StreamType::MAIN),
"sub" => Some(StreamType::SUB),
_ => None,
}
}
}
impl ::std::fmt::Display for StreamType {
fn fmt(&self, f: &mut ::std::fmt::Formatter) -> Result<(), ::std::fmt::Error> {
f.write_str(self.as_str())
}
}
pub const ALL_STREAM_TYPES: [StreamType; 2] = [StreamType::MAIN, StreamType::SUB];
pub struct Stream {
pub id: i32,
pub camera_id: i32,
pub sample_file_dir_id: Option,
pub type_: StreamType,
pub rtsp_url: String,
pub retain_bytes: i64,
pub flush_if_sec: i64,
/// The time range of recorded data associated with this stream (minimum start time and maximum
/// end time). `None` iff there are no recordings for this camera.
pub range: Option>,
/// The total bytes of flushed sample files. This doesn't include disk space wasted in the
/// last filesystem block allocated to each file ("internal fragmentation").
pub sample_file_bytes: i64,
/// The total bytes on the filesystem used by this stream. This slightly more than
/// `sample_file_bytes` because it includes the wasted space in the last filesystem block.
pub fs_bytes: i64,
/// On flush, delete the following recordings (move them to the `garbage` table, to be
/// collected later). Note they must be the oldest recordings. The later collection involves
/// the syncer unlinking the files on disk and syncing the directory then enqueueing for
/// another following flush removal from the `garbage` table.
to_delete: Vec,
/// The total bytes to delete with the next flush.
pub bytes_to_delete: i64,
pub fs_bytes_to_delete: i64,
/// The total bytes to add with the next flush. (`mark_synced` has already been called on these
/// recordings.)
pub bytes_to_add: i64,
pub fs_bytes_to_add: i64,
/// The total duration of undeleted recorded data. This may not be `range.end - range.start`
/// due to gaps and overlap.
pub duration: recording::Duration,
/// Mapping of calendar day (in the server's time zone) to a summary of recordings on that day.
pub days: BTreeMap,
pub record: bool,
/// The `cum_recordings` currently committed to the database.
pub(crate) cum_recordings: i32,
/// The `cum_media_duration_90k` currently committed to the database.
cum_media_duration: recording::Duration,
/// The `cum_runs` currently committed to the database.
cum_runs: i32,
/// The recordings which have been added via `LockedDatabase::add_recording` but have yet to
/// committed to the database.
///
/// `uncommitted[i]` uses sample filename `CompositeId::new(id, cum_recordings + i)`;
/// `cum_recordings` should be advanced when one is committed to maintain this invariant.
///
/// TODO: alter the serving path to show these just as if they were already committed.
uncommitted: VecDeque>>,
/// The number of recordings in `uncommitted` which are synced and ready to commit.
synced_recordings: usize,
on_live_segment: Vec bool + Send>>,
}
/// Bounds of a single keyframe and the frames dependent on it.
/// This is used for live stream recordings. The stream id should already be known to the
/// subscriber.
#[derive(Clone, Debug)]
pub struct LiveSegment {
pub recording: i32,
/// The pts, relative to the start of the recording, of the start and end of this live segment,
/// in 90kHz units.
pub off_90k: Range,
}
#[derive(Clone, Debug, Default)]
pub struct StreamChange {
pub sample_file_dir_id: Option,
pub rtsp_url: String,
pub record: bool,
pub flush_if_sec: i64,
}
/// Information about a camera, used by `add_camera` and `update_camera`.
#[derive(Clone, Debug)]
pub struct CameraChange {
pub short_name: String,
pub description: String,
pub onvif_host: String,
pub username: String,
pub password: String,
/// `StreamType t` is represented by `streams[t.index()]`. A default StreamChange will
/// correspond to no stream in the database, provided there are no existing recordings for that
/// stream.
pub streams: [StreamChange; 2],
}
/// Adds non-zero `delta` to the day represented by `day` in the map `m`.
/// Inserts a map entry if absent; removes the entry if it has 0 entries on exit.
fn adjust_day(day: StreamDayKey, delta: StreamDayValue,
m: &mut BTreeMap) {
use ::std::collections::btree_map::Entry;
match m.entry(day) {
Entry::Vacant(e) => { e.insert(delta); },
Entry::Occupied(mut e) => {
let v = e.get_mut();
v.recordings += delta.recordings;
v.duration += delta.duration;
if v.recordings == 0 {
e.remove_entry();
}
},
}
}
/// Adjusts the day map `m` to reflect the range of the given recording.
/// Note that the specified range may span two days. It will never span more because the maximum
/// length of a recording entry is less than a day (even a 23-hour "spring forward" day).
///
/// This function swallows/logs date formatting errors because they shouldn't happen and there's
/// not much that can be done about them. (The database operation has already gone through.)
fn adjust_days(r: Range, sign: i64,
m: &mut BTreeMap) {
// Find first day key.
let mut my_tm = time::at(time::Timespec{sec: r.start.unix_seconds(), nsec: 0});
let day = match StreamDayKey::new(my_tm) {
Ok(d) => d,
Err(ref e) => {
error!("Unable to fill first day key from {:?}: {}; will ignore.", my_tm, e);
return;
}
};
// Determine the start of the next day.
// Use mytm to hold a non-normalized representation of the boundary.
my_tm.tm_isdst = -1;
my_tm.tm_hour = 0;
my_tm.tm_min = 0;
my_tm.tm_sec = 0;
my_tm.tm_mday += 1;
let boundary = my_tm.to_timespec();
let boundary_90k = boundary.sec * TIME_UNITS_PER_SEC;
// Adjust the first day.
let first_day_delta = StreamDayValue {
recordings: sign,
duration: recording::Duration(sign * (cmp::min(r.end.0, boundary_90k) - r.start.0)),
};
adjust_day(day, first_day_delta, m);
if r.end.0 <= boundary_90k {
return;
}
// Fill day with the second day. This requires a normalized representation so recalculate.
// (The C mktime(3) already normalized for us once, but .to_timespec() discarded that result.)
let my_tm = time::at(boundary);
let day = match StreamDayKey::new(my_tm) {
Ok(d) => d,
Err(ref e) => {
error!("Unable to fill second day key from {:?}: {}; will ignore.", my_tm, e);
return;
}
};
let second_day_delta = StreamDayValue {
recordings: sign,
duration: recording::Duration(sign * (r.end.0 - boundary_90k)),
};
adjust_day(day, second_day_delta, m);
}
impl Stream {
/// Adds a single fully committed recording with the given properties to the in-memory state.
fn add_recording(&mut self, r: Range, sample_file_bytes: i32) {
self.range = Some(match self.range {
Some(ref e) => cmp::min(e.start, r.start) .. cmp::max(e.end, r.end),
None => r.start .. r.end,
});
self.duration += r.end - r.start;
self.sample_file_bytes += sample_file_bytes as i64;
self.fs_bytes += round_up(i64::from(sample_file_bytes));
adjust_days(r, 1, &mut self.days);
}
}
/// Initializes the recordings associated with the given camera.
fn init_recordings(conn: &mut rusqlite::Connection, stream_id: i32, camera: &Camera,
stream: &mut Stream)
-> Result<(), Error> {
info!("Loading recordings for camera {} stream {:?}", camera.short_name, stream.type_);
let mut stmt = conn.prepare(r#"
select
recording.start_time_90k,
recording.wall_duration_90k,
recording.sample_file_bytes
from
recording
where
stream_id = :stream_id
"#)?;
let mut rows = stmt.query_named(named_params!{":stream_id": stream_id})?;
let mut i = 0;
while let Some(row) = rows.next()? {
let start = recording::Time(row.get(0)?);
let duration = recording::Duration(row.get(1)?);
let bytes = row.get(2)?;
stream.add_recording(start .. start + duration, bytes);
i += 1;
}
info!("Loaded {} recordings for camera {} stream {:?}", i, camera.short_name, stream.type_);
Ok(())
}
pub struct LockedDatabase {
conn: rusqlite::Connection,
uuid: Uuid,
flush_count: usize,
/// If the database is open in read-write mode, the information about the current Open row.
pub open: Option,
/// The monotonic time when the database was opened (whether in read-write mode or read-only
/// mode).
open_monotonic: recording::Time,
auth: auth::State,
signal: signal::State,
sample_file_dirs_by_id: BTreeMap,
cameras_by_id: BTreeMap,
streams_by_id: BTreeMap,
cameras_by_uuid: BTreeMap, // values are ids.
video_sample_entries_by_id: BTreeMap>,
video_index_cache: RefCell, fnv::FnvBuildHasher>>,
on_flush: Vec>,
}
/// Represents a row of the `open` database table.
#[derive(Copy, Clone, Debug)]
pub struct Open {
pub id: u32,
pub(crate) uuid: Uuid,
}
#[derive(Copy, Clone, Debug, Eq, Hash, PartialEq)]
pub struct CompositeId(pub i64);
impl CompositeId {
pub fn new(stream_id: i32, recording_id: i32) -> Self {
CompositeId((stream_id as i64) << 32 | recording_id as i64)
}
pub fn stream(self) -> i32 { (self.0 >> 32) as i32 }
pub fn recording(self) -> i32 { self.0 as i32 }
}
impl ::std::fmt::Display for CompositeId {
fn fmt(&self, f: &mut ::std::fmt::Formatter) -> Result<(), ::std::fmt::Error> {
write!(f, "{}/{}", self.stream(), self.recording())
}
}
/// Inserts, updates, or removes streams in the `State` object to match a set of `StreamChange`
/// structs.
struct StreamStateChanger {
sids: [Option; 2],
streams: Vec<(i32, Option<(i32, StreamType, StreamChange)>)>,
}
impl StreamStateChanger {
/// Performs the database updates (guarded by the given transaction) and returns the state
/// change to be applied on successful commit.
fn new(tx: &rusqlite::Transaction, camera_id: i32, existing: Option<&Camera>,
streams_by_id: &BTreeMap, change: &mut CameraChange)
-> Result {
let mut sids = [None; 2];
let mut streams = Vec::with_capacity(2);
let existing_streams = existing.map(|e| e.streams).unwrap_or_default();
for (i, ref mut sc) in change.streams.iter_mut().enumerate() {
let type_ = StreamType::from_index(i).unwrap();
let mut have_data = false;
if let Some(sid) = existing_streams[i] {
let s = streams_by_id.get(&sid).unwrap();
if s.range.is_some() {
have_data = true;
if let (Some(d), false) = (s.sample_file_dir_id,
s.sample_file_dir_id == sc.sample_file_dir_id) {
bail!("can't change sample_file_dir_id {:?}->{:?} for non-empty stream {}",
d, sc.sample_file_dir_id, sid);
}
}
if !have_data && sc.rtsp_url.is_empty() && sc.sample_file_dir_id.is_none() &&
!sc.record {
// Delete stream.
let mut stmt = tx.prepare_cached(r#"
delete from stream where id = ?
"#)?;
if stmt.execute(params![sid])? != 1 {
bail!("missing stream {}", sid);
}
streams.push((sid, None));
} else {
// Update stream.
let mut stmt = tx.prepare_cached(r#"
update stream set
rtsp_url = :rtsp_url,
record = :record,
flush_if_sec = :flush_if_sec,
sample_file_dir_id = :sample_file_dir_id
where
id = :id
"#)?;
let rows = stmt.execute_named(named_params!{
":rtsp_url": &sc.rtsp_url,
":record": sc.record,
":flush_if_sec": sc.flush_if_sec,
":sample_file_dir_id": sc.sample_file_dir_id,
":id": sid,
})?;
if rows != 1 {
bail!("missing stream {}", sid);
}
sids[i] = Some(sid);
let sc = mem::replace(*sc, StreamChange::default());
streams.push((sid, Some((camera_id, type_, sc))));
}
} else {
if sc.rtsp_url.is_empty() && sc.sample_file_dir_id.is_none() && !sc.record {
// Do nothing; there is no record and we want to keep it that way.
continue;
}
// Insert stream.
let mut stmt = tx.prepare_cached(r#"
insert into stream (camera_id, sample_file_dir_id, type, rtsp_url, record,
retain_bytes, flush_if_sec, cum_recordings,
cum_media_duration_90k, cum_runs)
values (:camera_id, :sample_file_dir_id, :type, :rtsp_url, :record,
0, :flush_if_sec, 0,
0, 0)
"#)?;
stmt.execute_named(named_params!{
":camera_id": camera_id,
":sample_file_dir_id": sc.sample_file_dir_id,
":type": type_.as_str(),
":rtsp_url": &sc.rtsp_url,
":record": sc.record,
":flush_if_sec": sc.flush_if_sec,
})?;
let id = tx.last_insert_rowid() as i32;
sids[i] = Some(id);
let sc = mem::replace(*sc, StreamChange::default());
streams.push((id, Some((camera_id, type_, sc))));
}
}
Ok(StreamStateChanger {
sids,
streams,
})
}
/// Applies the change to the given `streams_by_id`. The caller is expected to set
/// `Camera::streams` to the return value.
fn apply(mut self, streams_by_id: &mut BTreeMap) -> [Option; 2] {
for (id, stream) in self.streams.drain(..) {
use ::std::collections::btree_map::Entry;
match (streams_by_id.entry(id), stream) {
(Entry::Vacant(e), Some((camera_id, type_, mut sc))) => {
e.insert(Stream {
id,
type_,
camera_id,
sample_file_dir_id: sc.sample_file_dir_id,
rtsp_url: mem::replace(&mut sc.rtsp_url, String::new()),
retain_bytes: 0,
flush_if_sec: sc.flush_if_sec,
range: None,
sample_file_bytes: 0,
fs_bytes: 0,
to_delete: Vec::new(),
bytes_to_delete: 0,
fs_bytes_to_delete: 0,
bytes_to_add: 0,
fs_bytes_to_add: 0,
duration: recording::Duration(0),
days: BTreeMap::new(),
record: sc.record,
cum_recordings: 0,
cum_media_duration: recording::Duration(0),
cum_runs: 0,
uncommitted: VecDeque::new(),
synced_recordings: 0,
on_live_segment: Vec::new(),
});
},
(Entry::Vacant(_), None) => {},
(Entry::Occupied(e), Some((_, _, sc))) => {
let e = e.into_mut();
e.sample_file_dir_id = sc.sample_file_dir_id;
e.rtsp_url = sc.rtsp_url;
e.record = sc.record;
e.flush_if_sec = sc.flush_if_sec;
},
(Entry::Occupied(e), None) => { e.remove(); },
};
}
self.sids
}
}
/// A retention change as expected by `LockedDatabase::update_retention`.
pub struct RetentionChange {
pub stream_id: i32,
pub new_record: bool,
pub new_limit: i64,
}
impl LockedDatabase {
/// Returns an immutable view of the cameras by id.
pub fn cameras_by_id(&self) -> &BTreeMap { &self.cameras_by_id }
pub fn sample_file_dirs_by_id(&self) -> &BTreeMap {
&self.sample_file_dirs_by_id
}
/// Returns the number of completed database flushes since startup.
pub fn flushes(&self) -> usize { self.flush_count }
/// Adds a placeholder for an uncommitted recording.
///
/// The caller should write samples and fill the returned `RecordingToInsert` as it goes
/// (noting that while holding the lock, it should not perform I/O or acquire the database
/// lock). Then it should sync to permanent storage and call `mark_synced`. The data will
/// be written to the database on the next `flush`.
///
/// A call to `add_recording` is also a promise that previous recordings (even if not yet
/// synced and committed) won't change.
///
/// This fills the `prev_media_duration` and `prev_runs` fields.
pub(crate) fn add_recording(&mut self, stream_id: i32, mut r: RecordingToInsert)
-> Result<(CompositeId, Arc>), Error> {
let stream = match self.streams_by_id.get_mut(&stream_id) {
None => bail!("no such stream {}", stream_id),
Some(s) => s,
};
let id = CompositeId::new(stream_id,
stream.cum_recordings + (stream.uncommitted.len() as i32));
match stream.uncommitted.back() {
Some(s) => {
let l = s.lock();
r.prev_media_duration =
l.prev_media_duration + recording::Duration(l.wall_duration_90k.into());
r.prev_runs = l.prev_runs + if l.run_offset == 0 { 1 } else { 0 };
},
None => {
r.prev_media_duration = stream.cum_media_duration;
r.prev_runs = stream.cum_runs;
},
};
let recording = Arc::new(Mutex::new(r));
stream.uncommitted.push_back(Arc::clone(&recording));
Ok((id, recording))
}
/// Marks the given uncomitted recording as synced and ready to flush.
/// This must be the next unsynced recording.
pub(crate) fn mark_synced(&mut self, id: CompositeId) -> Result<(), Error> {
let stream = match self.streams_by_id.get_mut(&id.stream()) {
None => bail!("no stream for recording {}", id),
Some(s) => s,
};
let next_unsynced = stream.cum_recordings + (stream.synced_recordings as i32);
if id.recording() != next_unsynced {
bail!("can't sync {} when next unsynced recording is {} (next unflushed is {})",
id, next_unsynced, stream.cum_recordings);
}
if stream.synced_recordings == stream.uncommitted.len() {
bail!("can't sync un-added recording {}", id);
}
let l = stream.uncommitted[stream.synced_recordings].lock();
let bytes = i64::from(l.sample_file_bytes);
stream.bytes_to_add += bytes;
stream.fs_bytes_to_add += round_up(bytes);
stream.synced_recordings += 1;
Ok(())
}
pub(crate) fn delete_garbage(&mut self, dir_id: i32, ids: &mut Vec)
-> Result<(), Error> {
let dir = match self.sample_file_dirs_by_id.get_mut(&dir_id) {
None => bail!("no such dir {}", dir_id),
Some(d) => d,
};
dir.garbage_unlinked.reserve(ids.len());
ids.retain(|id| {
if !dir.garbage_needs_unlink.remove(id) {
return true;
}
dir.garbage_unlinked.push(*id);
false
});
if !ids.is_empty() {
bail!("delete_garbage with non-garbage ids {:?}", &ids[..]);
}
Ok(())
}
/// Registers a callback to run on every live segment immediately after it's recorded.
/// The callback is run with the database lock held, so it must not call back into the database
/// or block. The callback should return false to unregister.
pub fn watch_live(&mut self, stream_id: i32, cb: Box bool + Send>)
-> Result<(), Error> {
let s = match self.streams_by_id.get_mut(&stream_id) {
None => bail!("no such stream {}", stream_id),
Some(s) => s,
};
s.on_live_segment.push(cb);
Ok(())
}
/// Clears all watches on all streams.
/// Normally watches are self-cleaning: when a segment is sent, the callback returns false if
/// it is no longer interested (typically because hyper has just noticed the client is no
/// longer connected). This doesn't work when the system is shutting down and nothing more is
/// sent, though.
pub fn clear_watches(&mut self) {
for (_, s) in &mut self.streams_by_id {
s.on_live_segment.clear();
}
}
pub(crate) fn send_live_segment(&mut self, stream: i32, l: LiveSegment) -> Result<(), Error> {
let s = match self.streams_by_id.get_mut(&stream) {
None => bail!("no such stream {}", stream),
Some(s) => s,
};
use odds::vec::VecExt;
s.on_live_segment.retain_mut(|cb| cb(l.clone()));
Ok(())
}
/// Helper for `DatabaseGuard::flush()` and `Database::drop()`.
///
/// The public API is in `DatabaseGuard::flush()`; it supplies the `Clocks` to this function.
fn flush(&mut self, clocks: &C, reason: &str) -> Result<(), Error> {
let o = match self.open.as_ref() {
None => bail!("database is read-only"),
Some(o) => o,
};
let tx = self.conn.transaction()?;
let mut new_ranges = FnvHashMap::with_capacity_and_hasher(self.streams_by_id.len(),
Default::default());
{
let mut stmt = tx.prepare_cached(UPDATE_STREAM_COUNTERS_SQL)?;
for (&stream_id, s) in &self.streams_by_id {
// Process additions.
let mut new_duration = 0;
let mut new_runs = 0;
for i in 0..s.synced_recordings {
let l = s.uncommitted[i].lock();
raw::insert_recording(
&tx, o, CompositeId::new(stream_id, s.cum_recordings + i as i32), &l)?;
new_duration += i64::from(l.wall_duration_90k);
new_runs += if l.run_offset == 0 { 1 } else { 0 };
}
if s.synced_recordings > 0 {
new_ranges.entry(stream_id).or_insert(None);
stmt.execute_named(named_params!{
":stream_id": stream_id,
":cum_recordings": s.cum_recordings + s.synced_recordings as i32,
":cum_media_duration_90k": s.cum_media_duration.0 + new_duration,
":cum_runs": s.cum_runs + new_runs,
})?;
}
// Process deletions.
if let Some(l) = s.to_delete.last() {
new_ranges.entry(stream_id).or_insert(None);
let dir = match s.sample_file_dir_id {
None => bail!("stream {} has no directory!", stream_id),
Some(d) => d,
};
// raw::delete_recordings does a bulk transfer of a range from recording to
// garbage, rather than operating on each element of to_delete. This is
// guaranteed to give the same result because to_delete is guaranteed to be the
// oldest recordings for the stream.
let start = CompositeId::new(stream_id, 0);
let end = CompositeId(l.id.0 + 1);
let n = raw::delete_recordings(&tx, dir, start .. end)? as usize;
if n != s.to_delete.len() {
bail!("Found {} rows in {} .. {}, expected {}: {:?}",
n, start, end, s.to_delete.len(), &s.to_delete);
}
}
}
}
for dir in self.sample_file_dirs_by_id.values() {
raw::mark_sample_files_deleted(&tx, &dir.garbage_unlinked)?;
}
for (&stream_id, r) in &mut new_ranges {
*r = raw::get_range(&tx, stream_id)?;
}
{
let mut stmt = tx.prepare_cached(
r"update open set duration_90k = ?, end_time_90k = ? where id = ?")?;
let rows = stmt.execute(params![
(recording::Time::new(clocks.monotonic()) - self.open_monotonic).0,
recording::Time::new(clocks.realtime()).0,
o.id,
])?;
if rows != 1 {
bail!("unable to find current open {}", o.id);
}
}
self.auth.flush(&tx)?;
self.signal.flush(&tx)?;
tx.commit()?;
#[derive(Default)]
struct DirLog {
added: SmallVec::<[CompositeId; 32]>,
deleted: SmallVec::<[CompositeId; 32]>,
gced: SmallVec::<[CompositeId; 32]>,
added_bytes: i64,
deleted_bytes: i64,
}
let mut dir_logs: FnvHashMap = FnvHashMap::default();
// Process delete_garbage.
for (&id, dir) in &mut self.sample_file_dirs_by_id {
if !dir.garbage_unlinked.is_empty() {
dir_logs.entry(id).or_default().gced.extend(dir.garbage_unlinked.drain(..));
}
}
for (stream_id, new_range) in new_ranges.drain() {
let s = self.streams_by_id.get_mut(&stream_id).unwrap();
let dir_id = s.sample_file_dir_id.unwrap();
let dir = self.sample_file_dirs_by_id.get_mut(&dir_id).unwrap();
let log = dir_logs.entry(dir_id).or_default();
// Process delete_oldest_recordings.
s.sample_file_bytes -= s.bytes_to_delete;
s.fs_bytes -= s.fs_bytes_to_delete;
log.deleted_bytes += s.bytes_to_delete;
s.bytes_to_delete = 0;
s.fs_bytes_to_delete = 0;
log.deleted.reserve(s.to_delete.len());
for row in s.to_delete.drain(..) {
log.deleted.push(row.id);
dir.garbage_needs_unlink.insert(row.id);
let d = recording::Duration(i64::from(row.wall_duration_90k));
s.duration -= d;
adjust_days(row.start .. row.start + d, -1, &mut s.days);
}
// Process add_recordings.
log.added_bytes += s.bytes_to_add;
s.bytes_to_add = 0;
s.fs_bytes_to_add = 0;
log.added.reserve(s.synced_recordings);
for _ in 0..s.synced_recordings {
let u = s.uncommitted.pop_front().unwrap();
log.added.push(CompositeId::new(stream_id, s.cum_recordings));
let l = u.lock();
s.cum_recordings += 1;
let wall_dur = recording::Duration(l.wall_duration_90k.into());
let media_dur = recording::Duration(l.media_duration_90k.into());
s.cum_media_duration += media_dur;
s.cum_runs += if l.run_offset == 0 { 1 } else { 0 };
let end = l.start + wall_dur;
s.add_recording(l.start .. end, l.sample_file_bytes);
}
s.synced_recordings = 0;
// Fix the range.
s.range = new_range;
}
self.auth.post_flush();
self.signal.post_flush();
self.flush_count += 1;
let mut log_msg = String::with_capacity(256);
for (&dir_id, log) in &dir_logs {
let dir = self.sample_file_dirs_by_id.get(&dir_id).unwrap();
write!(&mut log_msg,
"\n{}: added {}B in {} recordings ({}), deleted {}B in {} ({}), \
GCed {} recordings ({}).",
&dir.path, &encode_size(log.added_bytes), log.added.len(),
log.added.iter().join(", "), &encode_size(log.deleted_bytes), log.deleted.len(),
log.deleted.iter().join(", "), log.gced.len(),
log.gced.iter().join(", ")).unwrap();
}
if log_msg.is_empty() {
log_msg.push_str(" no recording changes");
}
info!("Flush {} (why: {}):{}", self.flush_count, reason, &log_msg);
for cb in &self.on_flush {
cb();
}
Ok(())
}
/// Sets a watcher which will receive an (empty) event on successful flush.
/// The lock will be held while this is run, so it should not do any I/O.
pub(crate) fn on_flush(&mut self, run: Box) {
self.on_flush.push(run);
}
// TODO: find a cleaner way to do this. Seems weird for src/cmds/run.rs to clear the on flush
// handlers given that it didn't add them.
pub fn clear_on_flush(&mut self) {
self.on_flush.clear();
}
/// Opens the given sample file directories.
///
/// `ids` is implicitly de-duplicated.
///
/// When the database is in read-only mode, this simply opens all the directories after
/// locking and verifying their metadata matches the database state. In read-write mode, it
/// performs a single database transaction to update metadata for all dirs, then performs a like
/// update to the directories' on-disk metadata.
///
/// Note this violates the principle of never accessing disk while holding the database lock.
/// Currently this only happens at startup (or during configuration), so this isn't a problem
/// in practice.
pub fn open_sample_file_dirs(&mut self, ids: &[i32]) -> Result<(), Error> {
let mut in_progress = FnvHashMap::with_capacity_and_hasher(ids.len(), Default::default());
for &id in ids {
let e = in_progress.entry(id);
use ::std::collections::hash_map::Entry;
let e = match e {
Entry::Occupied(_) => continue, // suppress duplicate.
Entry::Vacant(e) => e,
};
let dir = self.sample_file_dirs_by_id
.get_mut(&id)
.ok_or_else(|| format_err!("no such dir {}", id))?;
if dir.dir.is_some() { continue }
let mut meta = dir.meta(&self.uuid);
if let Some(o) = self.open.as_ref() {
let open = meta.in_progress_open.mut_message();
open.id = o.id;
open.uuid.extend_from_slice(&o.uuid.as_bytes()[..]);
}
let d = dir::SampleFileDir::open(&dir.path, &meta)?;
if self.open.is_none() { // read-only mode; it's already fully opened.
dir.dir = Some(d);
} else { // read-write mode; there are more steps to do.
e.insert((meta, d));
}
}
let o = match self.open.as_ref() {
None => return Ok(()), // read-only mode; all done.
Some(o) => o,
};
let tx = self.conn.transaction()?;
{
let mut stmt = tx.prepare_cached(r#"
update sample_file_dir set last_complete_open_id = ? where id = ?
"#)?;
for &id in in_progress.keys() {
if stmt.execute(params![o.id, id])? != 1 {
bail!("unable to update dir {}", id);
}
}
}
tx.commit()?;
for (id, (mut meta, d)) in in_progress.drain() {
let dir = self.sample_file_dirs_by_id.get_mut(&id).unwrap();
meta.last_complete_open.clear();
mem::swap(&mut meta.last_complete_open, &mut meta.in_progress_open);
d.write_meta(&meta)?;
dir.dir = Some(d);
}
Ok(())
}
pub fn streams_by_id(&self) -> &BTreeMap { &self.streams_by_id }
/// Returns an immutable view of the video sample entries.
pub fn video_sample_entries_by_id(&self) -> &BTreeMap> {
&self.video_sample_entries_by_id
}
/// Gets a given camera by uuid.
pub fn get_camera(&self, uuid: Uuid) -> Option<&Camera> {
match self.cameras_by_uuid.get(&uuid) {
Some(id) => Some(self.cameras_by_id.get(id).expect("uuid->id requires id->cam")),
None => None,
}
}
/// Lists the specified recordings, passing them to a supplied function. Given that the
/// function is called with the database lock held, it should be quick.
///
/// Note that at present, the returned recordings are _not_ completely ordered by start time.
/// Uncommitted recordings are returned id order after the others.
pub fn list_recordings_by_time(
&self, stream_id: i32, desired_time: Range,
f: &mut dyn FnMut(ListRecordingsRow) -> Result<(), Error>) -> Result<(), Error> {
let s = match self.streams_by_id.get(&stream_id) {
None => bail!("no such stream {}", stream_id),
Some(s) => s,
};
raw::list_recordings_by_time(&self.conn, stream_id, desired_time.clone(), f)?;
for (i, u) in s.uncommitted.iter().enumerate() {
let row = {
let l = u.lock();
if l.video_samples > 0 {
let end = l.start + recording::Duration(l.wall_duration_90k as i64);
if l.start > desired_time.end || end < desired_time.start {
continue; // there's no overlap with the requested range.
}
l.to_list_row(CompositeId::new(stream_id, s.cum_recordings + i as i32),
self.open.unwrap().id)
} else {
continue;
}
};
f(row)?;
}
Ok(())
}
/// Lists the specified recordings in ascending order by id.
pub fn list_recordings_by_id(
&self, stream_id: i32, desired_ids: Range,
f: &mut dyn FnMut(ListRecordingsRow) -> Result<(), Error>) -> Result<(), Error> {
let s = match self.streams_by_id.get(&stream_id) {
None => bail!("no such stream {}", stream_id),
Some(s) => s,
};
if desired_ids.start < s.cum_recordings {
raw::list_recordings_by_id(&self.conn, stream_id, desired_ids.clone(), f)?;
}
if desired_ids.end > s.cum_recordings {
let start = cmp::max(0, desired_ids.start - s.cum_recordings) as usize;
let end = cmp::min((desired_ids.end - s.cum_recordings) as usize,
s.uncommitted.len());
for i in start .. end {
let row = {
let l = s.uncommitted[i].lock();
if l.video_samples > 0 {
l.to_list_row(CompositeId::new(stream_id, s.cum_recordings + i as i32),
self.open.unwrap().id)
} else {
continue;
}
};
f(row)?;
}
}
Ok(())
}
/// Calls `list_recordings_by_time` and aggregates consecutive recordings.
/// Rows are given to the callback in arbitrary order. Callers which care about ordering
/// should do their own sorting.
pub fn list_aggregated_recordings(
&self, stream_id: i32, desired_time: Range,
forced_split: recording::Duration,
f: &mut dyn FnMut(&ListAggregatedRecordingsRow) -> Result<(), Error>)
-> Result<(), Error> {
// Iterate, maintaining a map from a recording_id to the aggregated row for the latest
// batch of recordings from the run starting at that id. Runs can be split into multiple
// batches for a few reasons:
//
// * forced split (when exceeding a duration limit)
// * a missing id (one that was deleted out of order)
// * video_sample_entry mismatch (if the parameters changed during a RTSP session)
//
// This iteration works because in a run, the start_time+duration of recording id r
// is equal to the start_time of recording id r+1. Thus ascending times guarantees
// ascending ids within a run. (Different runs, however, can be arbitrarily interleaved if
// their timestamps overlap. Tracking all active runs prevents that interleaving from
// causing problems.) list_recordings_by_time also returns uncommitted recordings in
// ascending order by id, and after any committed recordings with lower ids.
let mut aggs: BTreeMap = BTreeMap::new();
self.list_recordings_by_time(stream_id, desired_time, &mut |row| {
let recording_id = row.id.recording();
let run_start_id = recording_id - row.run_offset;
let uncommitted = (row.flags & RecordingFlags::Uncommitted as i32) != 0;
let growing = (row.flags & RecordingFlags::Growing as i32) != 0;
use std::collections::btree_map::Entry;
match aggs.entry(run_start_id) {
Entry::Occupied(mut e) => {
let a = e.get_mut();
let new_dur = a.time.end - a.time.start +
recording::Duration(row.wall_duration_90k as i64);
let needs_flush =
a.ids.end != recording_id ||
row.video_sample_entry_id != a.video_sample_entry_id ||
new_dur >= forced_split;
if needs_flush { // flush then start a new entry.
f(a)?;
*a = ListAggregatedRecordingsRow::from(row);
} else { // append.
if a.time.end != row.start {
bail!("stream {} recording {} ends at {} but {} starts at {}",
stream_id, a.ids.end - 1, a.time.end, row.id, row.start);
}
if a.open_id != row.open_id {
bail!("stream {} recording {} has open id {} but {} has {}",
stream_id, a.ids.end - 1, a.open_id, row.id, row.open_id);
}
a.time.end.0 += row.wall_duration_90k as i64;
a.ids.end = recording_id + 1;
a.video_samples += row.video_samples as i64;
a.video_sync_samples += row.video_sync_samples as i64;
a.sample_file_bytes += row.sample_file_bytes as i64;
if uncommitted {
a.first_uncommitted = a.first_uncommitted.or(Some(recording_id));
}
a.growing = growing;
}
},
Entry::Vacant(e) => { e.insert(ListAggregatedRecordingsRow::from(row)); },
}
Ok(())
})?;
for a in aggs.values() {
f(a)?;
}
Ok(())
}
/// Calls `f` with a single `recording_playback` row.
/// Note the lock is held for the duration of `f`.
/// This uses a LRU cache to reduce the number of retrievals from the database.
pub fn with_recording_playback(&self, id: CompositeId,
f: &mut dyn FnMut(&RecordingPlayback) -> Result)
-> Result {
// Check for uncommitted path.
let s = self.streams_by_id
.get(&id.stream())
.ok_or_else(|| format_err!("no stream for {}", id))?;
if s.cum_recordings <= id.recording() {
let i = id.recording() - s.cum_recordings;
if i as usize >= s.uncommitted.len() {
bail!("no such recording {}; latest committed is {}, latest is {}",
id, s.cum_recordings, s.cum_recordings + s.uncommitted.len() as i32);
}
let l = s.uncommitted[i as usize].lock();
return f(&RecordingPlayback { video_index: &l.video_index });
}
// Committed path.
let mut cache = self.video_index_cache.borrow_mut();
if let Some(video_index) = cache.get_mut(&id.0) {
trace!("cache hit for recording {}", id);
return f(&RecordingPlayback { video_index });
}
trace!("cache miss for recording {}", id);
let mut stmt = self.conn.prepare_cached(GET_RECORDING_PLAYBACK_SQL)?;
let mut rows = stmt.query_named(named_params!{":composite_id": id.0})?;
if let Some(row) = rows.next()? {
let video_index: VideoIndex = row.get(0)?;
let result = f(&RecordingPlayback { video_index: &video_index.0[..] });
cache.insert(id.0, video_index.0);
return result;
}
Err(format_err!("no such recording {}", id))
}
/// Queues for deletion the oldest recordings that aren't already queued.
/// `f` should return true for each row that should be deleted.
pub(crate) fn delete_oldest_recordings(
&mut self, stream_id: i32, f: &mut dyn FnMut(&ListOldestRecordingsRow) -> bool)
-> Result<(), Error> {
let s = match self.streams_by_id.get_mut(&stream_id) {
None => bail!("no stream {}", stream_id),
Some(s) => s,
};
let end = match s.to_delete.last() {
None => 0,
Some(row) => row.id.recording() + 1,
};
raw::list_oldest_recordings(&self.conn, CompositeId::new(stream_id, end), &mut |r| {
if f(&r) {
s.to_delete.push(r);
let bytes = i64::from(r.sample_file_bytes);
s.bytes_to_delete += bytes;
s.fs_bytes_to_delete += round_up(bytes);
return true;
}
false
})
}
/// Initializes the video_sample_entries. To be called during construction.
fn init_video_sample_entries(&mut self) -> Result<(), Error> {
info!("Loading video sample entries");
let mut stmt = self.conn.prepare(r#"
select
id,
width,
height,
pasp_h_spacing,
pasp_v_spacing,
rfc6381_codec,
data
from
video_sample_entry
"#)?;
let mut rows = stmt.query(params![])?;
while let Some(row) = rows.next()? {
let id = row.get(0)?;
let data: Vec = row.get(6)?;
self.video_sample_entries_by_id.insert(id, Arc::new(VideoSampleEntry {
id,
width: row.get::<_, i32>(1)?.try_into()?,
height: row.get::<_, i32>(2)?.try_into()?,
pasp_h_spacing: row.get::<_, i32>(3)?.try_into()?,
pasp_v_spacing: row.get::<_, i32>(4)?.try_into()?,
data,
rfc6381_codec: row.get(5)?,
}));
}
info!("Loaded {} video sample entries",
self.video_sample_entries_by_id.len());
Ok(())
}
/// Initializes the sample file dirs.
/// To be called during construction.
fn init_sample_file_dirs(&mut self) -> Result<(), Error> {
info!("Loading sample file dirs");
let mut stmt = self.conn.prepare(r#"
select
d.id,
d.path,
d.uuid,
d.last_complete_open_id,
o.uuid
from
sample_file_dir d left join open o on (d.last_complete_open_id = o.id);
"#)?;
let mut rows = stmt.query(params![])?;
while let Some(row) = rows.next()? {
let id = row.get(0)?;
let dir_uuid: FromSqlUuid = row.get(2)?;
let open_id: Option = row.get(3)?;
let open_uuid: Option = row.get(4)?;
let last_complete_open = match (open_id, open_uuid) {
(Some(id), Some(uuid)) => Some(Open { id, uuid: uuid.0, }),
(None, None) => None,
_ => bail!("open table missing id {}", id),
};
self.sample_file_dirs_by_id.insert(id, SampleFileDir {
id,
uuid: dir_uuid.0,
path: row.get(1)?,
dir: None,
last_complete_open,
garbage_needs_unlink: raw::list_garbage(&self.conn, id)?,
garbage_unlinked: Vec::new(),
});
}
info!("Loaded {} sample file dirs", self.sample_file_dirs_by_id.len());
Ok(())
}
/// Initializes the cameras, but not their matching recordings.
/// To be called during construction.
fn init_cameras(&mut self) -> Result<(), Error> {
info!("Loading cameras");
let mut stmt = self.conn.prepare(r#"
select
id,
uuid,
short_name,
description,
onvif_host,
username,
password
from
camera;
"#)?;
let mut rows = stmt.query(params![])?;
while let Some(row) = rows.next()? {
let id = row.get(0)?;
let uuid: FromSqlUuid = row.get(1)?;
self.cameras_by_id.insert(id, Camera {
id: id,
uuid: uuid.0,
short_name: row.get(2)?,
description: row.get(3)?,
onvif_host: row.get(4)?,
username: row.get(5)?,
password: row.get(6)?,
streams: Default::default(),
});
self.cameras_by_uuid.insert(uuid.0, id);
}
info!("Loaded {} cameras", self.cameras_by_id.len());
Ok(())
}
/// Initializes the streams, but not their matching recordings.
/// To be called during construction.
fn init_streams(&mut self) -> Result<(), Error> {
info!("Loading streams");
let mut stmt = self.conn.prepare(r#"
select
id,
type,
camera_id,
sample_file_dir_id,
rtsp_url,
retain_bytes,
flush_if_sec,
cum_recordings,
cum_media_duration_90k,
cum_runs,
record
from
stream;
"#)?;
let mut rows = stmt.query(params![])?;
while let Some(row) = rows.next()? {
let id = row.get(0)?;
let type_: String = row.get(1)?;
let type_ = StreamType::parse(&type_).ok_or_else(
|| format_err!("no such stream type {}", type_))?;
let camera_id = row.get(2)?;
let c = self
.cameras_by_id
.get_mut(&camera_id)
.ok_or_else(|| format_err!("missing camera {} for stream {}",
camera_id, id))?;
let flush_if_sec = row.get(6)?;
self.streams_by_id.insert(id, Stream {
id,
type_,
camera_id,
sample_file_dir_id: row.get(3)?,
rtsp_url: row.get(4)?,
retain_bytes: row.get(5)?,
flush_if_sec,
range: None,
sample_file_bytes: 0,
fs_bytes: 0,
to_delete: Vec::new(),
bytes_to_delete: 0,
fs_bytes_to_delete: 0,
bytes_to_add: 0,
fs_bytes_to_add: 0,
duration: recording::Duration(0),
days: BTreeMap::new(),
cum_recordings: row.get(7)?,
cum_media_duration: recording::Duration(row.get(8)?),
cum_runs: row.get(9)?,
record: row.get(10)?,
uncommitted: VecDeque::new(),
synced_recordings: 0,
on_live_segment: Vec::new(),
});
c.streams[type_.index()] = Some(id);
}
info!("Loaded {} streams", self.streams_by_id.len());
Ok(())
}
/// Inserts the specified video sample entry if absent.
/// On success, returns the id of a new or existing row.
pub fn insert_video_sample_entry(&mut self, entry: VideoSampleEntryToInsert)
-> Result {
// Check if it already exists.
// There shouldn't be too many entries, so it's fine to enumerate everything.
for (&id, v) in &self.video_sample_entries_by_id {
if v.data == entry.data {
// The other fields are derived from data, so differences indicate a bug.
if v.width != entry.width || v.height != entry.height ||
v.pasp_h_spacing != entry.pasp_h_spacing ||
v.pasp_v_spacing != entry.pasp_v_spacing {
bail!("video_sample_entry id {}: existing entry {:?}, new {:?}", id, v, &entry);
}
return Ok(id);
}
}
let mut stmt = self.conn.prepare_cached(INSERT_VIDEO_SAMPLE_ENTRY_SQL)?;
stmt.execute_named(named_params!{
":width": i32::from(entry.width),
":height": i32::from(entry.height),
":pasp_h_spacing": i32::from(entry.pasp_h_spacing),
":pasp_v_spacing": i32::from(entry.pasp_v_spacing),
":rfc6381_codec": &entry.rfc6381_codec,
":data": &entry.data,
})?;
let id = self.conn.last_insert_rowid() as i32;
self.video_sample_entries_by_id.insert(id, Arc::new(VideoSampleEntry {
id,
width: entry.width,
height: entry.height,
pasp_h_spacing: entry.pasp_h_spacing,
pasp_v_spacing: entry.pasp_v_spacing,
data: entry.data,
rfc6381_codec: entry.rfc6381_codec,
}));
Ok(id)
}
pub fn add_sample_file_dir(&mut self, path: String) -> Result {
let mut meta = schema::DirMeta::default();
let uuid = Uuid::new_v4();
let uuid_bytes = &uuid.as_bytes()[..];
let o = self.open
.as_ref()
.ok_or_else(|| format_err!("database is read-only"))?;
// Populate meta.
{
meta.db_uuid.extend_from_slice(&self.uuid.as_bytes()[..]);
meta.dir_uuid.extend_from_slice(uuid_bytes);
let open = meta.in_progress_open.mut_message();
open.id = o.id;
open.uuid.extend_from_slice(&o.uuid.as_bytes()[..]);
}
let dir = dir::SampleFileDir::create(&path, &meta)?;
self.conn.execute(r#"
insert into sample_file_dir (path, uuid, last_complete_open_id)
values (?, ?, ?)
"#, params![&path, uuid_bytes, o.id])?;
let id = self.conn.last_insert_rowid() as i32;
use ::std::collections::btree_map::Entry;
let e = self.sample_file_dirs_by_id.entry(id);
let d = match e {
Entry::Vacant(e) => e.insert(SampleFileDir {
id,
path,
uuid,
dir: Some(dir),
last_complete_open: None,
garbage_needs_unlink: FnvHashSet::default(),
garbage_unlinked: Vec::new(),
}),
Entry::Occupied(_) => Err(format_err!("duplicate sample file dir id {}", id))?,
};
d.last_complete_open = Some(*o);
mem::swap(&mut meta.last_complete_open, &mut meta.in_progress_open);
d.dir.as_ref().unwrap().write_meta(&meta)?;
Ok(id)
}
pub fn delete_sample_file_dir(&mut self, dir_id: i32) -> Result<(), Error> {
for (&id, s) in self.streams_by_id.iter() {
if s.sample_file_dir_id == Some(dir_id) {
bail!("can't delete dir referenced by stream {}", id);
}
}
let mut d = match self.sample_file_dirs_by_id.entry(dir_id) {
::std::collections::btree_map::Entry::Occupied(e) => e,
_ => bail!("no such dir {} to remove", dir_id),
};
if !d.get().garbage_needs_unlink.is_empty() || !d.get().garbage_unlinked.is_empty() {
bail!("must collect garbage before deleting directory {}", d.get().path);
}
let dir = match d.get_mut().dir.take() {
None => dir::SampleFileDir::open(&d.get().path, &d.get().meta(&self.uuid))?,
Some(arc) => match Arc::strong_count(&arc) {
1 => {
d.get_mut().dir = Some(arc); // put it back.
bail!("can't delete in-use directory {}", dir_id);
},
_ => arc,
},
};
if !dir.is_empty()? {
bail!("Can't delete sample file directory {} which still has files", &d.get().path);
}
let mut meta = d.get().meta(&self.uuid);
meta.in_progress_open = mem::replace(&mut meta.last_complete_open,
::protobuf::SingularPtrField::none());
dir.write_meta(&meta)?;
if self.conn.execute("delete from sample_file_dir where id = ?", params![dir_id])? != 1 {
bail!("missing database row for dir {}", dir_id);
}
d.remove_entry();
Ok(())
}
/// Adds a camera.
pub fn add_camera(&mut self, mut camera: CameraChange) -> Result {
let uuid = Uuid::new_v4();
let uuid_bytes = &uuid.as_bytes()[..];
let tx = self.conn.transaction()?;
let streams;
let camera_id;
{
let mut stmt = tx.prepare_cached(r#"
insert into camera (uuid, short_name, description, onvif_host, username,
password)
values (:uuid, :short_name, :description, :onvif_host, :username,
:password)
"#)?;
stmt.execute_named(named_params!{
":uuid": uuid_bytes,
":short_name": &camera.short_name,
":description": &camera.description,
":onvif_host": &camera.onvif_host,
":username": &camera.username,
":password": &camera.password,
})?;
camera_id = tx.last_insert_rowid() as i32;
streams = StreamStateChanger::new(&tx, camera_id, None, &self.streams_by_id,
&mut camera)?;
}
tx.commit()?;
let streams = streams.apply(&mut self.streams_by_id);
self.cameras_by_id.insert(camera_id, Camera {
id: camera_id,
uuid,
short_name: camera.short_name,
description: camera.description,
onvif_host: camera.onvif_host,
username: camera.username,
password: camera.password,
streams,
});
self.cameras_by_uuid.insert(uuid, camera_id);
Ok(camera_id)
}
/// Updates a camera.
pub fn update_camera(&mut self, camera_id: i32, mut camera: CameraChange) -> Result<(), Error> {
let tx = self.conn.transaction()?;
let streams;
let c = self
.cameras_by_id
.get_mut(&camera_id)
.ok_or_else(|| format_err!("no such camera {}", camera_id))?;
{
streams = StreamStateChanger::new(&tx, camera_id, Some(c), &self.streams_by_id,
&mut camera)?;
let mut stmt = tx.prepare_cached(r#"
update camera set
short_name = :short_name,
description = :description,
onvif_host = :onvif_host,
username = :username,
password = :password
where
id = :id
"#)?;
let rows = stmt.execute_named(named_params!{
":id": camera_id,
":short_name": &camera.short_name,
":description": &camera.description,
":onvif_host": &camera.onvif_host,
":username": &camera.username,
":password": &camera.password,
})?;
if rows != 1 {
bail!("Camera {} missing from database", camera_id);
}
}
tx.commit()?;
c.short_name = camera.short_name;
c.description = camera.description;
c.onvif_host = camera.onvif_host;
c.username = camera.username;
c.password = camera.password;
c.streams = streams.apply(&mut self.streams_by_id);
Ok(())
}
/// Deletes a camera and its streams. The camera must have no recordings.
pub fn delete_camera(&mut self, id: i32) -> Result<(), Error> {
let uuid = self.cameras_by_id.get(&id)
.map(|c| c.uuid)
.ok_or_else(|| format_err!("No such camera {} to remove", id))?;
let mut streams_to_delete = Vec::new();
let tx = self.conn.transaction()?;
{
let mut stream_stmt = tx.prepare_cached(r"delete from stream where id = :id")?;
for (stream_id, stream) in &self.streams_by_id {
if stream.camera_id != id { continue };
if stream.range.is_some() {
bail!("Can't remove camera {}; has recordings.", id);
}
let rows = stream_stmt.execute_named(named_params!{":id": stream_id})?;
if rows != 1 {
bail!("Stream {} missing from database", id);
}
streams_to_delete.push(*stream_id);
}
let mut cam_stmt = tx.prepare_cached(r"delete from camera where id = :id")?;
let rows = cam_stmt.execute_named(named_params!{":id": id})?;
if rows != 1 {
bail!("Camera {} missing from database", id);
}
}
tx.commit()?;
for id in streams_to_delete {
self.streams_by_id.remove(&id);
}
self.cameras_by_id.remove(&id);
self.cameras_by_uuid.remove(&uuid);
return Ok(())
}
pub fn update_retention(&mut self, changes: &[RetentionChange]) -> Result<(), Error> {
let tx = self.conn.transaction()?;
{
let mut stmt = tx.prepare_cached(r#"
update stream
set
record = :record,
retain_bytes = :retain
where
id = :id
"#)?;
for c in changes {
if c.new_limit < 0 {
bail!("can't set limit for stream {} to {}; must be >= 0",
c.stream_id, c.new_limit);
}
let rows = stmt.execute_named(named_params!{
":record": c.new_record,
":retain": c.new_limit,
":id": c.stream_id,
})?;
if rows != 1 {
bail!("no such stream {}", c.stream_id);
}
}
}
tx.commit()?;
for c in changes {
let s = self.streams_by_id.get_mut(&c.stream_id).expect("stream in db but not state");
s.record = c.new_record;
s.retain_bytes = c.new_limit;
}
Ok(())
}
// ---- auth ----
pub fn users_by_id(&self) -> &BTreeMap { self.auth.users_by_id() }
pub fn apply_user_change(&mut self, change: UserChange) -> Result<&User, Error> {
self.auth.apply(&self.conn, change)
}
pub fn delete_user(&mut self, id: i32) -> Result<(), Error> {
self.auth.delete_user(&mut self.conn, id)
}
pub fn get_user(&self, username: &str) -> Option<&User> {
self.auth.get_user(username)
}
pub fn login_by_password(&mut self, req: auth::Request, username: &str, password: String,
domain: Option>, session_flags: i32)
-> Result<(RawSessionId, &Session), Error> {
self.auth.login_by_password(&self.conn, req, username, password, domain, session_flags)
}
pub fn make_session(&mut self, creation: Request, uid: i32,
domain: Option>, flags: i32, permissions: schema::Permissions)
-> Result<(RawSessionId, &Session), Error> {
self.auth.make_session(&self.conn, creation, uid, domain, flags, permissions)
}
pub fn authenticate_session(&mut self, req: auth::Request, sid: &auth::SessionHash)
-> Result<(&auth::Session, &User), Error> {
self.auth.authenticate_session(&self.conn, req, sid)
}
pub fn revoke_session(&mut self, reason: auth::RevocationReason, detail: Option,
req: auth::Request, hash: &auth::SessionHash) -> Result<(), Error> {
self.auth.revoke_session(&self.conn, reason, detail, req, hash)
}
// ---- signal ----
pub fn signals_by_id(&self) -> &BTreeMap { self.signal.signals_by_id() }
pub fn signal_types_by_uuid(&self) -> &FnvHashMap {
self.signal.types_by_uuid()
}
pub fn list_changes_by_time(
&self, desired_time: Range, f: &mut dyn FnMut(&signal::ListStateChangesRow)) {
self.signal.list_changes_by_time(desired_time, f)
}
pub fn update_signals(
&mut self, when: Range, signals: &[u32], states: &[u16])
-> Result<(), base::Error> {
self.signal.update_signals(when, signals, states)
}
}
/// Sets pragmas for full database integrity.
pub(crate) fn set_integrity_pragmas(conn: &mut rusqlite::Connection) -> Result<(), Error> {
// Enforce foreign keys. This is on by default with --features=bundled (as rusqlite
// compiles the SQLite3 amalgamation with -DSQLITE_DEFAULT_FOREIGN_KEYS=1). Ensure it's
// always on. Note that our foreign keys are immediate rather than deferred, so we have to
// be careful about the order of operations during the upgrade.
conn.execute("pragma foreign_keys = on", params![])?;
// Make the database actually durable.
conn.execute("pragma fullfsync = on", params![])?;
conn.execute("pragma synchronous = 3", params![])?;
Ok(())
}
/// Initializes a database.
/// Note this doesn't set journal options, so that it can be used on in-memory databases for
/// test code.
pub fn init(conn: &mut rusqlite::Connection) -> Result<(), Error> {
set_integrity_pragmas(conn)?;
let tx = conn.transaction()?;
tx.execute_batch(include_str!("schema.sql"))?;
{
let uuid = ::uuid::Uuid::new_v4();
let uuid_bytes = &uuid.as_bytes()[..];
tx.execute("insert into meta (uuid) values (?)", params![uuid_bytes])?;
}
tx.commit()?;
Ok(())
}
/// Gets the schema version from the given database connection.
/// A fully initialized database will return `Ok(Some(version))` where `version` is an integer that
/// can be compared to `EXPECTED_VERSION`. An empty database will return `Ok(None)`. A partially
/// initialized database (in particular, one without a version row) will return some error.
pub fn get_schema_version(conn: &rusqlite::Connection) -> Result