new database/sample file dir interlock scheme

The idea is to avoid the problems described in src/schema.proto; those
possibilities have bothered me for a while. A bonus is that (in a future
commit) it can replace the sample file uuid scheme in favor of naming files
<camera_uuid>-<stream_type>/<recording_id> (sketched after this list), which
has several advantages:

  * on data integrity problems (specifically, extra sample files), more
    information with which to understand what happened.
  * no more reserving sample files prior to using them. This avoids some extra
    database transactions on startup (now there's an extra two total rather
    than an extra one per stream). It also simplifies an upcoming change I
    want to make in which some streams are not flushed immediately, reducing
    the write load significantly (maybe one per minute total rather than one
    per stream per minute).
  * get rid of eight bytes per playback cache entry in RAM (and nine bytes
    per recording_playback row on flash).
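
A rough sketch of the derivation this would allow. The helper and exact path
format are hypothetical until that follow-up commit lands; only the composite
id layout, which the new schema enforces via `composite_id >> 32 = stream_id`,
comes from this commit:

    // Hypothetical sketch: derive the sample file path from ids alone,
    // with no per-file uuid stored in recording_playback.
    fn sample_file_path(camera_uuid: ::uuid::Uuid, stream_type: &str,
                        composite_id: i64) -> String {
        let recording_id = composite_id as u32;  // low 32 bits: id within the stream
        format!("{}-{}/{}", camera_uuid.hyphenated(), stream_type, recording_id)
    }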

The implementation is still pretty rough in places:

  * Lack of tests.
  * Poor code organization. In particular, SampleFileDirectory::write_meta
    shouldn't be exposed beyond db. I'm thinking about moving db.rs and
    SampleFileDirectory to a new crate, moonfire_nvr_db. This would improve
    compile times as well.
  * No tooling for renaming a sample file directory.
  * The config subcommand still panics under conditions that can reasonably
    be expected to happen.
Scott Lamb
2018-02-14 23:10:10 -08:00
parent 89b6bccaa3
commit e7f5733f29
19 changed files with 1508 additions and 344 deletions

View File

@@ -207,9 +207,11 @@ fn confirm_deletion(siv: &mut Cursive, db: &Arc<db::Database>, id: i32, to_delet
fn lower_retention(db: &Arc<db::Database>, zero_limits: BTreeMap<i32, Vec<dir::NewLimit>>)
-> Result<(), Error> {
let dirs_to_open: Vec<_> = zero_limits.keys().map(|id| *id).collect();
db.lock().open_sample_file_dirs(&dirs_to_open[..])?;
for (dir_id, l) in &zero_limits {
let dir = db.lock().sample_file_dirs_by_id().get(dir_id).unwrap().open()?;
dir::lower_retention(dir, db.clone(), &l)?;
let dir = db.lock().sample_file_dirs_by_id().get(dir_id).unwrap().get()?;
dir::lower_retention(dir.clone(), db.clone(), &l)?;
}
Ok(())
}
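
This hunk shows the calling convention the commit introduces everywhere:
directories must be opened through the database (which performs the interlock
handshake against each directory's on-disk meta) before get() will hand out a
reference. A minimal sketch of the pattern, assuming get() returns
Result<Arc<dir::SampleFileDir>, Error> as the uses above suggest:

    // Sketch only: the open-then-get convention from this commit.
    fn open_and_get(db: &Arc<db::Database>, dir_id: i32)
                    -> Result<Arc<dir::SampleFileDir>, Error> {
        db.lock().open_sample_file_dirs(&[dir_id])?;  // interlock handshake
        db.lock().sample_file_dirs_by_id().get(&dir_id).unwrap().get()
    }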

View File

@@ -145,8 +145,9 @@ fn actually_delete(model: &RefCell<Model>, siv: &mut Cursive) {
siv.pop_layer(); // deletion confirmation
siv.pop_layer(); // retention dialog
let dir = {
let l = model.db.lock();
l.sample_file_dirs_by_id().get(&model.dir_id).unwrap().open().unwrap()
let mut l = model.db.lock();
l.open_sample_file_dirs(&[model.dir_id]).unwrap(); // TODO: don't unwrap.
l.sample_file_dirs_by_id().get(&model.dir_id).unwrap().get().unwrap()
};
if let Err(e) = dir::lower_retention(dir, model.db.clone(), &new_limits[..]) {
siv.add_layer(views::Dialog::text(format!("Unable to delete excess video: {}", e))
@@ -281,7 +282,7 @@ fn edit_dir_dialog(db: &Arc<db::Database>, siv: &mut Cursive, dir_id: i32) {
let mut total_retain = 0;
let fs_capacity;
{
let l = db.lock();
let mut l = db.lock();
for (&id, s) in l.streams_by_id() {
let c = l.cameras_by_id().get(&s.camera_id).expect("stream without camera");
if s.sample_file_dir_id != Some(dir_id) {
@@ -299,10 +300,9 @@ fn edit_dir_dialog(db: &Arc<db::Database>, siv: &mut Cursive, dir_id: i32) {
if streams.is_empty() {
return delete_dir_dialog(db, siv, dir_id);
}
l.open_sample_file_dirs(&[dir_id]).unwrap(); // TODO: don't unwrap.
let dir = l.sample_file_dirs_by_id().get(&dir_id).unwrap();
// TODO: go another way if open fails.
let stat = dir.open().unwrap().statfs().unwrap();
let stat = dir.get().unwrap().statfs().unwrap();
fs_capacity = stat.f_bsize as i64 * stat.f_bavail as i64 + total_used;
path = dir.path.clone();
}

View File

@@ -124,7 +124,7 @@ struct Args {
pub fn run() -> Result<(), Error> {
let args: Args = super::parse_args(USAGE)?;
let (_db_dir, conn) = super::open_conn(&args.flag_db_dir, super::OpenMode::ReadWrite)?;
let db = Arc::new(db::Database::new(conn)?);
let db = Arc::new(db::Database::new(conn, true)?);
let mut siv = Cursive::new();
//siv.add_global_callback('q', |s| s.quit());

View File

@@ -66,9 +66,7 @@ pub fn run() -> Result<(), Error> {
pragma journal_mode = wal;
pragma page_size = 16384;
"#)?;
let tx = conn.transaction()?;
tx.execute_batch(include_str!("../schema.sql"))?;
tx.commit()?;
db::Database::init(&mut conn)?;
info!("Database initialized.");
Ok(())
}

View File

@@ -100,9 +100,17 @@ pub fn run() -> Result<(), Error> {
let (_db_dir, conn) = super::open_conn(
&args.flag_db_dir,
if args.flag_read_only { super::OpenMode::ReadOnly } else { super::OpenMode::ReadWrite })?;
let db = Arc::new(db::Database::new(conn).unwrap());
let db = Arc::new(db::Database::new(conn, !args.flag_read_only).unwrap());
info!("Database is loaded.");
{
let mut l = db.lock();
let dirs_to_open: Vec<_> =
l.streams_by_id().values().filter_map(|s| s.sample_file_dir_id).collect();
l.open_sample_file_dirs(&dirs_to_open)?;
}
info!("Directories are opened.");
let s = web::Service::new(db.clone(), Some(&args.flag_ui_dir), resolve_zone())?;
// Start a streamer for each stream.
@@ -120,13 +128,13 @@ pub fn run() -> Result<(), Error> {
shutdown: &shutdown_streamers,
};
// Create directories for streams that need them.
// Get the directories that need syncers.
for stream in l.streams_by_id().values() {
if let (Some(id), true) = (stream.sample_file_dir_id, stream.record) {
dirs.entry(id).or_insert_with(|| {
let d = l.sample_file_dirs_by_id().get(&id).unwrap();
info!("Starting syncer for path {}", d.path);
d.open()
d.get().unwrap()
});
}
}
@@ -135,7 +143,6 @@ pub fn run() -> Result<(), Error> {
drop(l);
let mut syncers = FnvHashMap::with_capacity_and_hasher(dirs.len(), Default::default());
for (id, dir) in dirs.drain() {
let dir = dir?;
let (channel, join) = dir::start_syncer(dir.clone(), db.clone())?;
syncers.insert(id, Syncer {
dir,

View File

@@ -64,10 +64,10 @@ Options:
const UPGRADE_NOTES: &'static str =
concat!("upgraded using moonfire-nvr ", env!("CARGO_PKG_VERSION"));
const UPGRADERS: [fn(&rusqlite::Transaction, &Args) -> Result<(), Error>; 2] = [
v0_to_v1::run,
v1_to_v2::run,
];
pub trait Upgrader {
fn in_tx(&mut self, &rusqlite::Transaction) -> Result<(), Error> { Ok(()) }
fn post_tx(&mut self) -> Result<(), Error> { Ok(()) }
}
#[derive(Debug, Deserialize)]
pub struct Args {
@@ -89,8 +89,13 @@ pub fn run() -> Result<(), Error> {
let args: Args = super::parse_args(USAGE)?;
let (_db_dir, mut conn) = super::open_conn(&args.flag_db_dir, super::OpenMode::ReadWrite)?;
let upgraders = [
v0_to_v1::new,
v1_to_v2::new,
];
{
assert_eq!(UPGRADERS.len(), db::EXPECTED_VERSION as usize);
assert_eq!(upgraders.len(), db::EXPECTED_VERSION as usize);
let old_ver =
conn.query_row("select max(id) from version", &[], |row| row.get_checked(0))??;
if old_ver > db::EXPECTED_VERSION {
@@ -103,13 +108,15 @@ pub fn run() -> Result<(), Error> {
set_journal_mode(&conn, &args.flag_preset_journal).unwrap();
for ver in old_ver .. db::EXPECTED_VERSION {
info!("...from version {} to version {}", ver, ver + 1);
let mut u = upgraders[ver as usize](&args)?;
let tx = conn.transaction()?;
UPGRADERS[ver as usize](&tx, &args)?;
u.in_tx(&tx)?;
tx.execute(r#"
insert into version (id, unix_time, notes)
values (?, cast(strftime('%s', 'now') as int32), ?)
"#, &[&(ver + 1), &UPGRADE_NOTES])?;
tx.commit()?;
u.post_tx()?;
}
}
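
The old table of plain upgrade functions becomes a trait so an upgrader can
carry state across the commit boundary: in_tx runs inside the version-bump
transaction, post_tx runs after it commits, and both default to no-ops. A
hypothetical future upgrader (module name and schema change invented for
illustration) would plug in like this:

    // Hypothetical v2_to_v3 sketch; not part of this commit.
    use error::Error;
    use rusqlite;

    pub struct U;

    pub fn new<'a>(_args: &'a super::Args) -> Result<Box<super::Upgrader + 'a>, Error> {
        Ok(Box::new(U))
    }

    impl super::Upgrader for U {
        fn in_tx(&mut self, tx: &rusqlite::Transaction) -> Result<(), Error> {
            // Schema changes run inside the version-bump transaction.
            tx.execute_batch("alter table stream add column dummy_col integer")?;
            Ok(())
        }
        // post_tx keeps its default no-op; only upgraders that touch the
        // filesystem (like v1_to_v2 below) need to override it.
    }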

View File

@@ -37,65 +37,73 @@ use rusqlite;
use std::collections::HashMap;
use strutil;
pub fn run(tx: &rusqlite::Transaction, _args: &super::Args) -> Result<(), Error> {
// These create statements match the schema.sql when version 1 was the latest.
tx.execute_batch(r#"
alter table camera rename to old_camera;
create table camera (
id integer primary key,
uuid blob unique,
short_name text not null,
description text,
host text,
username text,
password text,
main_rtsp_path text,
sub_rtsp_path text,
retain_bytes integer not null check (retain_bytes >= 0),
next_recording_id integer not null check (next_recording_id >= 0)
);
alter table recording rename to old_recording;
drop index recording_cover;
create table recording (
composite_id integer primary key,
camera_id integer not null references camera (id),
run_offset integer not null,
flags integer not null,
sample_file_bytes integer not null check (sample_file_bytes > 0),
start_time_90k integer not null check (start_time_90k > 0),
duration_90k integer not null
check (duration_90k >= 0 and duration_90k < 5*60*90000),
local_time_delta_90k integer not null,
video_samples integer not null check (video_samples > 0),
video_sync_samples integer not null check (video_samples > 0),
video_sample_entry_id integer references video_sample_entry (id),
check (composite_id >> 32 = camera_id)
);
create index recording_cover on recording (
camera_id,
start_time_90k,
duration_90k,
video_samples,
video_sync_samples,
video_sample_entry_id,
sample_file_bytes,
run_offset,
flags
);
create table recording_playback (
composite_id integer primary key references recording (composite_id),
sample_file_uuid blob not null check (length(sample_file_uuid) = 16),
sample_file_sha1 blob not null check (length(sample_file_sha1) = 20),
video_index blob not null check (length(video_index) > 0)
);
"#)?;
let camera_state = fill_recording(tx).unwrap();
fill_camera(tx, camera_state).unwrap();
tx.execute_batch(r#"
drop table old_camera;
drop table old_recording;
"#)?;
Ok(())
pub struct U;
pub fn new<'a>(_args: &'a super::Args) -> Result<Box<super::Upgrader + 'a>, Error> {
Ok(Box::new(U))
}
impl super::Upgrader for U {
fn in_tx(&mut self, tx: &rusqlite::Transaction) -> Result<(), Error> {
// These create statements match the schema.sql when version 1 was the latest.
tx.execute_batch(r#"
alter table camera rename to old_camera;
create table camera (
id integer primary key,
uuid blob unique,
short_name text not null,
description text,
host text,
username text,
password text,
main_rtsp_path text,
sub_rtsp_path text,
retain_bytes integer not null check (retain_bytes >= 0),
next_recording_id integer not null check (next_recording_id >= 0)
);
alter table recording rename to old_recording;
drop index recording_cover;
create table recording (
composite_id integer primary key,
camera_id integer not null references camera (id),
run_offset integer not null,
flags integer not null,
sample_file_bytes integer not null check (sample_file_bytes > 0),
start_time_90k integer not null check (start_time_90k > 0),
duration_90k integer not null
check (duration_90k >= 0 and duration_90k < 5*60*90000),
local_time_delta_90k integer not null,
video_samples integer not null check (video_samples > 0),
video_sync_samples integer not null check (video_samples > 0),
video_sample_entry_id integer references video_sample_entry (id),
check (composite_id >> 32 = camera_id)
);
create index recording_cover on recording (
camera_id,
start_time_90k,
duration_90k,
video_samples,
video_sync_samples,
video_sample_entry_id,
sample_file_bytes,
run_offset,
flags
);
create table recording_playback (
composite_id integer primary key references recording (composite_id),
sample_file_uuid blob not null check (length(sample_file_uuid) = 16),
sample_file_sha1 blob not null check (length(sample_file_sha1) = 20),
video_index blob not null check (length(video_index) > 0)
);
"#)?;
let camera_state = fill_recording(tx).unwrap();
fill_camera(tx, camera_state).unwrap();
tx.execute_batch(r#"
drop table old_camera;
drop table old_recording;
"#)?;
Ok(())
}
}
struct CameraState {

View File

@@ -31,166 +31,250 @@
/// Upgrades a version 1 schema to a version 2 schema.
use error::Error;
use std::fs;
use rusqlite;
use schema::DirMeta;
use uuid::Uuid;
pub fn run(tx: &rusqlite::Transaction, args: &super::Args) -> Result<(), Error> {
// These create statements match the schema.sql when version 2 was the latest.
tx.execute_batch(r#"
create table sample_file_dir (
id integer primary key,
path text unique not null,
uuid blob unique not null check (length(uuid) = 16)
);
"#)?;
{
let mut stmt = tx.prepare_cached(r#"
insert into sample_file_dir (path, uuid)
values (:path, :uuid)
pub struct U<'a> {
sample_file_path: &'a str,
dir_meta: Option<DirMeta>,
}
pub fn new<'a>(args: &'a super::Args) -> Result<Box<super::Upgrader + 'a>, Error> {
let sample_file_path =
args.flag_sample_file_dir
.as_ref()
.ok_or_else(|| Error::new("--sample-file-dir required when upgrading from \
schema version 1 to 2.".to_owned()))?;
Ok(Box::new(U { sample_file_path, dir_meta: None }))
}
impl<'a> U<'a> {
/// Ensures there are sample files in the directory for all listed recordings.
/// Among other problems, this catches a fat-fingered `--sample-file-dir`.
fn verify_sample_files(&self, tx: &rusqlite::Transaction) -> Result<(), Error> {
// Build a hash of the uuids found in sample_file_path. Ignore other files.
let n: i64 = tx.query_row("select count(*) from recording", &[], |r| r.get_checked(0))??;
let mut files = ::fnv::FnvHashSet::with_capacity_and_hasher(n as usize, Default::default());
for e in fs::read_dir(self.sample_file_path)? {
let e = e?;
let f = e.file_name();
let s = match f.to_str() {
Some(s) => s,
None => continue,
};
let uuid = match Uuid::parse_str(s) {
Ok(u) => u,
Err(_) => continue,
};
if s != uuid.hyphenated().to_string() { // non-canonical form.
continue;
}
files.insert(uuid);
}
// Iterate through the database and check that everything has a matching file.
let mut stmt = tx.prepare(r"select sample_file_uuid from recording_playback")?;
let mut rows = stmt.query(&[])?;
while let Some(row) = rows.next() {
let row = row?;
let uuid: ::db::FromSqlUuid = row.get_checked(0)?;
if !files.contains(&uuid.0) {
return Err(Error::new(format!("{} is missing from dir {}!",
uuid.0, self.sample_file_path)));
}
}
Ok(())
}
}
impl<'a> super::Upgrader for U<'a> {
fn in_tx(&mut self, tx: &rusqlite::Transaction) -> Result<(), Error> {
self.verify_sample_files(tx)?;
// These create statements match the schema.sql when version 2 was the latest.
tx.execute_batch(r#"
create table meta (
uuid blob not null check (length(uuid) = 16)
);
create table open (
id integer primary key,
uuid blob unique not null check (length(uuid) = 16)
);
create table sample_file_dir (
id integer primary key,
path text unique not null,
uuid blob unique not null check (length(uuid) = 16),
last_complete_open_id integer references open (id)
);
"#)?;
let uuid = ::uuid::Uuid::new_v4();
let uuid_bytes = &uuid.as_bytes()[..];
let path = args.flag_sample_file_dir
.as_ref()
.ok_or_else(|| Error::new("--sample-file-dir required when upgrading from
schema version 1 to 2.".to_owned()))?;
stmt.execute_named(&[
(":path", &path.as_str()),
(":uuid", &uuid_bytes),
])?;
let db_uuid = ::uuid::Uuid::new_v4();
let db_uuid_bytes = &db_uuid.as_bytes()[..];
tx.execute("insert into meta (uuid) values (?)", &[&db_uuid_bytes])?;
let open_uuid = ::uuid::Uuid::new_v4();
let open_uuid_bytes = &open_uuid.as_bytes()[..];
tx.execute("insert into open (uuid) values (?)", &[&open_uuid_bytes])?;
let open_id = tx.last_insert_rowid() as u32;
let dir_uuid = ::uuid::Uuid::new_v4();
let dir_uuid_bytes = &dir_uuid.as_bytes()[..];
let mut meta = ::schema::DirMeta::default();
{
meta.db_uuid.extend_from_slice(db_uuid_bytes);
meta.dir_uuid.extend_from_slice(dir_uuid_bytes);
let open = meta.mut_in_progress_open();
open.id = open_id;
open.uuid.extend_from_slice(&open_uuid_bytes);
}
tx.execute(r#"
insert into sample_file_dir (path, uuid, last_complete_open_id)
values (?, ?, ?)
"#, &[&self.sample_file_path, &dir_uuid_bytes, &open_id])?;
self.dir_meta = Some(meta);
tx.execute_batch(r#"
alter table camera rename to old_camera;
alter table recording rename to old_recording;
alter table video_sample_entry rename to old_video_sample_entry;
drop index recording_cover;
create table camera (
id integer primary key,
uuid blob unique not null check (length(uuid) = 16),
short_name text not null,
description text,
host text,
username text,
password text
);
create table stream (
id integer primary key,
camera_id integer not null references camera (id),
sample_file_dir_id integer references sample_file_dir (id),
type text not null check (type in ('main', 'sub')),
record integer not null check (record in (1, 0)),
rtsp_path text not null,
retain_bytes integer not null check (retain_bytes >= 0),
next_recording_id integer not null check (next_recording_id >= 0),
unique (camera_id, type)
);
create table recording (
composite_id integer primary key,
stream_id integer not null references stream (id),
run_offset integer not null,
flags integer not null,
sample_file_bytes integer not null check (sample_file_bytes > 0),
start_time_90k integer not null check (start_time_90k > 0),
duration_90k integer not null
check (duration_90k >= 0 and duration_90k < 5*60*90000),
local_time_delta_90k integer not null,
video_samples integer not null check (video_samples > 0),
video_sync_samples integer not null check (video_sync_samples > 0),
video_sample_entry_id integer references video_sample_entry (id),
check (composite_id >> 32 = stream_id)
);
create index recording_cover on recording (
stream_id,
start_time_90k,
duration_90k,
video_samples,
video_sync_samples,
video_sample_entry_id,
sample_file_bytes,
run_offset,
flags
);
create table video_sample_entry (
id integer primary key,
sha1 blob unique not null check (length(sha1) = 20),
width integer not null check (width > 0),
height integer not null check (height > 0),
rfc6381_codec text not null,
data blob not null check (length(data) > 86)
);
insert into camera
select
id,
uuid,
short_name,
description,
host,
username,
password
from old_camera;
-- Insert main streams using the same id as the camera, to ease changing recordings.
insert into stream
select
old_camera.id,
old_camera.id,
sample_file_dir.id,
'main',
1,
old_camera.main_rtsp_path,
old_camera.retain_bytes,
old_camera.next_recording_id
from
old_camera cross join sample_file_dir;
-- Insert sub stream (if path is non-empty) using any id.
insert into stream (camera_id, sample_file_dir_id, type, record, rtsp_path,
retain_bytes, next_recording_id)
select
old_camera.id,
sample_file_dir.id,
'sub',
0,
old_camera.sub_rtsp_path,
0,
0
from
old_camera cross join sample_file_dir
where
old_camera.sub_rtsp_path != '';
insert into recording
select
composite_id,
camera_id,
run_offset,
flags,
sample_file_bytes,
start_time_90k,
duration_90k,
local_time_delta_90k,
video_samples,
video_sync_samples,
video_sample_entry_id
from
old_recording;
"#)?;
fix_video_sample_entry(tx)?;
tx.execute_batch(r#"
drop table old_camera;
drop table old_recording;
drop table old_video_sample_entry;
"#)?;
Ok(())
}
tx.execute_batch(r#"
alter table camera rename to old_camera;
alter table recording rename to old_recording;
alter table video_sample_entry rename to old_video_sample_entry;
drop index recording_cover;
create table camera (
id integer primary key,
uuid blob unique not null check (length(uuid) = 16),
short_name text not null,
description text,
host text,
username text,
password text
);
create table stream (
id integer primary key,
camera_id integer not null references camera (id),
sample_file_dir_id integer references sample_file_dir (id),
type text not null check (type in ('main', 'sub')),
record integer not null check (record in (1, 0)),
rtsp_path text not null,
retain_bytes integer not null check (retain_bytes >= 0),
next_recording_id integer not null check (next_recording_id >= 0),
unique (camera_id, type)
);
create table recording (
composite_id integer primary key,
stream_id integer not null references stream (id),
run_offset integer not null,
flags integer not null,
sample_file_bytes integer not null check (sample_file_bytes > 0),
start_time_90k integer not null check (start_time_90k > 0),
duration_90k integer not null
check (duration_90k >= 0 and duration_90k < 5*60*90000),
local_time_delta_90k integer not null,
video_samples integer not null check (video_samples > 0),
video_sync_samples integer not null check (video_sync_samples > 0),
video_sample_entry_id integer references video_sample_entry (id),
check (composite_id >> 32 = stream_id)
);
create index recording_cover on recording (
stream_id,
start_time_90k,
duration_90k,
video_samples,
video_sync_samples,
video_sample_entry_id,
sample_file_bytes,
run_offset,
flags
);
create table video_sample_entry (
id integer primary key,
sha1 blob unique not null check (length(sha1) = 20),
width integer not null check (width > 0),
height integer not null check (height > 0),
rfc6381_codec text not null,
data blob not null check (length(data) > 86)
);
insert into camera
select
id,
uuid,
short_name,
description,
host,
username,
password
from old_camera;
-- Insert main streams using the same id as the camera, to ease changing recordings.
insert into stream
select
old_camera.id,
old_camera.id,
sample_file_dir.id,
'main',
1,
old_camera.main_rtsp_path,
old_camera.retain_bytes,
old_camera.next_recording_id
from
old_camera cross join sample_file_dir;
-- Insert sub stream (if path is non-empty) using any id.
insert into stream (camera_id, sample_file_dir_id, type, record, rtsp_path, retain_bytes,
next_recording_id)
select
old_camera.id,
sample_file_dir.id,
'sub',
0,
old_camera.sub_rtsp_path,
0,
0
from
old_camera cross join sample_file_dir
where
old_camera.sub_rtsp_path != '';
insert into recording
select
composite_id,
camera_id,
run_offset,
flags,
sample_file_bytes,
start_time_90k,
duration_90k,
local_time_delta_90k,
video_samples,
video_sync_samples,
video_sample_entry_id
from
old_recording;
"#)?;
fix_video_sample_entry(tx)?;
tx.execute_batch(r#"
drop table old_camera;
drop table old_recording;
drop table old_video_sample_entry;
"#)?;
Ok(())
fn post_tx(&mut self) -> Result<(), Error> {
let mut meta = self.dir_meta.take().unwrap();
let d = ::dir::SampleFileDir::create(self.sample_file_path, &meta)?;
::std::mem::swap(&mut meta.last_complete_open, &mut meta.in_progress_open);
d.write_meta(&meta)?;
Ok(())
}
}
fn fix_video_sample_entry(tx: &rusqlite::Transaction) -> Result<(), Error> {
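
For clarity, the handshake above in one place: in_tx stages a DirMeta whose
in_progress_open names the open row inserted in the same transaction, and
post_tx, running only after the commit, writes that meta into the new
directory and then rewrites it with the open promoted to last_complete_open.
A condensed, hypothetical helper form (the types and calls — ::schema::DirMeta,
::dir::SampleFileDir::create, write_meta — are the ones in the diff; the
wrapper function itself is illustrative):

    // Condensed from the in_tx/post_tx bodies above.
    fn two_phase_meta(path: &str, db_uuid_bytes: &[u8], dir_uuid_bytes: &[u8],
                      open_id: u32, open_uuid_bytes: &[u8])
                      -> Result<(), ::error::Error> {
        // Phase 1 (inside the transaction in the real code): stage the open
        // as in-progress; it isn't trusted until the commit lands.
        let mut meta = ::schema::DirMeta::default();
        meta.db_uuid.extend_from_slice(db_uuid_bytes);
        meta.dir_uuid.extend_from_slice(dir_uuid_bytes);
        {
            let open = meta.mut_in_progress_open();
            open.id = open_id;
            open.uuid.extend_from_slice(open_uuid_bytes);
        }
        // Phase 2 (after commit): create the dir with the staged meta, then
        // promote the open and rewrite. A crash between the two writes leaves
        // the open recorded as merely in-progress, never wrongly trusted.
        let d = ::dir::SampleFileDir::create(path, &meta)?;
        ::std::mem::swap(&mut meta.last_complete_open, &mut meta.in_progress_open);
        d.write_meta(&meta)?;
        Ok(())
    }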