// This file is part of Moonfire NVR, a security camera network video recorder.
// Copyright (C) 2021 The Moonfire NVR Authors; see AUTHORS and LICENSE.txt.
// SPDX-License-Identifier: GPL-v3.0-or-later WITH GPL-3.0-linking-exception.

//! Database access logic for the Moonfire NVR SQLite schema.
//!
//! The SQLite schema includes everything except the actual video samples (see the `dir` module
//! for management of those). See `schema.sql` for a more detailed description.
//!
//! The [`Database`] struct caches data in RAM, making the assumption that only one process is
//! accessing the database at a time. Performance and efficiency notes:
//!
//! * several query operations here feature row callbacks. The callback is invoked with
//!   the database lock held. Thus, the callback shouldn't perform long-running operations.
//!
//! * startup may be slow, as it scans the entire index for the recording table. This seems
//!   acceptable.
//!
//! * the operations used for web file serving should return results with acceptable latency.
//!
//! * however, the database lock may be held for longer than is acceptable for
//!   the critical path of recording frames. The caller should preallocate sample file uuids
//!   and such to avoid database operations in these paths.
//!
//! * adding and removing recordings during normal operations use a batch interface.
//!   A list of mutations is built up in memory and occasionally flushed to reduce SSD write
//!   cycles.

use crate::auth;
use crate::days;
use crate::dir;
use crate::json::SampleFileDirConfig;
use crate::raw;
use crate::recording;
use crate::schema;
use crate::signal;
use base::clock::{self, Clocks};
use base::strutil::encode_size;
use base::{bail, err, Error};
use fnv::{FnvHashMap, FnvHashSet};
use hashlink::LinkedHashMap;
use itertools::Itertools;
use rusqlite::{named_params, params};
use smallvec::SmallVec;
use std::cell::RefCell;
use std::cmp;
use std::collections::{BTreeMap, VecDeque};
use std::fmt::Write as _;
use std::mem;
use std::ops::Range;
use std::path::PathBuf;
use std::str;
use std::string::String;
use std::sync::Arc;
use std::sync::{Mutex, MutexGuard};
use std::vec::Vec;
use tracing::{error, info, trace, warn};
use uuid::Uuid;

/// Expected schema version. See `guide/schema.md` for more information.
pub const EXPECTED_VERSION: i32 = 7;

/// Length of the video index cache.
/// The actual data structure is one bigger than this because we insert before we remove.
/// Make it one less than a power of two so that the data structure's size is efficient.
const VIDEO_INDEX_CACHE_LEN: usize = 1023;

const GET_RECORDING_PLAYBACK_SQL: &str = r#"
    select
      video_index
    from
      recording_playback
    where
      composite_id = :composite_id
"#;

const INSERT_VIDEO_SAMPLE_ENTRY_SQL: &str = r#"
    insert into video_sample_entry (width,  height,  pasp_h_spacing,  pasp_v_spacing,
                                    rfc6381_codec, data)
                            values (:width, :height, :pasp_h_spacing, :pasp_v_spacing,
                                    :rfc6381_codec, :data)
"#;

const UPDATE_STREAM_COUNTERS_SQL: &str = r#"
    update stream
    set cum_recordings = :cum_recordings,
        cum_media_duration_90k = :cum_media_duration_90k,
        cum_runs = :cum_runs
    where id = :stream_id
"#;

/// The size of a filesystem block, to use in disk space accounting.
/// This should really be obtained by a stat call on the sample file directory in question,
/// but that requires some refactoring. See
/// [#89](https://github.com/scottlamb/moonfire-nvr/issues/89). We might be able to get away with
/// this hardcoded value for a while.
const ASSUMED_BLOCK_SIZE_BYTES: i64 = 4096;

/// Rounds a file size up to the next multiple of the block size.
/// This is useful in representing the actual amount of filesystem space used.
pub(crate) fn round_up(bytes: i64) -> i64 {
    let blk = ASSUMED_BLOCK_SIZE_BYTES;
    (bytes + blk - 1) / blk * blk
}
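
// A minimal sketch (not from the original source) of `round_up`'s behavior,
// assuming the 4096-byte `ASSUMED_BLOCK_SIZE_BYTES` above.
#[cfg(test)]
mod round_up_illustration {
    use super::round_up;

    #[test]
    fn rounds_to_next_block_boundary() {
        assert_eq!(round_up(0), 0); // an empty file uses no blocks.
        assert_eq!(round_up(1), 4096); // a 1-byte file still occupies a full block.
        assert_eq!(round_up(4096), 4096); // exact multiples are unchanged.
        assert_eq!(round_up(4097), 8192); // one byte over rolls to the next block.
    }
}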

/// A wrapper around `Uuid` which implements `FromSql` and `ToSql`.
pub struct SqlUuid(pub Uuid);

impl rusqlite::types::FromSql for SqlUuid {
    fn column_result(value: rusqlite::types::ValueRef) -> rusqlite::types::FromSqlResult<Self> {
        let uuid = Uuid::from_slice(value.as_blob()?)
            .map_err(|e| rusqlite::types::FromSqlError::Other(Box::new(e)))?;
        Ok(SqlUuid(uuid))
    }
}

impl rusqlite::types::ToSql for SqlUuid {
    fn to_sql(&self) -> rusqlite::Result<rusqlite::types::ToSqlOutput<'_>> {
        Ok(self.0.as_bytes()[..].into())
    }
}
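
// A minimal sketch (not from the original source) of `SqlUuid` round-tripping
// through SQLite: the uuid is stored as a 16-byte blob and read back via `FromSql`.
// The table `t` is hypothetical, for illustration only.
#[cfg(test)]
mod sql_uuid_illustration {
    use super::SqlUuid;
    use uuid::Uuid;

    #[test]
    fn round_trips_through_sqlite() {
        let conn = rusqlite::Connection::open_in_memory().unwrap();
        conn.execute_batch("create table t (u blob not null)").unwrap();
        let u = Uuid::new_v4();
        conn.execute("insert into t (u) values (?)", rusqlite::params![SqlUuid(u)])
            .unwrap();
        let got: SqlUuid = conn
            .query_row("select u from t", rusqlite::params![], |row| row.get(0))
            .unwrap();
        assert_eq!(got.0, u);
    }
}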

struct VideoIndex(Box<[u8]>);

impl rusqlite::types::FromSql for VideoIndex {
    fn column_result(value: rusqlite::types::ValueRef) -> rusqlite::types::FromSqlResult<Self> {
        Ok(VideoIndex(value.as_blob()?.to_vec().into_boxed_slice()))
    }
}

/// A concrete box derived from an ISO/IEC 14496-12 section 8.5.2 VisualSampleEntry box. Describes
/// the codec, width, height, etc.
#[derive(Debug)]
pub struct VideoSampleEntry {
    pub id: i32,

    // Fields matching VideoSampleEntryToInsert below.
    pub data: Vec<u8>,
    pub rfc6381_codec: String,
    pub width: u16,
    pub height: u16,
    pub pasp_h_spacing: u16,
    pub pasp_v_spacing: u16,
}

impl VideoSampleEntry {
    /// Returns the aspect ratio as a minimized ratio.
    pub fn aspect(&self) -> num_rational::Ratio<u32> {
        num_rational::Ratio::new(
            u32::from(self.width) * u32::from(self.pasp_h_spacing),
            u32::from(self.height) * u32::from(self.pasp_v_spacing),
        )
    }
}
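
// A minimal sketch (not from the original source): anamorphic SD video with a
// 40:33 pixel aspect ratio reduces to the familiar 16:9 display aspect ratio.
#[cfg(test)]
mod aspect_illustration {
    use super::VideoSampleEntry;

    #[test]
    fn anamorphic_sd_is_16_9() {
        let e = VideoSampleEntry {
            id: 1,
            data: Vec::new(), // hypothetical; a real entry holds a VisualSampleEntry box.
            rfc6381_codec: "avc1.4d401e".to_owned(), // hypothetical codec string.
            width: 704,
            height: 480,
            pasp_h_spacing: 40,
            pasp_v_spacing: 33,
        };
        // (704*40) : (480*33) = 28160 : 15840, which minimizes to 16 : 9.
        assert_eq!(e.aspect(), num_rational::Ratio::new(16, 9));
    }
}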

#[derive(Clone, PartialEq, Eq)]
pub struct VideoSampleEntryToInsert {
    pub data: Vec<u8>,
    pub rfc6381_codec: String,
    pub width: u16,
    pub height: u16,
    pub pasp_h_spacing: u16,
    pub pasp_v_spacing: u16,
}

impl std::fmt::Debug for VideoSampleEntryToInsert {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        use pretty_hex::PrettyHex;
        f.debug_struct("VideoSampleEntryToInsert")
            .field("data", &self.data.hex_dump())
            .field("rfc6381_codec", &self.rfc6381_codec)
            .field("width", &self.width)
            .field("height", &self.height)
            .field("pasp_h_spacing", &self.pasp_h_spacing)
            .field("pasp_v_spacing", &self.pasp_v_spacing)
            .finish()
    }
}

/// A row used in `list_recordings_by_time` and `list_recordings_by_id`.
#[derive(Copy, Clone, Debug)]
pub struct ListRecordingsRow {
    pub start: recording::Time,
    pub video_sample_entry_id: i32,

    pub id: CompositeId,

    /// This is a recording::Duration, but a single recording's duration fits into an i32.
    pub wall_duration_90k: i32,
    pub media_duration_90k: i32,
    pub video_samples: i32,
    pub video_sync_samples: i32,
    pub sample_file_bytes: i32,
    pub run_offset: i32,
    pub open_id: u32,
    pub flags: i32,

    /// This is populated by `list_recordings_by_id` but not `list_recordings_by_time`.
    /// (It's not included in the `recording_cover` index, so adding it to
    /// `list_recordings_by_time` would be inefficient.)
    pub prev_media_duration_and_runs: Option<(recording::Duration, i32)>,
}

/// A row used in `list_aggregated_recordings`.
#[derive(Clone, Debug)]
pub struct ListAggregatedRecordingsRow {
    pub time: Range<recording::Time>,
    pub ids: Range<i32>,
    pub video_samples: i64,
    pub video_sync_samples: i64,
    pub sample_file_bytes: i64,
    pub video_sample_entry_id: i32,
    pub stream_id: i32,
    pub run_start_id: i32,
    pub open_id: u32,
    pub first_uncommitted: Option<i32>,
    pub growing: bool,
    pub has_trailing_zero: bool,
}

impl ListAggregatedRecordingsRow {
    fn from(row: ListRecordingsRow) -> Self {
        let recording_id = row.id.recording();
        let uncommitted = (row.flags & RecordingFlags::Uncommitted as i32) != 0;
        let growing = (row.flags & RecordingFlags::Growing as i32) != 0;
        ListAggregatedRecordingsRow {
            time: row.start..recording::Time(row.start.0 + row.wall_duration_90k as i64),
            ids: recording_id..recording_id + 1,
            video_samples: row.video_samples as i64,
            video_sync_samples: row.video_sync_samples as i64,
            sample_file_bytes: row.sample_file_bytes as i64,
            video_sample_entry_id: row.video_sample_entry_id,
            stream_id: row.id.stream(),
            run_start_id: recording_id - row.run_offset,
            open_id: row.open_id,
            first_uncommitted: if uncommitted {
                Some(recording_id)
            } else {
                None
            },
            growing,
            has_trailing_zero: (row.flags & RecordingFlags::TrailingZero as i32) != 0,
        }
    }
}

/// Select fields from the `recording_playback` table. Retrieve with `with_recording_playback`.
#[derive(Debug)]
pub struct RecordingPlayback<'a> {
    pub video_index: &'a [u8],
}

/// Bitmask in the `flags` field in the `recording` table; see `schema.sql`.
#[repr(u32)]
pub enum RecordingFlags {
    TrailingZero = 1,

    // These values (starting from the high bit on down) are never written to the database.
    Growing = 1 << 30,
    Uncommitted = 1 << 31,
}

/// A recording to pass to `LockedDatabase::add_recording` and `raw::insert_recording`.
#[derive(Clone, Debug, Default)]
pub struct RecordingToInsert {
    pub run_offset: i32,
    pub flags: i32,
    pub sample_file_bytes: i32,
    pub start: recording::Time,

    /// Filled in by `add_recording`.
    pub prev_media_duration: recording::Duration,

    /// Filled in by `add_recording`.
    pub prev_runs: i32,

    pub wall_duration_90k: i32, // a recording::Duration, but guaranteed to fit in i32.
    pub media_duration_90k: i32,
    pub local_time_delta: recording::Duration,
    pub video_samples: i32,
    pub video_sync_samples: i32,
    pub video_sample_entry_id: i32,
    pub video_index: Vec<u8>,
    pub sample_file_blake3: Option<[u8; 32]>,
    pub end_reason: Option<String>,
}

impl RecordingToInsert {
    fn to_list_row(&self, id: CompositeId, open_id: u32) -> ListRecordingsRow {
        ListRecordingsRow {
            start: self.start,
            video_sample_entry_id: self.video_sample_entry_id,
            id,
            wall_duration_90k: self.wall_duration_90k,
            media_duration_90k: self.media_duration_90k,
            video_samples: self.video_samples,
            video_sync_samples: self.video_sync_samples,
            sample_file_bytes: self.sample_file_bytes,
            run_offset: self.run_offset,
            open_id,
            flags: self.flags | RecordingFlags::Uncommitted as i32,
            prev_media_duration_and_runs: Some((self.prev_media_duration, self.prev_runs)),
        }
    }
}

/// A row used in `raw::list_oldest_recordings` and `db::delete_oldest_recordings`.
#[derive(Copy, Clone, Debug)]
pub(crate) struct ListOldestRecordingsRow {
    pub id: CompositeId,
    pub start: recording::Time,
    pub wall_duration_90k: i32,
    pub sample_file_bytes: i32,
}

#[derive(Debug)]
pub struct SampleFileDir {
    pub id: i32,
    pub path: PathBuf,
    pub uuid: Uuid,
    dir: Option<Arc<dir::SampleFileDir>>,
    last_complete_open: Option<Open>,

    /// ids which are in the `garbage` database table (rather than `recording`) as of last commit
    /// but may still exist on disk. These can't be safely removed from the database yet.
    pub(crate) garbage_needs_unlink: FnvHashSet<CompositeId>,

    /// ids which are in the `garbage` database table and are guaranteed to no longer exist on
    /// disk (have been unlinked and the dir has been synced). These may be removed from the
    /// database on the next flush. Mutually exclusive with `garbage_needs_unlink`.
    pub(crate) garbage_unlinked: Vec<CompositeId>,
}

impl SampleFileDir {
    /// Returns a cloned copy of the directory, or `Err` if closed.
    ///
    /// Use `LockedDatabase::open_sample_file_dirs` prior to calling this method.
    pub fn get(&self) -> Result<Arc<dir::SampleFileDir>, base::Error> {
        Ok(self
            .dir
            .as_ref()
            .ok_or_else(|| {
                err!(
                    FailedPrecondition,
                    msg("sample file dir {} is closed", self.id)
                )
            })?
            .clone())
    }

    /// Returns expected existing metadata when opening this directory.
    fn expected_meta(&self, db_uuid: &Uuid) -> schema::DirMeta {
        let mut meta = schema::DirMeta::default();
        meta.db_uuid.extend_from_slice(&db_uuid.as_bytes()[..]);
        meta.dir_uuid.extend_from_slice(&self.uuid.as_bytes()[..]);
        if let Some(o) = self.last_complete_open {
            let open = meta.last_complete_open.mut_or_insert_default();
            open.id = o.id;
            open.uuid.extend_from_slice(&o.uuid.as_bytes()[..]);
        }
        meta
    }
}

pub use crate::auth::RawSessionId;
pub use crate::auth::Request;
pub use crate::auth::Session;
pub use crate::auth::User;
pub use crate::auth::UserChange;

/// In-memory state about a camera.
#[derive(Debug)]
pub struct Camera {
    pub id: i32,
    pub uuid: Uuid,
    pub short_name: String,
    pub config: crate::json::CameraConfig,
    pub streams: [Option<i32>; NUM_STREAM_TYPES],
}

#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub enum StreamType {
    Main,
    Sub,
    Ext,
}

pub const NUM_STREAM_TYPES: usize = 3;

impl StreamType {
    pub fn from_index(i: usize) -> Option<Self> {
        match i {
            0 => Some(StreamType::Main),
            1 => Some(StreamType::Sub),
            2 => Some(StreamType::Ext),
            _ => None,
        }
    }

    pub fn index(self) -> usize {
        match self {
            StreamType::Main => 0,
            StreamType::Sub => 1,
            StreamType::Ext => 2,
        }
    }

    pub fn as_str(self) -> &'static str {
        match self {
            StreamType::Main => "main",
            StreamType::Sub => "sub",
            StreamType::Ext => "ext",
        }
    }

    pub fn parse(type_: &str) -> Option<Self> {
        match type_ {
            "main" => Some(StreamType::Main),
            "sub" => Some(StreamType::Sub),
            "ext" => Some(StreamType::Ext),
            _ => None,
        }
    }
}

impl ::std::fmt::Display for StreamType {
    fn fmt(&self, f: &mut ::std::fmt::Formatter) -> Result<(), ::std::fmt::Error> {
        f.write_str(self.as_str())
    }
}

pub const ALL_STREAM_TYPES: [StreamType; NUM_STREAM_TYPES] =
    [StreamType::Main, StreamType::Sub, StreamType::Ext];
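
// A minimal sketch (not from the original source): `index`/`from_index` and
// `as_str`/`parse` round-trip for every stream type.
#[cfg(test)]
mod stream_type_illustration {
    use super::{StreamType, ALL_STREAM_TYPES};

    #[test]
    fn round_trips() {
        for &t in &ALL_STREAM_TYPES {
            assert_eq!(StreamType::from_index(t.index()), Some(t));
            assert_eq!(StreamType::parse(t.as_str()), Some(t));
        }
        assert_eq!(StreamType::from_index(3), None);
        assert_eq!(StreamType::parse("bogus"), None);
    }
}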

pub struct Stream {
    pub id: i32,
    pub camera_id: i32,
    pub sample_file_dir_id: Option<i32>,
    pub type_: StreamType,
    pub config: crate::json::StreamConfig,

    /// The time range of recorded data associated with this stream (minimum start time and
    /// maximum end time). `None` iff there are no recordings for this stream.
    pub range: Option<Range<recording::Time>>,

    /// The total bytes of flushed sample files. This doesn't include disk space wasted in the
    /// last filesystem block allocated to each file ("internal fragmentation").
    pub sample_file_bytes: i64,

    /// The total bytes on the filesystem used by this stream. This is slightly more than
    /// `sample_file_bytes` because it includes the wasted space in the last filesystem block.
    pub fs_bytes: i64,

    /// On flush, delete the following recordings (move them to the `garbage` table, to be
    /// collected later). Note they must be the oldest recordings. The later collection involves
    /// the syncer unlinking the files on disk, syncing the directory, and then enqueueing their
    /// removal from the `garbage` table on a following flush.
    to_delete: Vec<ListOldestRecordingsRow>,

    /// The total bytes to delete with the next flush.
    pub bytes_to_delete: i64,
    pub fs_bytes_to_delete: i64,

    /// The total bytes to add with the next flush. (`mark_synced` has already been called on
    /// these recordings.)
    pub bytes_to_add: i64,
    pub fs_bytes_to_add: i64,

    /// The total duration of undeleted recorded data. This may not be `range.end - range.start`
    /// due to gaps and overlap.
    pub duration: recording::Duration,

    /// Mapping of calendar day (in the server's time zone) to a summary of committed recordings
    /// on that day.
    pub committed_days: days::Map<days::StreamValue>,

    /// The `cum_recordings` currently committed to the database.
    pub(crate) cum_recordings: i32,

    /// The `cum_media_duration_90k` currently committed to the database.
    cum_media_duration: recording::Duration,

    /// The `cum_runs` currently committed to the database.
    cum_runs: i32,

    /// The recordings which have been added via `LockedDatabase::add_recording` but have yet to
    /// be committed to the database.
    ///
    /// `uncommitted[i]` uses sample filename `CompositeId::new(id, cum_recordings + i)`;
    /// `cum_recordings` should be advanced when one is committed to maintain this invariant.
    ///
    /// TODO: alter the serving path to show these just as if they were already committed.
    uncommitted: VecDeque<Arc<Mutex<RecordingToInsert>>>,

    /// The number of recordings in `uncommitted` which are synced and ready to commit.
    synced_recordings: usize,

    on_live_segment: Vec<Box<dyn FnMut(LiveSegment) -> bool + Send>>,
}

/// Bounds of a live view segment. Currently this is a single frame of video.
/// This is used for live stream recordings. The stream id should already be known to the
/// subscriber. Note this doesn't actually contain the video, just a reference that can be
/// looked up within the database.
#[derive(Clone, Debug)]
pub struct LiveSegment {
    pub recording: i32,

    /// If the segment's one frame is a key frame.
    pub is_key: bool,

    /// The pts, relative to the start of the recording, of the start and end of this live
    /// segment, in 90 kHz units.
    pub media_off_90k: Range<i32>,
}

#[derive(Clone, Debug, Default)]
pub struct StreamChange {
    pub sample_file_dir_id: Option<i32>,
    pub config: crate::json::StreamConfig,
}

/// Information about a camera, used by `add_camera` and `update_camera`.
#[derive(Clone, Debug, Default)]
pub struct CameraChange {
    pub short_name: String,
    pub config: crate::json::CameraConfig,

    /// `StreamType t` is represented by `streams[t.index()]`. A default `StreamChange` will
    /// correspond to no stream in the database, provided there are no existing recordings for
    /// that stream.
    pub streams: [StreamChange; NUM_STREAM_TYPES],
}

impl Stream {
    /// Adds a single fully committed recording with the given properties to the in-memory state.
    fn add_recording(&mut self, r: Range<recording::Time>, sample_file_bytes: i32) {
        self.range = Some(match self.range {
            Some(ref e) => cmp::min(e.start, r.start)..cmp::max(e.end, r.end),
            None => r.start..r.end,
        });
        self.duration += r.end - r.start;
        self.sample_file_bytes += i64::from(sample_file_bytes);
        self.fs_bytes += round_up(i64::from(sample_file_bytes));
        self.committed_days.adjust(r, 1);
    }

    /// Returns a days map including unflushed recordings.
    pub fn days(&self) -> days::Map<days::StreamValue> {
        let mut days = self.committed_days.clone();
        for u in &self.uncommitted {
            let l = u.lock().unwrap();
            days.adjust(
                l.start..l.start + recording::Duration(i64::from(l.wall_duration_90k)),
                1,
            );
        }
        days
    }
}

/// Initializes the recordings associated with the given camera.
fn init_recordings(
    conn: &mut rusqlite::Connection,
    stream_id: i32,
    camera: &Camera,
    stream: &mut Stream,
) -> Result<(), Error> {
    info!(
        "Loading recordings for camera {} stream {:?}",
        camera.short_name, stream.type_
    );
    let mut stmt = conn.prepare(
        r#"
        select
          recording.start_time_90k,
          recording.wall_duration_90k,
          recording.sample_file_bytes
        from
          recording
        where
          stream_id = :stream_id
        "#,
    )?;
    let mut rows = stmt.query(named_params! {":stream_id": stream_id})?;
    let mut i = 0;
    while let Some(row) = rows.next()? {
        let start = recording::Time(row.get(0)?);
        let duration = recording::Duration(row.get(1)?);
        let bytes = row.get(2)?;
        stream.add_recording(start..start + duration, bytes);
        i += 1;
    }
    info!(
        "Loaded {} recordings for camera {} stream {:?}",
        i, camera.short_name, stream.type_
    );
    Ok(())
}

pub struct LockedDatabase {
    conn: rusqlite::Connection,
    uuid: Uuid,
    flush_count: usize,

    /// If the database is open in read-write mode, the information about the current Open row.
    pub open: Option<Open>,

    /// The monotonic time when the database was opened (whether in read-write mode or read-only
    /// mode).
    open_monotonic: recording::Time,

    auth: auth::State,
    signal: signal::State,

    sample_file_dirs_by_id: BTreeMap<i32, SampleFileDir>,
    cameras_by_id: BTreeMap<i32, Camera>,
    streams_by_id: BTreeMap<i32, Stream>,
    cameras_by_uuid: BTreeMap<Uuid, i32>, // values are ids.
    video_sample_entries_by_id: BTreeMap<i32, Arc<VideoSampleEntry>>,
    video_index_cache: RefCell<LinkedHashMap<i64, Box<[u8]>, fnv::FnvBuildHasher>>,
    on_flush: Vec<Box<dyn Fn() + Send>>,
}

/// Represents a row of the `open` database table.
#[derive(Copy, Clone, Debug)]
pub struct Open {
    pub id: u32,
    pub(crate) uuid: Uuid,
}

/// A combination of a stream id and recording id into a single 64-bit int.
/// This is used as a primary key in the SQLite `recording` table (see `schema.sql`)
/// and the sample file's name on disk (see `dir.rs`).
#[derive(Copy, Clone, Debug, Eq, Hash, PartialEq)]
pub struct CompositeId(pub i64);

impl CompositeId {
    pub fn new(stream_id: i32, recording_id: i32) -> Self {
        CompositeId((stream_id as i64) << 32 | recording_id as i64)
    }

    pub fn stream(self) -> i32 {
        (self.0 >> 32) as i32
    }
    pub fn recording(self) -> i32 {
        self.0 as i32
    }
}

impl ::std::fmt::Display for CompositeId {
    fn fmt(&self, f: &mut ::std::fmt::Formatter) -> Result<(), ::std::fmt::Error> {
        write!(f, "{}/{}", self.stream(), self.recording())
    }
}
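
// A minimal sketch (not from the original source) of the bit packing: the
// stream id occupies the high 32 bits and the recording id the low 32 bits.
#[cfg(test)]
mod composite_id_illustration {
    use super::CompositeId;

    #[test]
    fn packs_and_unpacks() {
        let id = CompositeId::new(5, 7);
        assert_eq!(id.0, (5i64 << 32) | 7);
        assert_eq!(id.stream(), 5);
        assert_eq!(id.recording(), 7);
        assert_eq!(id.to_string(), "5/7");
    }
}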

/// Inserts, updates, or removes streams in the `State` object to match a set of `StreamChange`
/// structs.
struct StreamStateChanger {
    sids: [Option<i32>; NUM_STREAM_TYPES],

    /// For each stream to change, a (stream_id, upsert or `None` to delete) tuple.
    streams: Vec<(i32, Option<StreamStateChangerUpsert>)>,
}

/// Upsert state used internally within [`StreamStateChanger`].
struct StreamStateChangerUpsert {
    camera_id: i32,
    type_: StreamType,
    sc: StreamChange,
}

impl StreamStateChanger {
    /// Performs the database updates (guarded by the given transaction) and returns the state
    /// change to be applied on successful commit.
    fn new(
        tx: &rusqlite::Transaction,
        camera_id: i32,
        existing: Option<&Camera>,
        streams_by_id: &BTreeMap<i32, Stream>,
        change: &mut CameraChange,
    ) -> Result<Self, Error> {
        let mut sids = [None; NUM_STREAM_TYPES];
        let mut streams = Vec::with_capacity(NUM_STREAM_TYPES);
        let existing_streams = existing.map(|e| e.streams).unwrap_or_default();
        for (i, ref mut sc) in change.streams.iter_mut().enumerate() {
            let type_ = StreamType::from_index(i).unwrap();
            let mut have_data = false;
            if let Some(sid) = existing_streams[i] {
                let s = streams_by_id.get(&sid).unwrap();
                if s.range.is_some() {
                    have_data = true;
                    if let (Some(d), false) = (
                        s.sample_file_dir_id,
                        s.sample_file_dir_id == sc.sample_file_dir_id,
                    ) {
                        bail!(
                            FailedPrecondition,
                            msg(
                                "can't change sample_file_dir_id {:?}->{:?} for non-empty stream {}",
                                d,
                                sc.sample_file_dir_id,
                                sid,
                            ),
                        );
                    }
                }
                if !have_data && sc.config.is_empty() && sc.sample_file_dir_id.is_none() {
                    // Delete stream.
                    let mut stmt = tx.prepare_cached(
                        r#"
                        delete from stream where id = ?
                        "#,
                    )?;
                    if stmt.execute(params![sid])? != 1 {
                        bail!(Internal, msg("missing stream {sid}"));
                    }
                    streams.push((sid, None));
                } else {
                    // Update stream.
                    let mut stmt = tx.prepare_cached(
                        r#"
                        update stream set
                            config = :config,
                            sample_file_dir_id = :sample_file_dir_id
                        where
                            id = :id
                        "#,
                    )?;
                    let rows = stmt.execute(named_params! {
                        ":config": &sc.config,
                        ":sample_file_dir_id": sc.sample_file_dir_id,
                        ":id": sid,
                    })?;
                    if rows != 1 {
                        bail!(Internal, msg("missing stream {sid}"));
                    }
                    sids[i] = Some(sid);
                    streams.push((
                        sid,
                        Some(StreamStateChangerUpsert {
                            camera_id,
                            type_,
                            sc: mem::take(*sc),
                        }),
                    ));
                }
            } else {
                if sc.config.is_empty() && sc.sample_file_dir_id.is_none() {
                    // Do nothing; there is no record and we want to keep it that way.
                    continue;
                }
                // Insert stream.
                let mut stmt = tx.prepare_cached(
                    r#"
                    insert into stream (camera_id,  sample_file_dir_id,  type,  config,
                                        cum_recordings, cum_media_duration_90k, cum_runs)
                                values (:camera_id, :sample_file_dir_id, :type, :config,
                                        0,              0,                      0)
                    "#,
                )?;
                stmt.execute(named_params! {
                    ":camera_id": camera_id,
                    ":sample_file_dir_id": sc.sample_file_dir_id,
                    ":type": type_.as_str(),
                    ":config": &sc.config,
                })?;
                let id = tx.last_insert_rowid() as i32;
                sids[i] = Some(id);
                streams.push((
                    id,
                    Some(StreamStateChangerUpsert {
                        camera_id,
                        type_,
                        sc: mem::take(*sc),
                    }),
                ));
            }
        }
        Ok(StreamStateChanger { sids, streams })
    }

    /// Applies the change to the given `streams_by_id`. The caller is expected to set
    /// `Camera::streams` to the return value.
    fn apply(
        mut self,
        streams_by_id: &mut BTreeMap<i32, Stream>,
    ) -> [Option<i32>; NUM_STREAM_TYPES] {
        for (id, stream) in self.streams.drain(..) {
            use ::std::collections::btree_map::Entry;
            match (streams_by_id.entry(id), stream) {
                (
                    Entry::Vacant(e),
                    Some(StreamStateChangerUpsert {
                        camera_id,
                        type_,
                        sc,
                    }),
                ) => {
                    e.insert(Stream {
                        id,
                        type_,
                        camera_id,
                        sample_file_dir_id: sc.sample_file_dir_id,
                        config: sc.config,
                        range: None,
                        sample_file_bytes: 0,
                        fs_bytes: 0,
                        to_delete: Vec::new(),
                        bytes_to_delete: 0,
                        fs_bytes_to_delete: 0,
                        bytes_to_add: 0,
                        fs_bytes_to_add: 0,
                        duration: recording::Duration(0),
                        committed_days: days::Map::default(),
                        cum_recordings: 0,
                        cum_media_duration: recording::Duration(0),
                        cum_runs: 0,
                        uncommitted: VecDeque::new(),
                        synced_recordings: 0,
                        on_live_segment: Vec::new(),
                    });
                }
                (Entry::Vacant(_), None) => {}
                (Entry::Occupied(e), Some(StreamStateChangerUpsert { sc, .. })) => {
                    let e = e.into_mut();
                    e.sample_file_dir_id = sc.sample_file_dir_id;
                    e.config = sc.config;
                }
                (Entry::Occupied(e), None) => {
                    e.remove();
                }
            };
        }
        self.sids
    }
}

/// A retention change as expected by `LockedDatabase::update_retention`.
pub struct RetentionChange {
    pub stream_id: i32,
    pub new_record: bool,
    pub new_limit: i64,
}

impl LockedDatabase {
    /// Returns an immutable view of the cameras by id.
    pub fn cameras_by_id(&self) -> &BTreeMap<i32, Camera> {
        &self.cameras_by_id
    }
    pub fn sample_file_dirs_by_id(&self) -> &BTreeMap<i32, SampleFileDir> {
        &self.sample_file_dirs_by_id
    }

    /// Returns the number of completed database flushes since startup.
    pub fn flushes(&self) -> usize {
        self.flush_count
    }

    /// Adds a placeholder for an uncommitted recording.
    ///
    /// The caller should write samples and fill the returned `RecordingToInsert` as it goes
    /// (noting that while holding the lock, it should not perform I/O or acquire the database
    /// lock). Then it should sync to permanent storage and call `mark_synced`. The data will
    /// be written to the database on the next `flush`.
    ///
    /// A call to `add_recording` is also a promise that previous recordings (even if not yet
    /// synced and committed) won't change.
    ///
    /// This fills the `prev_media_duration` and `prev_runs` fields.
    pub(crate) fn add_recording(
        &mut self,
        stream_id: i32,
        mut r: RecordingToInsert,
    ) -> Result<(CompositeId, Arc<Mutex<RecordingToInsert>>), Error> {
        let stream = match self.streams_by_id.get_mut(&stream_id) {
            None => bail!(FailedPrecondition, msg("no such stream {stream_id}")),
            Some(s) => s,
        };
        let id = CompositeId::new(
            stream_id,
            stream.cum_recordings + (stream.uncommitted.len() as i32),
        );
        match stream.uncommitted.back() {
            Some(s) => {
                let l = s.lock().unwrap();
                r.prev_media_duration =
                    l.prev_media_duration + recording::Duration(l.media_duration_90k.into());
                r.prev_runs = l.prev_runs + if l.run_offset == 0 { 1 } else { 0 };
            }
            None => {
                r.prev_media_duration = stream.cum_media_duration;
                r.prev_runs = stream.cum_runs;
            }
        };
        let recording = Arc::new(Mutex::new(r));
        stream.uncommitted.push_back(Arc::clone(&recording));
        Ok((id, recording))
    }
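
    // A hypothetical usage sketch (not from the original source) of the
    // add -> sync -> flush lifecycle described above. `db`, `stream_id`, and
    // the sample-file writing are assumptions for illustration:
    //
    //     let (id, rec) = db.lock().add_recording(stream_id, RecordingToInsert::default())?;
    //     // ... append samples, filling `rec` under its own (not the database's) lock,
    //     // then sync the sample file to permanent storage ...
    //     db.lock().mark_synced(id)?;
    //     // A later `DatabaseGuard::flush` writes the whole batch in one transaction.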

    /// Marks the given uncommitted recording as synced and ready to flush.
    /// This must be the next unsynced recording.
    pub(crate) fn mark_synced(&mut self, id: CompositeId) -> Result<(), Error> {
        let stream = match self.streams_by_id.get_mut(&id.stream()) {
            None => bail!(FailedPrecondition, msg("no stream for recording {id}")),
            Some(s) => s,
        };
        let next_unsynced = stream.cum_recordings + (stream.synced_recordings as i32);
        if id.recording() != next_unsynced {
            bail!(
                FailedPrecondition,
                msg(
                    "can't sync {} when next unsynced recording is {} (next unflushed is {})",
                    id,
                    next_unsynced,
                    stream.cum_recordings,
                ),
            );
        }
        if stream.synced_recordings == stream.uncommitted.len() {
            bail!(
                FailedPrecondition,
                msg("can't sync un-added recording {id}")
            );
        }
        let l = stream.uncommitted[stream.synced_recordings].lock().unwrap();
        let bytes = i64::from(l.sample_file_bytes);
        stream.bytes_to_add += bytes;
        stream.fs_bytes_to_add += round_up(bytes);
        stream.synced_recordings += 1;
        Ok(())
    }

    pub(crate) fn delete_garbage(
        &mut self,
        dir_id: i32,
        ids: &mut Vec<CompositeId>,
    ) -> Result<(), Error> {
        let dir = match self.sample_file_dirs_by_id.get_mut(&dir_id) {
            None => bail!(FailedPrecondition, msg("no such dir {dir_id}")),
            Some(d) => d,
        };
        dir.garbage_unlinked.reserve(ids.len());
        ids.retain(|id| {
            if !dir.garbage_needs_unlink.remove(id) {
                return true;
            }
            dir.garbage_unlinked.push(*id);
            false
        });
        if !ids.is_empty() {
            bail!(
                FailedPrecondition,
                msg("delete_garbage with non-garbage ids {:?}", &ids[..])
            );
        }
        Ok(())
    }

    /// Registers a callback to run on every live segment immediately after it's recorded.
    /// The callback is run with the database lock held, so it must not call back into the
    /// database or block. The callback should return false to unregister.
    pub fn watch_live(
        &mut self,
        stream_id: i32,
        cb: Box<dyn FnMut(LiveSegment) -> bool + Send>,
    ) -> Result<(), Error> {
        let s = match self.streams_by_id.get_mut(&stream_id) {
            None => bail!(NotFound, msg("no such stream {stream_id}")),
            Some(s) => s,
        };
        s.on_live_segment.push(cb);
        Ok(())
    }
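
    // A hypothetical usage sketch (not from the original source): forward each
    // live segment into a bounded channel, unregistering automatically (by
    // returning false) once the receiver has hung up. `db`, `stream_id`, and
    // `tx` (e.g. a tokio::sync::mpsc::Sender<LiveSegment>) are assumptions:
    //
    //     db.lock().watch_live(stream_id, Box::new(move |seg| tx.try_send(seg).is_ok()))?;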

    /// Clears all watches on all streams.
    /// Normally watches are self-cleaning: when a segment is sent, the callback returns false
    /// if it is no longer interested (typically because hyper has just noticed the client is no
    /// longer connected). This doesn't work when the system is shutting down and nothing more
    /// is sent, though.
    pub fn clear_watches(&mut self) {
        for s in self.streams_by_id.values_mut() {
            s.on_live_segment.clear();
        }
    }

    pub(crate) fn send_live_segment(&mut self, stream: i32, l: LiveSegment) -> Result<(), Error> {
        let s = match self.streams_by_id.get_mut(&stream) {
            None => bail!(Internal, msg("no such stream {stream}")),
            Some(s) => s,
        };

        // TODO: use std's retain_mut after it's available in our minimum supported Rust version.
        // <https://github.com/rust-lang/rust/issues/48919>
        odds::vec::VecExt::retain_mut(&mut s.on_live_segment, |cb| cb(l.clone()));
        Ok(())
    }

    /// Helper for `DatabaseGuard::flush()` and `Database::drop()`.
    ///
    /// The public API is in `DatabaseGuard::flush()`; it supplies the `Clocks` to this function.
    fn flush<C: Clocks>(&mut self, clocks: &C, reason: &str) -> Result<(), Error> {
        let span = tracing::info_span!("flush", flush_count = self.flush_count, reason);
        let _enter = span.enter();
        let o = match self.open.as_ref() {
            None => bail!(Internal, msg("database is read-only")),
            Some(o) => o,
        };
        let tx = self.conn.transaction()?;
        let mut new_ranges =
            FnvHashMap::with_capacity_and_hasher(self.streams_by_id.len(), Default::default());
        {
            let mut stmt = tx.prepare_cached(UPDATE_STREAM_COUNTERS_SQL)?;
            for (&stream_id, s) in &self.streams_by_id {
                // Process additions.
                let mut new_duration = 0;
                let mut new_runs = 0;
                for i in 0..s.synced_recordings {
                    let l = s.uncommitted[i].lock().unwrap();
                    raw::insert_recording(
                        &tx,
                        o,
                        CompositeId::new(stream_id, s.cum_recordings + i as i32),
                        &l,
                    )?;
                    new_duration += i64::from(l.wall_duration_90k);
                    new_runs += if l.run_offset == 0 { 1 } else { 0 };
                }
                if s.synced_recordings > 0 {
                    new_ranges.entry(stream_id).or_insert(None);
                    stmt.execute(named_params! {
                        ":stream_id": stream_id,
                        ":cum_recordings": s.cum_recordings + s.synced_recordings as i32,
                        ":cum_media_duration_90k": s.cum_media_duration.0 + new_duration,
                        ":cum_runs": s.cum_runs + new_runs,
                    })?;
                }

                // Process deletions.
                if let Some(l) = s.to_delete.last() {
                    new_ranges.entry(stream_id).or_insert(None);
                    let dir = match s.sample_file_dir_id {
                        None => bail!(Internal, msg("stream {stream_id} has no directory!")),
                        Some(d) => d,
                    };

                    // raw::delete_recordings does a bulk transfer of a range from recording to
                    // garbage, rather than operating on each element of to_delete. This is
                    // guaranteed to give the same result because to_delete is guaranteed to be
                    // the oldest recordings for the stream.
                    let start = CompositeId::new(stream_id, 0);
                    let end = CompositeId(l.id.0 + 1);
                    let n = raw::delete_recordings(&tx, dir, start..end)?;
                    if n != s.to_delete.len() {
                        bail!(
                            Internal,
                            msg(
                                "Found {} rows in {} .. {}, expected {}: {:?}",
                                n,
                                start,
                                end,
                                s.to_delete.len(),
                                &s.to_delete,
                            ),
                        );
                    }
                }
            }
        }
        for dir in self.sample_file_dirs_by_id.values() {
            raw::mark_sample_files_deleted(&tx, &dir.garbage_unlinked)?;
        }
        for (&stream_id, r) in &mut new_ranges {
            *r = raw::get_range(&tx, stream_id)?;
        }
        {
            let mut stmt = tx.prepare_cached(
                r"update open set duration_90k = ?, end_time_90k = ? where id = ?",
            )?;
            let rows = stmt.execute(params![
                (recording::Time::new(clocks.monotonic()) - self.open_monotonic).0,
                recording::Time::new(clocks.realtime()).0,
                o.id,
            ])?;
            if rows != 1 {
                bail!(Internal, msg("unable to find current open {}", o.id));
            }
        }
        self.auth.flush(&tx)?;
        self.signal.flush(&tx)?;
        tx.commit()?;

        #[derive(Default)]
        struct DirLog {
            added: SmallVec<[CompositeId; 32]>,
            deleted: SmallVec<[CompositeId; 32]>,
            gced: SmallVec<[CompositeId; 32]>,
            added_bytes: i64,
            deleted_bytes: i64,
        }
        let mut dir_logs: FnvHashMap<i32, DirLog> = FnvHashMap::default();

        // Process delete_garbage.
        for (&id, dir) in &mut self.sample_file_dirs_by_id {
            if !dir.garbage_unlinked.is_empty() {
                dir_logs
                    .entry(id)
                    .or_default()
                    .gced
                    .extend(dir.garbage_unlinked.drain(..));
            }
        }

        for (stream_id, new_range) in new_ranges.drain() {
            let s = self.streams_by_id.get_mut(&stream_id).unwrap();
            let dir_id = s.sample_file_dir_id.unwrap();
            let dir = self.sample_file_dirs_by_id.get_mut(&dir_id).unwrap();
            let log = dir_logs.entry(dir_id).or_default();

            // Process delete_oldest_recordings.
            s.sample_file_bytes -= s.bytes_to_delete;
            s.fs_bytes -= s.fs_bytes_to_delete;
            log.deleted_bytes += s.bytes_to_delete;
            s.bytes_to_delete = 0;
            s.fs_bytes_to_delete = 0;
            log.deleted.reserve(s.to_delete.len());
            for row in s.to_delete.drain(..) {
                log.deleted.push(row.id);
                dir.garbage_needs_unlink.insert(row.id);
                let d = recording::Duration(i64::from(row.wall_duration_90k));
                s.duration -= d;
                s.committed_days.adjust(row.start..row.start + d, -1);
            }

            // Process add_recordings.
            log.added_bytes += s.bytes_to_add;
            s.bytes_to_add = 0;
            s.fs_bytes_to_add = 0;
            log.added.reserve(s.synced_recordings);
            for _ in 0..s.synced_recordings {
                let u = s.uncommitted.pop_front().unwrap();
                log.added
                    .push(CompositeId::new(stream_id, s.cum_recordings));
                let l = u.lock().unwrap();
                s.cum_recordings += 1;
                let wall_dur = recording::Duration(l.wall_duration_90k.into());
                let media_dur = recording::Duration(l.media_duration_90k.into());
                s.cum_media_duration += media_dur;
                s.cum_runs += if l.run_offset == 0 { 1 } else { 0 };
                let end = l.start + wall_dur;
                s.add_recording(l.start..end, l.sample_file_bytes);
            }
            s.synced_recordings = 0;

            // Fix the range.
            s.range = new_range;
        }
        self.auth.post_flush();
        self.signal.post_flush();
        self.flush_count += 1;
        let mut log_msg = String::with_capacity(256);
        for (&dir_id, log) in &dir_logs {
            let dir = self.sample_file_dirs_by_id.get(&dir_id).unwrap();
            write!(
                &mut log_msg,
                "\n{}: added {}B in {} recordings ({}), deleted {}B in {} ({}), \
                 GCed {} recordings ({}).",
                dir.path.display(),
                &encode_size(log.added_bytes),
                log.added.len(),
                log.added.iter().join(", "),
                &encode_size(log.deleted_bytes),
                log.deleted.len(),
                log.deleted.iter().join(", "),
                log.gced.len(),
                log.gced.iter().join(", ")
            )
            .unwrap();
        }
        if log_msg.is_empty() {
            log_msg.push_str(" no recording changes");
        }
        info!("flush complete: {log_msg}");
        for cb in &self.on_flush {
            cb();
        }
        Ok(())
    }

    /// Sets a watcher which will receive an (empty) event on successful flush.
    /// The lock will be held while this is run, so it should not do any I/O.
    pub(crate) fn on_flush(&mut self, run: Box<dyn Fn() + Send>) {
        self.on_flush.push(run);
    }

    // TODO: find a cleaner way to do this. Seems weird for src/cmds/run.rs to clear the on flush
    // handlers given that it didn't add them.
    pub fn clear_on_flush(&mut self) {
        self.on_flush.clear();
    }

    /// Opens the given sample file directories.
    ///
    /// `ids` is implicitly de-duplicated.
    ///
    /// When the database is in read-only mode, this simply opens all the directories after
    /// locking and verifying their metadata matches the database state. In read-write mode, it
    /// performs a single database transaction to update metadata for all dirs, then performs a
    /// like update to the directories' on-disk metadata.
    ///
    /// Note this violates the principle of never accessing disk while holding the database
    /// lock. Currently this only happens at startup (or during configuration), so this isn't a
    /// problem in practice.
    pub fn open_sample_file_dirs(&mut self, ids: &[i32]) -> Result<(), Error> {
        let mut in_progress = FnvHashMap::with_capacity_and_hasher(ids.len(), Default::default());
        for &id in ids {
            let e = in_progress.entry(id);
            use ::std::collections::hash_map::Entry;
            let e = match e {
                Entry::Occupied(_) => continue, // suppress duplicate.
                Entry::Vacant(e) => e,
            };
            let dir = self
                .sample_file_dirs_by_id
                .get_mut(&id)
                .ok_or_else(|| err!(NotFound, msg("no such dir {id}")))?;
            if dir.dir.is_some() {
                continue;
            }
            let mut expected_meta = dir.expected_meta(&self.uuid);
            if let Some(o) = self.open.as_ref() {
                let open = expected_meta.in_progress_open.mut_or_insert_default();
                open.id = o.id;
                open.uuid.extend_from_slice(&o.uuid.as_bytes()[..]);
            }
            let d = dir::SampleFileDir::open(&dir.path, &expected_meta)
                .map_err(|e| err!(e, msg("Failed to open dir {}", dir.path.display())))?;
            if self.open.is_none() {
                // read-only mode; it's already fully opened.
                dir.dir = Some(d);
            } else {
                // read-write mode; there are more steps to do.
                e.insert((expected_meta, d));
            }
        }

        let o = match self.open.as_ref() {
            None => return Ok(()), // read-only mode; all done.
            Some(o) => o,
        };

        let tx = self.conn.transaction()?;
        {
            let mut stmt = tx.prepare_cached(
                r#"
                update sample_file_dir set last_complete_open_id = ? where id = ?
                "#,
            )?;
            for &id in in_progress.keys() {
                if stmt.execute(params![o.id, id])? != 1 {
                    bail!(Internal, msg("unable to update dir {id}"));
                }
            }
        }
        tx.commit()?;

        for (id, (mut meta, d)) in in_progress.drain() {
            let dir = self.sample_file_dirs_by_id.get_mut(&id).unwrap();
            meta.last_complete_open = meta.in_progress_open.take().into();
            d.write_meta(&meta)?;
            dir.dir = Some(d);
        }

        Ok(())
    }

    pub fn streams_by_id(&self) -> &BTreeMap<i32, Stream> {
        &self.streams_by_id
    }

    /// Returns an immutable view of the video sample entries.
    pub fn video_sample_entries_by_id(&self) -> &BTreeMap<i32, Arc<VideoSampleEntry>> {
        &self.video_sample_entries_by_id
    }

    /// Gets a given camera by uuid.
    pub fn get_camera(&self, uuid: Uuid) -> Option<&Camera> {
        self.cameras_by_uuid.get(&uuid).map(|id| {
            self.cameras_by_id
                .get(id)
                .expect("uuid->id requires id->cam")
        })
    }

    /// Lists the specified recordings, passing them to a supplied function. Given that the
    /// function is called with the database lock held, it should be quick.
    ///
    /// Note that at present, the returned recordings are _not_ completely ordered by start
    /// time. Uncommitted recordings are returned in id order after the others.
    pub fn list_recordings_by_time(
        &self,
        stream_id: i32,
        desired_time: Range<recording::Time>,
        f: &mut dyn FnMut(ListRecordingsRow) -> Result<(), base::Error>,
    ) -> Result<(), base::Error> {
        let s = match self.streams_by_id.get(&stream_id) {
            None => bail!(NotFound, msg("no such stream {stream_id}")),
            Some(s) => s,
        };
        raw::list_recordings_by_time(&self.conn, stream_id, desired_time.clone(), f)?;
        for (i, u) in s.uncommitted.iter().enumerate() {
            let row = {
                let l = u.lock().unwrap();
                if l.video_samples > 0 {
                    let end = l.start + recording::Duration(l.wall_duration_90k as i64);
                    if l.start > desired_time.end || end < desired_time.start {
                        continue; // there's no overlap with the requested range.
                    }
                    l.to_list_row(
                        CompositeId::new(stream_id, s.cum_recordings + i as i32),
                        self.open.unwrap().id,
                    )
                } else {
                    continue;
                }
            };
            f(row)?;
        }
        Ok(())
    }

    /// Lists the specified recordings in ascending order by id.
    pub fn list_recordings_by_id(
        &self,
        stream_id: i32,
        desired_ids: Range<i32>,
        f: &mut dyn FnMut(ListRecordingsRow) -> Result<(), base::Error>,
    ) -> Result<(), base::Error> {
        let s = match self.streams_by_id.get(&stream_id) {
            None => bail!(NotFound, msg("no such stream {stream_id}")),
            Some(s) => s,
        };
        if desired_ids.start < s.cum_recordings {
            raw::list_recordings_by_id(&self.conn, stream_id, desired_ids.clone(), f)?;
        }
        if desired_ids.end > s.cum_recordings {
            let start = cmp::max(0, desired_ids.start - s.cum_recordings) as usize;
            let end = cmp::min(
                (desired_ids.end - s.cum_recordings) as usize,
                s.uncommitted.len(),
            );
            for i in start..end {
                let row = {
                    let l = s.uncommitted[i].lock().unwrap();
                    if l.video_samples > 0 {
                        l.to_list_row(
                            CompositeId::new(stream_id, s.cum_recordings + i as i32),
                            self.open.unwrap().id,
                        )
                    } else {
                        continue;
                    }
                };
                f(row)?;
            }
        }
        Ok(())
    }

    /// Calls `list_recordings_by_time` and aggregates consecutive recordings.
    /// Rows are given to the callback in arbitrary order. Callers which care about ordering
    /// should do their own sorting.
    pub fn list_aggregated_recordings(
        &self,
        stream_id: i32,
        desired_time: Range<recording::Time>,
        forced_split: recording::Duration,
        f: &mut dyn FnMut(&ListAggregatedRecordingsRow) -> Result<(), base::Error>,
    ) -> Result<(), base::Error> {
        // Iterate, maintaining a map from a recording_id to the aggregated row for the latest
        // batch of recordings from the run starting at that id. Runs can be split into multiple
        // batches for a few reasons:
        //
        // * forced split (when exceeding a duration limit)
        // * a missing id (one that was deleted out of order)
        // * video_sample_entry mismatch (if the parameters changed during an RTSP session)
        //
        // This iteration works because in a run, the start_time+duration of recording id r
        // is equal to the start_time of recording id r+1. Thus ascending times guarantees
        // ascending ids within a run. (Different runs, however, can be arbitrarily interleaved
        // if their timestamps overlap. Tracking all active runs prevents that interleaving from
        // causing problems.) list_recordings_by_time also returns uncommitted recordings in
        // ascending order by id, and after any committed recordings with lower ids.
        let mut aggs: BTreeMap<i32, ListAggregatedRecordingsRow> = BTreeMap::new();
        self.list_recordings_by_time(stream_id, desired_time, &mut |row| {
            let recording_id = row.id.recording();
            let run_start_id = recording_id - row.run_offset;
            let uncommitted = (row.flags & RecordingFlags::Uncommitted as i32) != 0;
            let growing = (row.flags & RecordingFlags::Growing as i32) != 0;
            let has_trailing_zero = (row.flags & RecordingFlags::TrailingZero as i32) != 0;
            use std::collections::btree_map::Entry;
            match aggs.entry(run_start_id) {
                Entry::Occupied(mut e) => {
                    let a = e.get_mut();
                    let new_dur = a.time.end - a.time.start
                        + recording::Duration(row.wall_duration_90k as i64);
                    let needs_flush = a.ids.end != recording_id
                        || row.video_sample_entry_id != a.video_sample_entry_id
                        || new_dur >= forced_split;
                    if needs_flush {
                        // flush then start a new entry.
                        f(a)?;
                        *a = ListAggregatedRecordingsRow::from(row);
                    } else {
                        // append.
                        if a.time.end != row.start {
                            bail!(
                                Internal,
                                msg(
                                    "stream {} recording {} ends at {} but {} starts at {}",
                                    stream_id,
                                    a.ids.end - 1,
                                    a.time.end,
                                    row.id,
                                    row.start,
                                ),
                            );
                        }
                        if a.open_id != row.open_id {
                            bail!(
                                Internal,
                                msg(
                                    "stream {} recording {} has open id {} but {} has {}",
                                    stream_id,
                                    a.ids.end - 1,
                                    a.open_id,
                                    row.id,
                                    row.open_id,
                                ),
                            );
                        }
                        a.time.end.0 += row.wall_duration_90k as i64;
                        a.ids.end = recording_id + 1;
                        a.video_samples += row.video_samples as i64;
                        a.video_sync_samples += row.video_sync_samples as i64;
                        a.sample_file_bytes += row.sample_file_bytes as i64;
                        if uncommitted {
                            a.first_uncommitted = a.first_uncommitted.or(Some(recording_id));
                        }
                        a.growing = growing;
                        a.has_trailing_zero = has_trailing_zero;
                    }
                }
                Entry::Vacant(e) => {
                    e.insert(ListAggregatedRecordingsRow::from(row));
                }
            }
            Ok(())
        })?;
        for a in aggs.values() {
            f(a)?;
        }
        Ok(())
    }

    /// Calls `f` with a single `recording_playback` row.
    /// Note the lock is held for the duration of `f`.
    /// This uses an LRU cache to reduce the number of retrievals from the database.
    pub fn with_recording_playback<R>(
        &self,
        id: CompositeId,
        f: &mut dyn FnMut(&RecordingPlayback) -> Result<R, Error>,
    ) -> Result<R, Error> {
        // Check for uncommitted path.
        let s = self
            .streams_by_id
            .get(&id.stream())
            .ok_or_else(|| err!(Internal, msg("no stream for {}", id)))?;
        if s.cum_recordings <= id.recording() {
            let i = id.recording() - s.cum_recordings;
            if i as usize >= s.uncommitted.len() {
                bail!(
                    Internal,
                    msg(
                        "no such recording {}; latest committed is {}, latest is {}",
                        id,
                        s.cum_recordings,
                        s.cum_recordings + s.uncommitted.len() as i32,
                    ),
                );
            }
            let l = s.uncommitted[i as usize].lock().unwrap();
            return f(&RecordingPlayback {
                video_index: &l.video_index,
            });
        }

        // Committed path.
        let mut cache = self.video_index_cache.borrow_mut();
        use hashlink::linked_hash_map::RawEntryMut;
        match cache.raw_entry_mut().from_key(&id.0) {
            RawEntryMut::Occupied(mut occupied) => {
                trace!("cache hit for recording {}", id);
                occupied.to_back();
                let video_index = occupied.get();
                f(&RecordingPlayback { video_index })
            }
            RawEntryMut::Vacant(vacant) => {
                trace!("cache miss for recording {}", id);
                let mut stmt = self.conn.prepare_cached(GET_RECORDING_PLAYBACK_SQL)?;
                let mut rows = stmt.query(named_params! {":composite_id": id.0})?;
                if let Some(row) = rows.next()? {
                    let video_index: VideoIndex = row.get(0)?;
                    let result = f(&RecordingPlayback {
                        video_index: &video_index.0[..],
                    });
                    vacant.insert(id.0, video_index.0);
                    if cache.len() > VIDEO_INDEX_CACHE_LEN {
                        cache.pop_front();
                    }
                    return result;
                }
                Err(err!(Internal, msg("no such recording {id}")))
            }
        }
    }

    /// Queues for deletion the oldest recordings that aren't already queued.
    /// `f` should return true for each row that should be deleted.
    pub(crate) fn delete_oldest_recordings(
        &mut self,
        stream_id: i32,
        f: &mut dyn FnMut(&ListOldestRecordingsRow) -> bool,
    ) -> Result<(), Error> {
        let s = match self.streams_by_id.get_mut(&stream_id) {
            None => bail!(Internal, msg("no stream {stream_id}")),
            Some(s) => s,
        };
        let end = match s.to_delete.last() {
            None => 0,
            Some(row) => row.id.recording() + 1,
        };
        raw::list_oldest_recordings(&self.conn, CompositeId::new(stream_id, end), &mut |r| {
            if f(&r) {
                s.to_delete.push(r);
                let bytes = i64::from(r.sample_file_bytes);
                s.bytes_to_delete += bytes;
                s.fs_bytes_to_delete += round_up(bytes);
                return true;
            }
            false
        })
    }
|
|
|
|
/// Initializes the video_sample_entries. To be called during construction.
|
|
fn init_video_sample_entries(&mut self) -> Result<(), Error> {
|
|
info!("Loading video sample entries");
|
|
let mut stmt = self.conn.prepare(
|
|
r#"
|
|
select
|
|
id,
|
|
width,
|
|
height,
|
|
pasp_h_spacing,
|
|
pasp_v_spacing,
|
|
rfc6381_codec,
|
|
data
|
|
from
|
|
video_sample_entry
|
|
"#,
|
|
)?;
|
|
let mut rows = stmt.query(params![])?;
|
|
while let Some(row) = rows.next()? {
|
|
let id = row.get(0)?;
|
|
let data: Vec<u8> = row.get(6)?;
|
|
let get_and_cvt = |i: usize| {
|
|
let raw = row.get::<_, i32>(i)?;
|
|
u16::try_from(raw).map_err(|e| err!(OutOfRange, source(e)))
|
|
};
|
|
self.video_sample_entries_by_id.insert(
|
|
id,
|
|
Arc::new(VideoSampleEntry {
|
|
id,
|
|
width: get_and_cvt(1)?,
|
|
height: get_and_cvt(2)?,
|
|
pasp_h_spacing: get_and_cvt(3)?,
|
|
pasp_v_spacing: get_and_cvt(4)?,
|
|
data,
|
|
rfc6381_codec: row.get(5)?,
|
|
}),
|
|
);
|
|
}
|
|
info!(
|
|
"Loaded {} video sample entries",
|
|
self.video_sample_entries_by_id.len()
|
|
);
|
|
Ok(())
|
|
}
|
|
|
|
/// Initializes the sample file dirs.
|
|
/// To be called during construction.
|
|
fn init_sample_file_dirs(&mut self) -> Result<(), Error> {
|
|
info!("Loading sample file dirs");
|
|
let mut stmt = self.conn.prepare(
|
|
r#"
|
|
select
|
|
d.id,
|
|
d.config,
|
|
d.uuid,
|
|
d.last_complete_open_id,
|
|
o.uuid
|
|
from
|
|
sample_file_dir d left join open o on (d.last_complete_open_id = o.id);
|
|
"#,
|
|
)?;
|
|
let mut rows = stmt.query(params![])?;
|
|
while let Some(row) = rows.next()? {
|
|
let id = row.get(0)?;
|
|
let config: SampleFileDirConfig = row.get(1)?;
|
|
let dir_uuid: SqlUuid = row.get(2)?;
|
|
let open_id: Option<u32> = row.get(3)?;
|
|
let open_uuid: Option<SqlUuid> = row.get(4)?;
|
|
let last_complete_open = match (open_id, open_uuid) {
|
|
(Some(id), Some(uuid)) => Some(Open { id, uuid: uuid.0 }),
|
|
(None, None) => None,
|
|
_ => bail!(Internal, msg("open table missing id {id}")),
|
|
};
|
|
self.sample_file_dirs_by_id.insert(
|
|
id,
|
|
SampleFileDir {
|
|
id,
|
|
uuid: dir_uuid.0,
|
|
path: config.path,
|
|
dir: None,
|
|
last_complete_open,
|
|
garbage_needs_unlink: raw::list_garbage(&self.conn, id)?,
|
|
garbage_unlinked: Vec::new(),
|
|
},
|
|
);
|
|
}
|
|
info!(
|
|
"Loaded {} sample file dirs",
|
|
self.sample_file_dirs_by_id.len()
|
|
);
|
|
Ok(())
|
|
}
|
|
|
|
/// Initializes the cameras, but not their matching recordings.
|
|
/// To be called during construction.
|
|
fn init_cameras(&mut self) -> Result<(), Error> {
|
|
info!("Loading cameras");
|
|
let mut stmt = self.conn.prepare(
|
|
r#"
|
|
select
|
|
id,
|
|
uuid,
|
|
short_name,
|
|
config
|
|
from
|
|
camera;
|
|
"#,
|
|
)?;
|
|
let mut rows = stmt.query(params![])?;
|
|
while let Some(row) = rows.next()? {
|
|
let id = row.get(0)?;
|
|
let uuid: SqlUuid = row.get(1)?;
|
|
self.cameras_by_id.insert(
|
|
id,
|
|
Camera {
|
|
id,
|
|
uuid: uuid.0,
|
|
short_name: row.get(2)?,
|
|
config: row.get(3)?,
|
|
streams: Default::default(),
|
|
},
|
|
);
|
|
self.cameras_by_uuid.insert(uuid.0, id);
|
|
}
|
|
info!("Loaded {} cameras", self.cameras_by_id.len());
|
|
Ok(())
|
|
}
|
|
|
|
/// Initializes the streams, but not their matching recordings.
|
|
/// To be called during construction.
|
|
fn init_streams(&mut self) -> Result<(), Error> {
|
|
info!("Loading streams");
|
|
let mut stmt = self.conn.prepare(
|
|
r#"
|
|
select
|
|
id,
|
|
type,
|
|
camera_id,
|
|
sample_file_dir_id,
|
|
config,
|
|
cum_recordings,
|
|
cum_media_duration_90k,
|
|
cum_runs
|
|
from
|
|
stream;
|
|
"#,
|
|
)?;
|
|
let mut rows = stmt.query(params![])?;
|
|
while let Some(row) = rows.next()? {
|
|
let id = row.get(0)?;
|
|
let type_: String = row.get(1)?;
|
|
let type_ = StreamType::parse(&type_)
|
|
.ok_or_else(|| err!(DataLoss, msg("no such stream type {type_}")))?;
|
|
let camera_id = row.get(2)?;
|
|
let c = self
|
|
.cameras_by_id
|
|
.get_mut(&camera_id)
|
|
.ok_or_else(|| err!(DataLoss, msg("missing camera {camera_id} for stream {id}")))?;
|
|
self.streams_by_id.insert(
|
|
id,
|
|
Stream {
|
|
id,
|
|
type_,
|
|
camera_id,
|
|
sample_file_dir_id: row.get(3)?,
|
|
config: row.get(4)?,
|
|
range: None,
|
|
sample_file_bytes: 0,
|
|
fs_bytes: 0,
|
|
to_delete: Vec::new(),
|
|
bytes_to_delete: 0,
|
|
fs_bytes_to_delete: 0,
|
|
bytes_to_add: 0,
|
|
fs_bytes_to_add: 0,
|
|
duration: recording::Duration(0),
|
|
committed_days: days::Map::default(),
|
|
cum_recordings: row.get(5)?,
|
|
cum_media_duration: recording::Duration(row.get(6)?),
|
|
cum_runs: row.get(7)?,
|
|
uncommitted: VecDeque::new(),
|
|
synced_recordings: 0,
|
|
on_live_segment: Vec::new(),
|
|
},
|
|
);
|
|
c.streams[type_.index()] = Some(id);
|
|
}
|
|
info!("Loaded {} streams", self.streams_by_id.len());
|
|
Ok(())
|
|
}
|
|
|
|
/// Inserts the specified video sample entry if absent.
|
|
/// On success, returns the id of a new or existing row.
|
|
pub fn insert_video_sample_entry(
|
|
&mut self,
|
|
entry: VideoSampleEntryToInsert,
|
|
) -> Result<i32, Error> {
|
|
// Check if it already exists.
|
|
// There shouldn't be too many entries, so it's fine to enumerate everything.
|
|
for (&id, v) in &self.video_sample_entries_by_id {
|
|
if v.data == entry.data {
|
|
// The other fields are derived from data, so differences indicate a bug.
|
|
if v.width != entry.width
|
|
|| v.height != entry.height
|
|
|| v.pasp_h_spacing != entry.pasp_h_spacing
|
|
|| v.pasp_v_spacing != entry.pasp_v_spacing
|
|
{
|
|
bail!(
|
|
Internal,
|
|
msg("video_sample_entry id {id}: existing entry {v:?}, new {entry:?}"),
|
|
);
|
|
}
|
|
return Ok(id);
|
|
}
|
|
}
|
|
|
|
let mut stmt = self.conn.prepare_cached(INSERT_VIDEO_SAMPLE_ENTRY_SQL)?;
|
|
stmt.execute(named_params! {
|
|
":width": i32::from(entry.width),
|
|
":height": i32::from(entry.height),
|
|
":pasp_h_spacing": i32::from(entry.pasp_h_spacing),
|
|
":pasp_v_spacing": i32::from(entry.pasp_v_spacing),
|
|
":rfc6381_codec": &entry.rfc6381_codec,
|
|
":data": &entry.data,
|
|
})
|
|
.map_err(|e| err!(e, msg("Unable to insert {entry:#?}")))?;
|
|
|
|
let id = self.conn.last_insert_rowid() as i32;
|
|
self.video_sample_entries_by_id.insert(
|
|
id,
|
|
Arc::new(VideoSampleEntry {
|
|
id,
|
|
width: entry.width,
|
|
height: entry.height,
|
|
pasp_h_spacing: entry.pasp_h_spacing,
|
|
pasp_v_spacing: entry.pasp_v_spacing,
|
|
data: entry.data,
|
|
rfc6381_codec: entry.rfc6381_codec,
|
|
}),
|
|
);
|
|
|
|
Ok(id)
|
|
}
|
|
|
|
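    /// Adds a sample file directory at `path`, writing its on-disk metadata.
    /// Fails if the database is open read-only.
    ///
    /// A minimal sketch (the path is illustrative):
    ///
    /// ```ignore
    /// let dir_id = db.add_sample_file_dir(std::path::PathBuf::from("/media/nvr/sample"))?;
    /// ```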
    pub fn add_sample_file_dir(&mut self, path: PathBuf) -> Result<i32, Error> {
        let mut meta = schema::DirMeta::default();
        let uuid = Uuid::new_v4();
        let uuid_bytes = &uuid.as_bytes()[..];
        let o = self
            .open
            .as_ref()
            .ok_or_else(|| err!(FailedPrecondition, msg("database is read-only")))?;

        // Populate meta.
        {
            meta.db_uuid.extend_from_slice(&self.uuid.as_bytes()[..]);
            meta.dir_uuid.extend_from_slice(uuid_bytes);
            let open = meta.in_progress_open.mut_or_insert_default();
            open.id = o.id;
            open.uuid.extend_from_slice(&o.uuid.as_bytes()[..]);
        }

        let dir = dir::SampleFileDir::create(&path, &meta)?;
        let config = SampleFileDirConfig {
            path: path.clone(),
            ..Default::default()
        };
        self.conn.execute(
            r#"
            insert into sample_file_dir (config, uuid, last_complete_open_id)
                values (?, ?, ?)
            "#,
            params![&config, uuid_bytes, o.id],
        )?;
        let id = self.conn.last_insert_rowid() as i32;
        use ::std::collections::btree_map::Entry;
        let e = self.sample_file_dirs_by_id.entry(id);
        let d = match e {
            Entry::Vacant(e) => e.insert(SampleFileDir {
                id,
                path,
                uuid,
                dir: Some(dir),
                last_complete_open: Some(*o),
                garbage_needs_unlink: FnvHashSet::default(),
                garbage_unlinked: Vec::new(),
            }),
            Entry::Occupied(_) => bail!(Internal, msg("duplicate sample file dir id {id}")),
        };
        meta.last_complete_open = meta.in_progress_open.take().into();
        d.dir.as_ref().unwrap().write_meta(&meta)?;
        Ok(id)
    }

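    /// Deletes the sample file directory `dir_id`, which must have no
    /// referencing streams, no pending garbage, and no files on disk.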
    pub fn delete_sample_file_dir(&mut self, dir_id: i32) -> Result<(), Error> {
        for (&id, s) in self.streams_by_id.iter() {
            if s.sample_file_dir_id == Some(dir_id) {
                bail!(
                    FailedPrecondition,
                    msg("can't delete dir referenced by stream {id}")
                );
            }
        }
        let mut d = match self.sample_file_dirs_by_id.entry(dir_id) {
            ::std::collections::btree_map::Entry::Occupied(e) => e,
            _ => bail!(NotFound, msg("no such dir {dir_id} to remove")),
        };
        if !d.get().garbage_needs_unlink.is_empty() || !d.get().garbage_unlinked.is_empty() {
            bail!(
                FailedPrecondition,
                msg(
                    "must collect garbage before deleting directory {}",
                    d.get().path.display(),
                ),
            );
        }
        let dir = match d.get_mut().dir.take() {
            None => dir::SampleFileDir::open(&d.get().path, &d.get().expected_meta(&self.uuid))?,
            Some(arc) => match Arc::strong_count(&arc) {
                1 => arc, // LockedDatabase is only reference
                c => {
                    // a writer::Syncer also has a reference.
                    d.get_mut().dir = Some(arc); // put it back.
                    bail!(
                        FailedPrecondition,
                        msg("can't delete directory {dir_id} with active syncer (refcnt {c})"),
                    );
                }
            },
        };
        if !dir.is_empty()? {
            bail!(
                FailedPrecondition,
                msg(
                    "can't delete sample file directory {} which still has files",
                    &d.get().path.display(),
                ),
            );
        }
        let mut meta = d.get().expected_meta(&self.uuid);
        meta.in_progress_open = meta.last_complete_open.take().into();
        dir.write_meta(&meta)?;
        if self
            .conn
            .execute("delete from sample_file_dir where id = ?", params![dir_id])?
            != 1
        {
            bail!(Internal, msg("missing database row for dir {dir_id}"));
        }
        d.remove_entry();
        Ok(())
    }

    /// Adds a camera.
    pub fn add_camera(&mut self, mut camera: CameraChange) -> Result<i32, Error> {
        let uuid = Uuid::new_v4();
        let uuid_bytes = &uuid.as_bytes()[..];
        let tx = self.conn.transaction()?;
        let streams;
        let camera_id;
        {
            let mut stmt = tx.prepare_cached(
                r#"
                insert into camera (uuid, short_name, config)
                    values (:uuid, :short_name, :config)
                "#,
            )?;
            stmt.execute(named_params! {
                ":uuid": uuid_bytes,
                ":short_name": &camera.short_name,
                ":config": &camera.config,
            })?;
            camera_id = tx.last_insert_rowid() as i32;
            streams =
                StreamStateChanger::new(&tx, camera_id, None, &self.streams_by_id, &mut camera)?;
        }
        tx.commit()?;
        let streams = streams.apply(&mut self.streams_by_id);
        self.cameras_by_id.insert(
            camera_id,
            Camera {
                id: camera_id,
                uuid,
                short_name: camera.short_name,
                config: camera.config,
                streams,
            },
        );
        self.cameras_by_uuid.insert(uuid, camera_id);
        Ok(camera_id)
    }

    /// Returns a `CameraChange` for the given camera which does nothing.
    ///
    /// The caller can modify it to taste then pass it to `update_camera`.
    /// TODO: consider renaming this to `update_camera` and creating a bulk
    /// `apply_camera_changes`.
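    ///
    /// A sketch of the intended pattern (the field edited is illustrative):
    ///
    /// ```ignore
    /// let mut change = db.null_camera_change(camera_id)?;
    /// change.config.description = "front door".to_owned();
    /// db.update_camera(camera_id, change)?;
    /// ```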
    pub fn null_camera_change(&mut self, camera_id: i32) -> Result<CameraChange, Error> {
        let Some(camera) = self.cameras_by_id.get(&camera_id) else {
            bail!(Internal, msg("no such camera {camera_id}"));
        };
        let mut change = CameraChange {
            short_name: camera.short_name.clone(),
            config: camera.config.clone(),
            streams: Default::default(),
        };
        for i in 0..NUM_STREAM_TYPES {
            if let Some(stream_id) = camera.streams[i] {
                let s = self
                    .streams_by_id
                    .get(&stream_id)
                    .expect("cameras reference valid streams");
                change.streams[i] = StreamChange {
                    sample_file_dir_id: s.sample_file_dir_id,
                    config: s.config.clone(),
                };
            }
        }
        Ok(change)
    }

    /// Updates a camera.
    pub fn update_camera(&mut self, camera_id: i32, mut camera: CameraChange) -> Result<(), Error> {
        let tx = self.conn.transaction()?;
        let streams;
        let Some(c) = self.cameras_by_id.get_mut(&camera_id) else {
            bail!(Internal, msg("no such camera {camera_id}"));
        };
        {
            streams =
                StreamStateChanger::new(&tx, camera_id, Some(c), &self.streams_by_id, &mut camera)?;
            let mut stmt = tx.prepare_cached(
                r#"
                update camera set
                    short_name = :short_name,
                    config = :config
                where
                    id = :id
                "#,
            )?;
            let rows = stmt.execute(named_params! {
                ":id": camera_id,
                ":short_name": &camera.short_name,
                ":config": &camera.config,
            })?;
            if rows != 1 {
                bail!(Internal, msg("camera {camera_id} missing from database"));
            }
        }
        tx.commit()?;
        c.short_name = camera.short_name;
        c.config = camera.config;
        c.streams = streams.apply(&mut self.streams_by_id);
        Ok(())
    }

    /// Deletes a camera and its streams. The camera must have no recordings.
    pub fn delete_camera(&mut self, id: i32) -> Result<(), Error> {
        // TODO: also verify there are no uncommitted recordings.
        let Some(uuid) = self.cameras_by_id.get(&id).map(|c| c.uuid) else {
            bail!(NotFound, msg("no such camera {id}"));
        };
        let mut streams_to_delete = Vec::new();
        let tx = self.conn.transaction()?;
        {
            let mut stream_stmt = tx.prepare_cached(r"delete from stream where id = :id")?;
            for (stream_id, stream) in &self.streams_by_id {
                if stream.camera_id != id {
                    continue;
                };
                if stream.range.is_some() {
                    bail!(
                        FailedPrecondition,
                        msg("can't remove camera {id}; has recordings")
                    );
                }
                let rows = stream_stmt.execute(named_params! {":id": stream_id})?;
                if rows != 1 {
                    bail!(Internal, msg("stream {stream_id} missing from database"));
                }
                streams_to_delete.push(*stream_id);
            }
            let mut cam_stmt = tx.prepare_cached(r"delete from camera where id = :id")?;
            let rows = cam_stmt.execute(named_params! {":id": id})?;
            if rows != 1 {
                bail!(Internal, msg("camera {id} missing from database"));
            }
        }
        tx.commit()?;
        for id in streams_to_delete {
            self.streams_by_id.remove(&id);
        }
        self.cameras_by_id.remove(&id);
        self.cameras_by_uuid.remove(&uuid);
        Ok(())
    }

    // TODO: it'd make more sense to have a bulk camera/stream edit API than
    // this specific one.
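    /// Updates the recording mode and retention limit of the given streams in
    /// one transaction.
    ///
    /// A minimal sketch (the stream id and limit are illustrative):
    ///
    /// ```ignore
    /// db.update_retention(&[RetentionChange {
    ///     stream_id: 1,
    ///     new_record: true,
    ///     new_limit: 50 << 30, // bytes
    /// }])?;
    /// ```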
    pub fn update_retention(&mut self, changes: &[RetentionChange]) -> Result<(), Error> {
        // TODO: should validate there's only one change per id.
        let tx = self.conn.transaction()?;
        {
            let mut stmt = tx.prepare_cached(
                r#"
                update stream
                set
                    config = :config
                where
                    id = :id
                "#,
            )?;
            for c in changes {
                let Some(stream) = self.streams_by_id.get(&c.stream_id) else {
                    bail!(Internal, msg("no such stream {}", c.stream_id));
                };
                let mut new_config = stream.config.clone();
                new_config.mode = (if c.new_record { "record" } else { "" }).into();
                new_config.retain_bytes = c.new_limit;
                let rows = stmt.execute(named_params! {
                    ":config": &new_config,
                    ":id": c.stream_id,
                })?;
                assert_eq!(rows, 1, "missing stream {}", c.stream_id);
            }
        }
        tx.commit()?;
        for c in changes {
            let s = self
                .streams_by_id
                .get_mut(&c.stream_id)
                .expect("stream in db but not state");
            s.config.mode = (if c.new_record { "record" } else { "" }).into();
            s.config.retain_bytes = c.new_limit;
        }
        Ok(())
    }

    // ---- auth ----

    pub fn users_by_id(&self) -> &BTreeMap<i32, User> {
        self.auth.users_by_id()
    }

    pub fn get_user_by_id_mut(&mut self, id: i32) -> Option<&mut User> {
        self.auth.get_user_by_id_mut(id)
    }

    pub fn apply_user_change(&mut self, change: UserChange) -> Result<&User, base::Error> {
        self.auth.apply(&self.conn, change)
    }

    pub fn delete_user(&mut self, id: i32) -> Result<(), base::Error> {
        self.auth.delete_user(&mut self.conn, id)
    }

    pub fn get_user(&self, username: &str) -> Option<&User> {
        self.auth.get_user(username)
    }

    pub fn login_by_password(
        &mut self,
        req: auth::Request,
        username: &str,
        password: String,
        domain: Option<Vec<u8>>,
        session_flags: i32,
    ) -> Result<(RawSessionId, &Session), base::Error> {
        self.auth
            .login_by_password(&self.conn, req, username, password, domain, session_flags)
    }

    pub fn make_session(
        &mut self,
        creation: Request,
        uid: i32,
        domain: Option<Vec<u8>>,
        flags: i32,
        permissions: schema::Permissions,
    ) -> Result<(RawSessionId, &Session), base::Error> {
        self.auth
            .make_session(&self.conn, creation, uid, domain, flags, permissions)
    }

    pub fn authenticate_session(
        &mut self,
        req: auth::Request,
        sid: &auth::SessionHash,
    ) -> Result<(&auth::Session, &User), base::Error> {
        self.auth.authenticate_session(&self.conn, req, sid)
    }

    pub fn revoke_session(
        &mut self,
        reason: auth::RevocationReason,
        detail: Option<String>,
        req: auth::Request,
        hash: &auth::SessionHash,
    ) -> Result<(), base::Error> {
        self.auth
            .revoke_session(&self.conn, reason, detail, req, hash)
    }

    // ---- signal ----

    pub fn signals_by_id(&self) -> &BTreeMap<u32, signal::Signal> {
        self.signal.signals_by_id()
    }
    pub fn signal_types_by_uuid(&self) -> &FnvHashMap<Uuid, signal::Type> {
        self.signal.types_by_uuid()
    }
    pub fn list_changes_by_time(
        &self,
        desired_time: Range<recording::Time>,
        f: &mut dyn FnMut(&signal::ListStateChangesRow),
    ) {
        self.signal.list_changes_by_time(desired_time, f)
    }
    pub fn update_signals(
        &mut self,
        when: Range<recording::Time>,
        signals: &[u32],
        states: &[u16],
    ) -> Result<(), base::Error> {
        self.signal.update_signals(when, signals, states)
    }
}

/// Pragmas for full database integrity.
///
/// These are `pub` so that the `moonfire-nvr sql` command can pass them to the
/// SQLite3 binary with `-cmd`.
pub static INTEGRITY_PRAGMAS: [&str; 3] = [
    // Enforce foreign keys. This is on by default with --features=bundled (as rusqlite
    // compiles the SQLite3 amalgamation with -DSQLITE_DEFAULT_FOREIGN_KEYS=1). Ensure it's
    // always on. Note that our foreign keys are immediate rather than deferred, so we have to
    // be careful about the order of operations during the upgrade.
    "pragma foreign_keys = on",
    // Make the database actually durable.
    "pragma fullfsync = on",
    "pragma synchronous = 3",
];

/// Sets pragmas for full database integrity.
pub(crate) fn set_integrity_pragmas(conn: &mut rusqlite::Connection) -> Result<(), Error> {
    for pragma in INTEGRITY_PRAGMAS {
        conn.execute(pragma, params![])?;
    }
    Ok(())
}

pub(crate) fn check_sqlite_version() -> Result<(), Error> {
    // SQLite version 3.8.2 introduced the "without rowid" syntax used in the schema.
    // https://www.sqlite.org/withoutrowid.html
    if rusqlite::version_number() < 3008002 {
        bail!(
            FailedPrecondition,
            msg(
                "SQLite version {} is too old; need at least 3.8.2",
                rusqlite::version()
            ),
        );
    }
    Ok(())
}

/// Initializes a database.
/// Note this doesn't set journal options, so that it can be used on in-memory databases for
/// test code.
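///
/// A sketch of typical use on a fresh database file (the path is illustrative):
///
/// ```ignore
/// let mut conn = rusqlite::Connection::open("/var/lib/moonfire-nvr/db/db")?;
/// init(&mut conn)?;
/// ```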
pub fn init(conn: &mut rusqlite::Connection) -> Result<(), Error> {
    check_sqlite_version()?;
    set_integrity_pragmas(conn)?;
    let tx = conn.transaction()?;
    tx.execute_batch(include_str!("schema.sql"))
        .map_err(|e| err!(e, msg("unable to create database schema")))?;
    {
        let uuid = ::uuid::Uuid::new_v4();
        let uuid_bytes = &uuid.as_bytes()[..];
        tx.execute("insert into meta (uuid) values (?)", params![uuid_bytes])?;
    }
    tx.commit()?;
    Ok(())
}

/// Gets the schema version from the given database connection.
/// A fully initialized database will return `Ok(Some(version))` where `version` is an integer that
/// can be compared to `EXPECTED_VERSION`. An empty database will return `Ok(None)`. A partially
/// initialized database (in particular, one without a version row) will return some error.
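///
/// A sketch of interpreting the result:
///
/// ```ignore
/// match get_schema_version(&conn)? {
///     None => println!("empty database; run `nvr init` first"),
///     Some(v) if v == EXPECTED_VERSION => {}
///     Some(v) => println!("schema version {v} doesn't match expected {EXPECTED_VERSION}"),
/// }
/// ```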
pub fn get_schema_version(conn: &rusqlite::Connection) -> Result<Option<i32>, Error> {
    let ver_tables: i32 = conn.query_row_and_then(
        "select count(*) from sqlite_master where name = 'version'",
        params![],
        |row| row.get(0),
    )?;
    if ver_tables == 0 {
        return Ok(None);
    }
    Ok(Some(conn.query_row_and_then(
        "select max(id) from version",
        params![],
        |row| row.get(0),
    )?))
}

/// Returns the UUID associated with the current system boot, if available.
fn get_boot_uuid() -> Result<Option<Uuid>, Error> {
    if cfg!(target_os = "linux") {
        let boot_id = std::fs::read_to_string("/proc/sys/kernel/random/boot_id")?;
        Ok(Some(Uuid::parse_str(boot_id.trim_end()).map_err(|e| {
            err!(Internal, msg("boot_id is not a valid uuid"), source(e))
        })?))
    } else {
        Ok(None) // don't complain about lack of platform support; just return None.
    }
}

/// Checks that the schema version in the given database is as expected.
pub(crate) fn check_schema_version(conn: &rusqlite::Connection) -> Result<(), Error> {
    let Some(ver) = get_schema_version(conn)? else {
        bail!(
            FailedPrecondition,
            msg(
                "no such table: version.\n\n\
                If you have created an empty database by hand, delete it and use `nvr init` \
                instead, as noted in the installation instructions: \
                <https://github.com/scottlamb/moonfire-nvr/blob/master/guide/install.md>\n\n\
                If you are starting from a database that predates schema versioning, see \
                <https://github.com/scottlamb/moonfire-nvr/blob/master/guide/schema.md>."
            ),
        )
    };
    match ver.cmp(&EXPECTED_VERSION) {
        std::cmp::Ordering::Less => bail!(
            FailedPrecondition,
            msg(
                "database schema version {ver} is too old (expected {EXPECTED_VERSION}); \
                see upgrade instructions in guide/upgrade.md"
            ),
        ),
        std::cmp::Ordering::Equal => Ok(()),
        std::cmp::Ordering::Greater => bail!(
            FailedPrecondition,
            msg(
                "database schema version {ver} is too new (expected {EXPECTED_VERSION}); \
                must use a newer binary to match"
            ),
        ),
    }
}

/// The recording database. Abstracts away SQLite queries. Also maintains in-memory state
/// (loaded on startup, and updated on successful commit) to avoid expensive scans over the
/// recording table on common queries.
pub struct Database<C: Clocks + Clone = clock::RealClocks> {
    /// This is wrapped in an `Option` to allow the `Drop` implementation and `close` to coexist.
    db: Option<Mutex<LockedDatabase>>,

    /// This is kept separately from the `LockedDatabase` to allow the `lock()` operation itself to
    /// access it. It doesn't need a `Mutex` anyway; it's `Sync`, and all operations work on
    /// `&self`.
    clocks: C,
}

impl<C: Clocks + Clone> Drop for Database<C> {
    fn drop(&mut self) {
        if ::std::thread::panicking() {
            return; // don't flush while panicking.
        }
        if let Some(m) = self.db.take() {
            if let Err(e) = m.into_inner().unwrap().flush(&self.clocks, "drop") {
                error!(err = %e.chain(), "final database flush failed");
            }
        }
    }
}

// Helpers for Database::lock(). Closures don't implement Fn.
fn acquisition() -> &'static str {
    "database lock acquisition"
}
fn operation() -> &'static str {
    "database operation"
}

impl<C: Clocks + Clone> Database<C> {
    /// Creates the database from a caller-supplied SQLite connection.
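    ///
    /// A minimal sketch (the path is illustrative; the tests below use an
    /// in-memory connection instead):
    ///
    /// ```ignore
    /// let conn = rusqlite::Connection::open("/var/lib/moonfire-nvr/db/db")?;
    /// let db = Database::new(clock::RealClocks {}, conn, /* read_write */ true)?;
    /// ```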
    pub fn new(
        clocks: C,
        mut conn: rusqlite::Connection,
        read_write: bool,
    ) -> Result<Database<C>, Error> {
        check_sqlite_version()?;
        set_integrity_pragmas(&mut conn)?;
        check_schema_version(&conn)?;

        // Note: the meta check comes after the version check to improve the error message when
        // trying to open a version 0 or version 1 database (which lacked the meta table).
        let (db_uuid, config) = raw::read_meta(&conn)?;
        let open_monotonic = recording::Time::new(clocks.monotonic());
        let open = if read_write {
            let real = recording::Time::new(clocks.realtime());
            let mut stmt = conn
                .prepare("insert into open (uuid, start_time_90k, boot_uuid) values (?, ?, ?)")?;
            let open_uuid = SqlUuid(Uuid::new_v4());
            let boot_uuid = match get_boot_uuid() {
                Err(e) => {
                    warn!(err = %e.chain(), "unable to get boot uuid");
                    None
                }
                Ok(id) => id.map(SqlUuid),
            };
            stmt.execute(params![open_uuid, real.0, boot_uuid])?;
            let id = conn.last_insert_rowid() as u32;
            Some(Open {
                id,
                uuid: open_uuid.0,
            })
        } else {
            None
        };
        let auth = auth::State::init(&conn)?;
        let signal = signal::State::init(&conn, &config)?;
        let db = Database {
            db: Some(Mutex::new(LockedDatabase {
                conn,
                uuid: db_uuid,
                flush_count: 0,
                open,
                open_monotonic,
                auth,
                signal,
                sample_file_dirs_by_id: BTreeMap::new(),
                cameras_by_id: BTreeMap::new(),
                cameras_by_uuid: BTreeMap::new(),
                streams_by_id: BTreeMap::new(),
                video_sample_entries_by_id: BTreeMap::new(),
                video_index_cache: RefCell::new(LinkedHashMap::with_capacity_and_hasher(
                    VIDEO_INDEX_CACHE_LEN + 1,
                    Default::default(),
                )),
                on_flush: Vec::new(),
            })),
            clocks,
        };
        {
            let l = &mut *db.lock();
            l.init_video_sample_entries()?;
            l.init_sample_file_dirs()?;
            l.init_cameras()?;
            l.init_streams()?;
            for (&stream_id, ref mut stream) in &mut l.streams_by_id {
                // TODO: we could use one thread per stream if we had multiple db conns.
                let camera = l.cameras_by_id.get(&stream.camera_id).unwrap();
                init_recordings(&mut l.conn, stream_id, camera, stream)?;
            }
        }
        Ok(db)
    }

    #[inline(always)]
    pub fn clocks(&self) -> C {
        self.clocks.clone()
    }

    /// Locks the database; the returned reference is the only way to perform (read or write)
    /// operations.
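    ///
    /// A minimal sketch:
    ///
    /// ```ignore
    /// let l = db.lock();
    /// let n_cameras = l.cameras_by_id().len();
    /// drop(l); // release the lock before doing anything slow
    /// ```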
    pub fn lock(&self) -> DatabaseGuard<C> {
        let timer = clock::TimerGuard::new(&self.clocks, acquisition);
        let db = self.db.as_ref().unwrap().lock().unwrap();
        drop(timer);
        let _timer = clock::TimerGuard::<C, &'static str, fn() -> &'static str>::new(
            &self.clocks,
            operation,
        );
        DatabaseGuard {
            clocks: &self.clocks,
            db,
            _timer,
        }
    }

    /// For testing: closes the database (without flushing) and returns the connection.
    /// This allows verification that a newly opened database is in an acceptable state.
    #[cfg(test)]
    fn close(mut self) -> rusqlite::Connection {
        self.db.take().unwrap().into_inner().unwrap().conn
    }
}

/// Reference to a locked database returned by [Database::lock].
pub struct DatabaseGuard<'db, C: Clocks> {
    clocks: &'db C,
    db: MutexGuard<'db, LockedDatabase>,
    _timer: clock::TimerGuard<'db, C, &'static str, fn() -> &'static str>,
}

impl<'db, C: Clocks + Clone> DatabaseGuard<'db, C> {
    /// Tries to flush unwritten changes from the stream directories.
    ///
    /// * commits any recordings added with `add_recording` that have since been marked as
    ///   synced.
    /// * moves old recordings to the garbage table as requested by `delete_oldest_recordings`.
    /// * removes entries from the garbage table as requested by `mark_sample_files_deleted`.
    ///
    /// On success, for each affected sample file directory with a flush watcher set, sends a
    /// `Flush` event.
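    ///
    /// A minimal sketch (crate-internal, since this method is `pub(crate)`):
    ///
    /// ```ignore
    /// let mut l = db.lock();
    /// l.flush("example flush")?;
    /// ```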
    pub(crate) fn flush(&mut self, reason: &str) -> Result<(), Error> {
        self.db.flush(self.clocks, reason)
    }
}

impl<'db, C: Clocks + Clone> ::std::ops::Deref for DatabaseGuard<'db, C> {
    type Target = LockedDatabase;
    fn deref(&self) -> &LockedDatabase {
        &self.db
    }
}

impl<'db, C: Clocks + Clone> ::std::ops::DerefMut for DatabaseGuard<'db, C> {
    fn deref_mut(&mut self) -> &mut LockedDatabase {
        &mut self.db
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::recording::{self, TIME_UNITS_PER_SEC};
    use crate::testutil;
    use base::clock;
    use rusqlite::Connection;
    use url::Url;
    use uuid::Uuid;

    fn setup_conn() -> Connection {
        let mut conn = Connection::open_in_memory().unwrap();
        super::init(&mut conn).unwrap();
        conn
    }

    fn assert_no_recordings(db: &Database, uuid: Uuid) {
        let mut rows = 0;
        let mut camera_id = -1;
        {
            let db = db.lock();
            for row in db.cameras_by_id().values() {
                rows += 1;
                camera_id = row.id;
                assert_eq!(uuid, row.uuid);
                assert_eq!(
                    "http://test-camera/",
                    row.config.onvif_base_url.as_ref().unwrap().as_str()
                );
                assert_eq!("foo", &row.config.username);
                assert_eq!("bar", &row.config.password);
                //assert_eq!("/main", row.main_rtsp_url);
                //assert_eq!("/sub", row.sub_rtsp_url);
                //assert_eq!(42, row.retain_bytes);
                //assert_eq!(None, row.range);
                //assert_eq!(recording::Duration(0), row.duration);
                //assert_eq!(0, row.sample_file_bytes);
            }
        }
        assert_eq!(1, rows);

        let stream_id = camera_id; // TODO
        rows = 0;
        {
            let db = db.lock();
            let all_time = recording::Time(i64::min_value())..recording::Time(i64::max_value());
            db.list_recordings_by_time(stream_id, all_time, &mut |_row| {
                rows += 1;
                Ok(())
            })
            .unwrap();
        }
        assert_eq!(0, rows);
    }

    fn assert_single_recording(db: &Database, stream_id: i32, r: &RecordingToInsert) {
        {
            let db = db.lock();
            let stream = db.streams_by_id().get(&stream_id).unwrap();
            let dur = recording::Duration(r.wall_duration_90k as i64);
            assert_eq!(Some(r.start..r.start + dur), stream.range);
            assert_eq!(r.sample_file_bytes as i64, stream.sample_file_bytes);
            assert_eq!(dur, stream.duration);
            db.cameras_by_id().get(&stream.camera_id).unwrap();
        }

        // TODO(slamb): test that the days logic works correctly.

        let mut rows = 0;
        let mut recording_id = None;
        {
            let db = db.lock();
            let all_time = recording::Time(i64::min_value())..recording::Time(i64::max_value());
            db.list_recordings_by_time(stream_id, all_time, &mut |row| {
                rows += 1;
                recording_id = Some(row.id);
                assert_eq!(r.start, row.start);
                assert_eq!(r.wall_duration_90k, row.wall_duration_90k);
                assert_eq!(r.video_samples, row.video_samples);
                assert_eq!(r.video_sync_samples, row.video_sync_samples);
                assert_eq!(r.sample_file_bytes, row.sample_file_bytes);
                let vse = db
                    .video_sample_entries_by_id()
                    .get(&row.video_sample_entry_id)
                    .unwrap();
                assert_eq!(vse.rfc6381_codec, "avc1.4d0029");
                Ok(())
            })
            .unwrap();
        }
        assert_eq!(1, rows);

        rows = 0;
        raw::list_oldest_recordings(
            &db.lock().conn,
            CompositeId::new(stream_id, 0),
            &mut |row| {
                rows += 1;
                assert_eq!(recording_id, Some(row.id));
                assert_eq!(r.start, row.start);
                assert_eq!(r.wall_duration_90k, row.wall_duration_90k);
                assert_eq!(r.sample_file_bytes, row.sample_file_bytes);
                true
            },
        )
        .unwrap();
        assert_eq!(1, rows);

        // TODO: list_aggregated_recordings.
        // TODO: with_recording_playback.
    }

    #[test]
    fn test_no_meta_or_version() {
        testutil::init();
        let e = Database::new(
            clock::RealClocks {},
            Connection::open_in_memory().unwrap(),
            false,
        )
        .err()
        .unwrap();
        assert!(e.msg().unwrap().starts_with("no such table"), "{}", e);
    }

    #[test]
    fn test_version_too_old() {
        testutil::init();
        let c = setup_conn();
        c.execute_batch("delete from version; insert into version values (6, 0, '');")
            .unwrap();
        let e = Database::new(clock::RealClocks {}, c, false).err().unwrap();
        assert!(
            e.msg()
                .unwrap()
                .starts_with("database schema version 6 is too old (expected 7)"),
            "got: {e:?}"
        );
    }

    #[test]
    fn test_version_too_new() {
        testutil::init();
        let c = setup_conn();
        c.execute_batch("delete from version; insert into version values (8, 0, '');")
            .unwrap();
        let e = Database::new(clock::RealClocks {}, c, false).err().unwrap();
        assert!(
            e.msg()
                .unwrap()
                .starts_with("database schema version 8 is too new (expected 7)"),
            "got: {e:?}"
        );
    }

    /// Basic test of running some queries on a fresh database.
    #[test]
    fn test_fresh_db() {
        testutil::init();
        let conn = setup_conn();
        let db = Database::new(clock::RealClocks {}, conn, true).unwrap();
        let db = db.lock();
        assert_eq!(0, db.cameras_by_id().values().count());
    }

    /// Basic test of the full lifecycle of recording. Does not exercise error cases.
    #[test]
    fn test_full_lifecycle() {
        testutil::init();
        let conn = setup_conn();
        let db = Database::new(clock::RealClocks {}, conn, true).unwrap();
        let tmpdir = tempfile::Builder::new()
            .prefix("moonfire-nvr-test")
            .tempdir()
            .unwrap();
        let path = tmpdir.path().to_owned();
        let sample_file_dir_id = { db.lock() }.add_sample_file_dir(path).unwrap();
        let mut c = CameraChange {
            short_name: "testcam".to_owned(),
            config: crate::json::CameraConfig {
                description: "".to_owned(),
                onvif_base_url: Some(Url::parse("http://test-camera/").unwrap()),
                username: "foo".to_owned(),
                password: "bar".to_owned(),
                ..Default::default()
            },
            streams: [
                StreamChange {
                    sample_file_dir_id: Some(sample_file_dir_id),
                    config: crate::json::StreamConfig {
                        url: Some(Url::parse("rtsp://test-camera/main").unwrap()),
                        mode: crate::json::STREAM_MODE_RECORD.to_owned(),
                        flush_if_sec: 1,
                        ..Default::default()
                    },
                },
                StreamChange {
                    sample_file_dir_id: Some(sample_file_dir_id),
                    config: crate::json::StreamConfig {
                        url: Some(Url::parse("rtsp://test-camera/sub").unwrap()),
                        mode: crate::json::STREAM_MODE_RECORD.to_owned(),
                        flush_if_sec: 1,
                        ..Default::default()
                    },
                },
                StreamChange::default(),
            ],
        };
        let camera_id = db.lock().add_camera(c.clone()).unwrap();
        let (main_stream_id, sub_stream_id);
        {
            let mut l = db.lock();
            {
                let c = l.cameras_by_id().get(&camera_id).unwrap();
                main_stream_id = c.streams[0].unwrap();
                sub_stream_id = c.streams[1].unwrap();
            }
            l.update_retention(&[super::RetentionChange {
                stream_id: main_stream_id,
                new_record: true,
                new_limit: 42,
            }])
            .unwrap();
            {
                let main = l.streams_by_id().get(&main_stream_id).unwrap();
                assert_eq!(main.config.mode, crate::json::STREAM_MODE_RECORD);
                assert_eq!(main.config.retain_bytes, 42);
                assert_eq!(main.config.flush_if_sec, 1);
            }

            assert_eq!(
                l.streams_by_id()
                    .get(&sub_stream_id)
                    .unwrap()
                    .config
                    .flush_if_sec,
                1
            );
            c.streams[1].config.flush_if_sec = 2;
            l.update_camera(camera_id, c).unwrap();
            assert_eq!(
                l.streams_by_id()
                    .get(&sub_stream_id)
                    .unwrap()
                    .config
                    .flush_if_sec,
                2
            );
        }
        let camera_uuid = { db.lock().cameras_by_id().get(&camera_id).unwrap().uuid };
        assert_no_recordings(&db, camera_uuid);
        assert_eq!(
            db.lock()
                .streams_by_id()
                .get(&main_stream_id)
                .unwrap()
                .cum_recordings,
            0
        );

        // Closing and reopening the database should present the same contents.
        let conn = db.close();
        let db = Database::new(clock::RealClocks {}, conn, true).unwrap();
        assert_eq!(
            db.lock()
                .streams_by_id()
                .get(&sub_stream_id)
                .unwrap()
                .config
                .flush_if_sec,
            2
        );
        assert_no_recordings(&db, camera_uuid);
        assert_eq!(
            db.lock()
                .streams_by_id()
                .get(&main_stream_id)
                .unwrap()
                .cum_recordings,
            0
        );

        // TODO: assert_eq!(db.lock().list_garbage(sample_file_dir_id).unwrap(), &[]);

        let vse_id = db
            .lock()
            .insert_video_sample_entry(VideoSampleEntryToInsert {
                width: 1920,
                height: 1080,
                pasp_h_spacing: 1,
                pasp_v_spacing: 1,
                data: include_bytes!("testdata/avc1").to_vec(),
                rfc6381_codec: "avc1.4d0029".to_owned(),
            })
            .unwrap();
        assert!(vse_id > 0, "vse_id = {vse_id}");

        // Inserting a recording should succeed and advance the next recording id.
        let start = recording::Time(1430006400 * TIME_UNITS_PER_SEC);
        let recording = RecordingToInsert {
            sample_file_bytes: 42,
            run_offset: 0,
            flags: 0,
            start,
            prev_media_duration: recording::Duration(0),
            prev_runs: 0,
            wall_duration_90k: TIME_UNITS_PER_SEC.try_into().unwrap(),
            media_duration_90k: TIME_UNITS_PER_SEC.try_into().unwrap(),
            local_time_delta: recording::Duration(0),
            video_samples: 1,
            video_sync_samples: 1,
            video_sample_entry_id: vse_id,
            video_index: [0u8; 100].to_vec(),
            sample_file_blake3: None,
            end_reason: None,
        };
        let id = {
            let mut db = db.lock();
            let (id, _) = db.add_recording(main_stream_id, recording.clone()).unwrap();
            db.mark_synced(id).unwrap();
            db.flush("add test").unwrap();
            id
        };
        assert_eq!(
            db.lock()
                .streams_by_id()
                .get(&main_stream_id)
                .unwrap()
                .cum_recordings,
            1
        );

        // Queries should return the correct result (with caches updated on insert).
        assert_single_recording(&db, main_stream_id, &recording);

        // Queries on a fresh database should return the correct result (with caches populated from
        // existing database contents rather than built on insert).
        let conn = db.close();
        let db = Database::new(clock::RealClocks {}, conn, true).unwrap();
        assert_single_recording(&db, main_stream_id, &recording);

        // Deleting a recording should succeed, update the min/max times, and mark it as garbage.
        {
            let mut db = db.lock();
            let mut n = 0;
            db.delete_oldest_recordings(main_stream_id, &mut |_| {
                n += 1;
                true
            })
            .unwrap();
            assert_eq!(n, 1);
            {
                let s = db.streams_by_id().get(&main_stream_id).unwrap();
                assert_eq!(s.sample_file_bytes, 42);
                assert_eq!(s.bytes_to_delete, 42);
            }
            n = 0;

            // A second run
            db.delete_oldest_recordings(main_stream_id, &mut |_| {
                n += 1;
                true
            })
            .unwrap();
            assert_eq!(n, 0);
            assert_eq!(
                db.streams_by_id()
                    .get(&main_stream_id)
                    .unwrap()
                    .bytes_to_delete,
                42
            );
            db.flush("delete test").unwrap();
            let s = db.streams_by_id().get(&main_stream_id).unwrap();
            assert_eq!(s.sample_file_bytes, 0);
            assert_eq!(s.bytes_to_delete, 0);
        }
        assert_no_recordings(&db, camera_uuid);
        let g: Vec<_> = db
            .lock()
            .sample_file_dirs_by_id()
            .get(&sample_file_dir_id)
            .unwrap()
            .garbage_needs_unlink
            .iter()
            .copied()
            .collect();
        assert_eq!(&g, &[id]);
        let g: Vec<_> = db
            .lock()
            .sample_file_dirs_by_id()
            .get(&sample_file_dir_id)
            .unwrap()
            .garbage_unlinked
            .iter()
            .copied()
            .collect();
        assert_eq!(&g, &[]);
    }

    #[test]
    fn round_up() {
        assert_eq!(super::round_up(0), 0);
        assert_eq!(super::round_up(8_191), 8_192);
        assert_eq!(super::round_up(8_192), 8_192);
        assert_eq!(super::round_up(8_193), 12_288);
    }
}