diff --git a/Cargo.lock b/Cargo.lock
index 17d13be..2b77ad0 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -481,6 +481,7 @@ dependencies = [
  "mylog 0.1.0 (git+https://github.com/scottlamb/mylog)",
  "openssl 0.10.2 (registry+https://github.com/rust-lang/crates.io-index)",
  "parking_lot 0.5.3 (registry+https://github.com/rust-lang/crates.io-index)",
+ "protobuf 1.4.3 (registry+https://github.com/rust-lang/crates.io-index)",
  "reffers 0.4.2 (git+https://github.com/diwic/reffers-rs)",
  "regex 0.2.5 (registry+https://github.com/rust-lang/crates.io-index)",
  "reqwest 0.8.4 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -726,6 +727,11 @@ name = "pkg-config"
 version = "0.3.9"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 
+[[package]]
+name = "protobuf"
+version = "1.4.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+
 [[package]]
 name = "quote"
 version = "0.3.15"
@@ -1297,6 +1303,7 @@ dependencies = [
 "checksum phf_generator 0.7.21 (registry+https://github.com/rust-lang/crates.io-index)" = "6b07ffcc532ccc85e3afc45865469bf5d9e4ef5bfcf9622e3cfe80c2d275ec03"
 "checksum phf_shared 0.7.21 (registry+https://github.com/rust-lang/crates.io-index)" = "07e24b0ca9643bdecd0632f2b3da6b1b89bbb0030e0b992afc1113b23a7bc2f2"
 "checksum pkg-config 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)" = "3a8b4c6b8165cd1a1cd4b9b120978131389f64bdaf456435caa41e630edba903"
+"checksum protobuf 1.4.3 (registry+https://github.com/rust-lang/crates.io-index)" = "bec26e67194b7d991908145fdf21b7cae8b08423d96dcb9e860cd31f854b9506"
 "checksum quote 0.3.15 (registry+https://github.com/rust-lang/crates.io-index)" = "7a6e920b65c65f10b2ae65c831a81a073a89edd28c7cce89475bff467ab4167a"
 "checksum rand 0.3.20 (registry+https://github.com/rust-lang/crates.io-index)" = "512870020642bb8c221bf68baa1b2573da814f6ccfe5c9699b1c303047abe9b1"
 "checksum rand 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)" = "eba5f8cb59cc50ed56be8880a5c7b496bfd9bd26394e176bc67884094145c2c5"
diff --git a/Cargo.toml b/Cargo.toml
index 1c90ee1..73e7aa9 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -31,6 +31,7 @@ moonfire-ffmpeg = { path = "ffmpeg" }
 mylog = { git = "https://github.com/scottlamb/mylog" }
 openssl = "0.10"
 parking_lot = { version = "0.5", features = [] }
+protobuf = "1.4"
 reffers = { git = "https://github.com/diwic/reffers-rs" }
 regex = "0.2"
 rusqlite = "0.13"
diff --git a/guide/schema.md b/guide/schema.md
index bd2a748..d56520e 100644
--- a/guide/schema.md
+++ b/guide/schema.md
@@ -199,6 +199,8 @@ Version 2 adds:
 * recording of sub streams (splits a new `stream` table out of `camera`)
 * support for multiple sample file directories, to take advantage of
   multiple hard drives (or multiple RAID volumes).
+* interlock between database and sample file directories to avoid various
+  mixups that could cause data integrity problems.
 * records the RFC-6381 codec associated with a video sample entry, so that
   logic for determining this is no longer needed as part of the database
   layer.
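The interlock added above works by giving each sample file directory a small metadata file (`<dir>/meta`, a `DirMeta` protobuf; see `src/schema.proto` later in this diff) naming the database it belongs to and the opens it has seen; on open, the directory's copy must agree with the database's expectation. A minimal sketch of that agreement rule, using simplified stand-in types rather than the generated protobuf structs (the real check is `SampleFileDir::consistent` in `src/dir.rs` below):

```rust
/// Simplified stand-ins for the generated `DirMeta` / `DirMeta_Open` types.
#[derive(PartialEq)]
struct Open { id: u32, uuid: [u8; 16] }

struct DirMeta {
    db_uuid: Vec<u8>,
    dir_uuid: Vec<u8>,
    last_complete_open: Option<Open>,
    in_progress_open: Option<Open>,
}

/// Returns true if the directory's on-disk meta (`dir`) is consistent with
/// what the database expects (`db`), so the directory may be opened.
fn consistent(db: &DirMeta, dir: &DirMeta) -> bool {
    // Wrong database or wrong directory entirely (e.g., a mixed-up mount).
    if dir.db_uuid != db.db_uuid || dir.dir_uuid != db.dir_uuid { return false; }
    match db.last_complete_open.as_ref() {
        // The directory must have seen the database's last completed open,
        // either as its own completed open or as one still marked in progress.
        Some(o) => dir.last_complete_open.as_ref() == Some(o) ||
                   dir.in_progress_open.as_ref() == Some(o),
        // Neither side has ever completed an open.
        None => dir.last_complete_open.is_none(),
    }
}
```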
diff --git a/src/cmds/config/cameras.rs b/src/cmds/config/cameras.rs
index 2a4e3b0..6d99e68 100644
--- a/src/cmds/config/cameras.rs
+++ b/src/cmds/config/cameras.rs
@@ -207,9 +207,11 @@ fn confirm_deletion(siv: &mut Cursive, db: &Arc<db::Database>, id: i32, to_delet
 fn lower_retention(db: &Arc<db::Database>, zero_limits: BTreeMap<i32, Vec<dir::NewLimit>>)
                    -> Result<(), Error> {
+    let dirs_to_open: Vec<_> = zero_limits.keys().map(|id| *id).collect();
+    db.lock().open_sample_file_dirs(&dirs_to_open[..])?;
     for (dir_id, l) in &zero_limits {
-        let dir = db.lock().sample_file_dirs_by_id().get(dir_id).unwrap().open()?;
-        dir::lower_retention(dir, db.clone(), &l)?;
+        let dir = db.lock().sample_file_dirs_by_id().get(dir_id).unwrap().get()?;
+        dir::lower_retention(dir.clone(), db.clone(), &l)?;
     }
     Ok(())
 }
diff --git a/src/cmds/config/dirs.rs b/src/cmds/config/dirs.rs
index 4dc3a4a..c4289cc 100644
--- a/src/cmds/config/dirs.rs
+++ b/src/cmds/config/dirs.rs
@@ -145,8 +145,9 @@ fn actually_delete(model: &RefCell<Model>, siv: &mut Cursive) {
     siv.pop_layer();  // deletion confirmation
     siv.pop_layer();  // retention dialog
     let dir = {
-        let l = model.db.lock();
-        l.sample_file_dirs_by_id().get(&model.dir_id).unwrap().open().unwrap()
+        let mut l = model.db.lock();
+        l.open_sample_file_dirs(&[model.dir_id]).unwrap();  // TODO: don't unwrap.
+        l.sample_file_dirs_by_id().get(&model.dir_id).unwrap().get().unwrap()
     };
     if let Err(e) = dir::lower_retention(dir, model.db.clone(), &new_limits[..]) {
         siv.add_layer(views::Dialog::text(format!("Unable to delete excess video: {}", e))
@@ -281,7 +282,7 @@ fn edit_dir_dialog(db: &Arc<db::Database>, siv: &mut Cursive, dir_id: i32) {
     let mut total_retain = 0;
     let fs_capacity;
     {
-        let l = db.lock();
+        let mut l = db.lock();
         for (&id, s) in l.streams_by_id() {
             let c = l.cameras_by_id().get(&s.camera_id).expect("stream without camera");
             if s.sample_file_dir_id != Some(dir_id) {
@@ -299,10 +300,9 @@
         if streams.is_empty() {
             return delete_dir_dialog(db, siv, dir_id);
         }
+        l.open_sample_file_dirs(&[dir_id]).unwrap();  // TODO: don't unwrap.
         let dir = l.sample_file_dirs_by_id().get(&dir_id).unwrap();
-
-        // TODO: go another way if open fails.
-        let stat = dir.open().unwrap().statfs().unwrap();
+        let stat = dir.get().unwrap().statfs().unwrap();
         fs_capacity = stat.f_bsize as i64 * stat.f_bavail as i64 + total_used;
         path = dir.path.clone();
     }
diff --git a/src/cmds/config/mod.rs b/src/cmds/config/mod.rs
index 11f6100..57be0eb 100644
--- a/src/cmds/config/mod.rs
+++ b/src/cmds/config/mod.rs
@@ -124,7 +124,7 @@ struct Args {
 pub fn run() -> Result<(), Error> {
     let args: Args = super::parse_args(USAGE)?;
     let (_db_dir, conn) = super::open_conn(&args.flag_db_dir, super::OpenMode::ReadWrite)?;
-    let db = Arc::new(db::Database::new(conn)?);
+    let db = Arc::new(db::Database::new(conn, true)?);
     let mut siv = Cursive::new();
     //siv.add_global_callback('q', |s| s.quit());
diff --git a/src/cmds/init.rs b/src/cmds/init.rs
index d7ad3fd..13acd02 100644
--- a/src/cmds/init.rs
+++ b/src/cmds/init.rs
@@ -66,9 +66,7 @@ pub fn run() -> Result<(), Error> {
         pragma journal_mode = wal;
         pragma page_size = 16384;
     "#)?;
-    let tx = conn.transaction()?;
-    tx.execute_batch(include_str!("../schema.sql"))?;
-    tx.commit()?;
+    db::Database::init(&mut conn)?;
     info!("Database initialized.");
     Ok(())
 }
diff --git a/src/cmds/run.rs b/src/cmds/run.rs
index 8444af6..4c92918 100644
--- a/src/cmds/run.rs
+++ b/src/cmds/run.rs
@@ -100,9 +100,17 @@ pub fn run() -> Result<(), Error> {
     let (_db_dir, conn) = super::open_conn(
         &args.flag_db_dir,
         if args.flag_read_only { super::OpenMode::ReadOnly } else { super::OpenMode::ReadWrite })?;
-    let db = Arc::new(db::Database::new(conn).unwrap());
+    let db = Arc::new(db::Database::new(conn, !args.flag_read_only).unwrap());
     info!("Database is loaded.");
 
+    {
+        let mut l = db.lock();
+        let dirs_to_open: Vec<_> =
+            l.streams_by_id().values().filter_map(|s| s.sample_file_dir_id).collect();
+        l.open_sample_file_dirs(&dirs_to_open)?;
+    }
+    info!("Directories are opened.");
+
     let s = web::Service::new(db.clone(), Some(&args.flag_ui_dir), resolve_zone())?;
 
     // Start a streamer for each stream.
@@ -120,13 +128,13 @@
         shutdown: &shutdown_streamers,
     };
 
-    // Create directories for streams that need them.
+    // Get the directories that need syncers.
     for stream in l.streams_by_id().values() {
         if let (Some(id), true) = (stream.sample_file_dir_id, stream.record) {
             dirs.entry(id).or_insert_with(|| {
                 let d = l.sample_file_dirs_by_id().get(&id).unwrap();
                 info!("Starting syncer for path {}", d.path);
-                d.open()
+                d.get().unwrap()
             });
         }
     }
@@ -135,7 +143,6 @@
     drop(l);
     let mut syncers = FnvHashMap::with_capacity_and_hasher(dirs.len(), Default::default());
     for (id, dir) in dirs.drain() {
-        let dir = dir?;
         let (channel, join) = dir::start_syncer(dir.clone(), db.clone())?;
         syncers.insert(id, Syncer {
             dir,
diff --git a/src/cmds/upgrade/mod.rs b/src/cmds/upgrade/mod.rs
index f0bd95c..40a1504 100644
--- a/src/cmds/upgrade/mod.rs
+++ b/src/cmds/upgrade/mod.rs
@@ -64,10 +64,10 @@ Options:
 const UPGRADE_NOTES: &'static str =
     concat!("upgraded using moonfire-nvr ", env!("CARGO_PKG_VERSION"));
 
-const UPGRADERS: [fn(&rusqlite::Transaction, &Args) -> Result<(), Error>; 2] = [
-    v0_to_v1::run,
-    v1_to_v2::run,
-];
+pub trait Upgrader {
+    fn in_tx(&mut self, &rusqlite::Transaction) -> Result<(), Error> { Ok(()) }
+    fn post_tx(&mut self) -> Result<(), Error> { Ok(()) }
+}
 
 #[derive(Debug, Deserialize)]
 pub struct Args {
@@ -89,8 +89,13 @@ pub fn run() -> Result<(), Error> {
     let args: Args = super::parse_args(USAGE)?;
     let (_db_dir, mut conn) = super::open_conn(&args.flag_db_dir, super::OpenMode::ReadWrite)?;
 
+    let upgraders = [
+        v0_to_v1::new,
+        v1_to_v2::new,
+    ];
+
     {
-        assert_eq!(UPGRADERS.len(), db::EXPECTED_VERSION as usize);
+        assert_eq!(upgraders.len(), db::EXPECTED_VERSION as usize);
         let old_ver =
             conn.query_row("select max(id) from version", &[], |row| row.get_checked(0))??;
         if old_ver > db::EXPECTED_VERSION {
@@ -103,13 +108,15 @@
         set_journal_mode(&conn, &args.flag_preset_journal).unwrap();
         for ver in old_ver .. db::EXPECTED_VERSION {
             info!("...from version {} to version {}", ver, ver + 1);
+            let mut u = upgraders[ver as usize](&args)?;
             let tx = conn.transaction()?;
-            UPGRADERS[ver as usize](&tx, &args)?;
+            u.in_tx(&tx)?;
             tx.execute(r#"
                 insert into version (id, unix_time, notes)
                              values (?, cast(strftime('%s', 'now') as int32), ?)
             "#, &[&(ver + 1), &UPGRADE_NOTES])?;
             tx.commit()?;
+            u.post_tx()?;
         }
     }
diff --git a/src/cmds/upgrade/v0_to_v1.rs b/src/cmds/upgrade/v0_to_v1.rs
index a32ce0a..21c83d4 100644
--- a/src/cmds/upgrade/v0_to_v1.rs
+++ b/src/cmds/upgrade/v0_to_v1.rs
@@ -37,65 +37,73 @@ use rusqlite;
 use std::collections::HashMap;
 use strutil;
 
-pub fn run(tx: &rusqlite::Transaction, _args: &super::Args) -> Result<(), Error> {
-    // These create statements match the schema.sql when version 1 was the latest.
-    tx.execute_batch(r#"
-        alter table camera rename to old_camera;
-        create table camera (
-          id integer primary key,
-          uuid blob unique,
-          short_name text not null,
-          description text,
-          host text,
-          username text,
-          password text,
-          main_rtsp_path text,
-          sub_rtsp_path text,
-          retain_bytes integer not null check (retain_bytes >= 0),
-          next_recording_id integer not null check (next_recording_id >= 0)
-        );
-        alter table recording rename to old_recording;
-        drop index recording_cover;
-        create table recording (
-          composite_id integer primary key,
-          camera_id integer not null references camera (id),
-          run_offset integer not null,
-          flags integer not null,
-          sample_file_bytes integer not null check (sample_file_bytes > 0),
-          start_time_90k integer not null check (start_time_90k > 0),
-          duration_90k integer not null
-              check (duration_90k >= 0 and duration_90k < 5*60*90000),
-          local_time_delta_90k integer not null,
-          video_samples integer not null check (video_samples > 0),
-          video_sync_samples integer not null check (video_samples > 0),
-          video_sample_entry_id integer references video_sample_entry (id),
-          check (composite_id >> 32 = camera_id)
-        );
-        create index recording_cover on recording (
-          camera_id,
-          start_time_90k,
-          duration_90k,
-          video_samples,
-          video_sync_samples,
-          video_sample_entry_id,
-          sample_file_bytes,
-          run_offset,
-          flags
-        );
-        create table recording_playback (
-          composite_id integer primary key references recording (composite_id),
-          sample_file_uuid blob not null check (length(sample_file_uuid) = 16),
-          sample_file_sha1 blob not null check (length(sample_file_sha1) = 20),
-          video_index blob not null check (length(video_index) > 0)
-        );
-    "#)?;
-    let camera_state = fill_recording(tx).unwrap();
-    fill_camera(tx, camera_state).unwrap();
-    tx.execute_batch(r#"
-        drop table old_camera;
-        drop table old_recording;
-    "#)?;
-    Ok(())
+pub struct U;
+
+pub fn new<'a>(_args: &'a super::Args) -> Result<Box<super::Upgrader + 'a>, Error> {
+    Ok(Box::new(U))
+}
+
+impl super::Upgrader for U {
+    fn in_tx(&mut self, tx: &rusqlite::Transaction) -> Result<(), Error> {
+        // These create statements match the schema.sql when version 1 was the latest.
+        tx.execute_batch(r#"
+            alter table camera rename to old_camera;
+            create table camera (
+              id integer primary key,
+              uuid blob unique,
+              short_name text not null,
+              description text,
+              host text,
+              username text,
+              password text,
+              main_rtsp_path text,
+              sub_rtsp_path text,
+              retain_bytes integer not null check (retain_bytes >= 0),
+              next_recording_id integer not null check (next_recording_id >= 0)
+            );
+            alter table recording rename to old_recording;
+            drop index recording_cover;
+            create table recording (
+              composite_id integer primary key,
+              camera_id integer not null references camera (id),
+              run_offset integer not null,
+              flags integer not null,
+              sample_file_bytes integer not null check (sample_file_bytes > 0),
+              start_time_90k integer not null check (start_time_90k > 0),
+              duration_90k integer not null
+                  check (duration_90k >= 0 and duration_90k < 5*60*90000),
+              local_time_delta_90k integer not null,
+              video_samples integer not null check (video_samples > 0),
+              video_sync_samples integer not null check (video_samples > 0),
+              video_sample_entry_id integer references video_sample_entry (id),
+              check (composite_id >> 32 = camera_id)
+            );
+            create index recording_cover on recording (
+              camera_id,
+              start_time_90k,
+              duration_90k,
+              video_samples,
+              video_sync_samples,
+              video_sample_entry_id,
+              sample_file_bytes,
+              run_offset,
+              flags
+            );
+            create table recording_playback (
+              composite_id integer primary key references recording (composite_id),
+              sample_file_uuid blob not null check (length(sample_file_uuid) = 16),
+              sample_file_sha1 blob not null check (length(sample_file_sha1) = 20),
+              video_index blob not null check (length(video_index) > 0)
+            );
+        "#)?;
+        let camera_state = fill_recording(tx).unwrap();
+        fill_camera(tx, camera_state).unwrap();
+        tx.execute_batch(r#"
+            drop table old_camera;
+            drop table old_recording;
+        "#)?;
+        Ok(())
+    }
 }
 
 struct CameraState {
diff --git a/src/cmds/upgrade/v1_to_v2.rs b/src/cmds/upgrade/v1_to_v2.rs
index 27813dc..af2cef0 100644
--- a/src/cmds/upgrade/v1_to_v2.rs
+++ b/src/cmds/upgrade/v1_to_v2.rs
@@ -31,166 +31,250 @@
 /// Upgrades a version 1 schema to a version 2 schema.
 use error::Error;
+use std::fs;
 use rusqlite;
+use schema::DirMeta;
+use uuid::Uuid;
 
-pub fn run(tx: &rusqlite::Transaction, args: &super::Args) -> Result<(), Error> {
-    // These create statements match the schema.sql when version 2 was the latest.
-    tx.execute_batch(r#"
-        create table sample_file_dir (
-          id integer primary key,
-          path text unique not null,
-          uuid blob unique not null check (length(uuid) = 16)
-        );
-    "#)?;
-    {
-        let mut stmt = tx.prepare_cached(r#"
-            insert into sample_file_dir (path,  uuid)
-                                 values (:path, :uuid)
+pub struct U<'a> {
+    sample_file_path: &'a str,
+    dir_meta: Option<DirMeta>,
+}
+
+pub fn new<'a>(args: &'a super::Args) -> Result<Box<super::Upgrader + 'a>, Error> {
+    let sample_file_path =
+        args.flag_sample_file_dir
+            .as_ref()
+            .ok_or_else(|| Error::new("--sample-file-dir required when upgrading from \
+                                       schema version 1 to 2.".to_owned()))?;
+    Ok(Box::new(U { sample_file_path, dir_meta: None }))
+}
+
+impl<'a> U<'a> {
+    /// Ensures there are sample files in the directory for all listed recordings.
+    /// Among other problems, this catches a fat-fingered `--sample-file-dir`.
+    fn verify_sample_files(&self, tx: &rusqlite::Transaction) -> Result<(), Error> {
+        // Build a hash of the uuids found in sample_file_path. Ignore other files.
+        let n: i64 = tx.query_row("select count(*) from recording", &[], |r| r.get_checked(0))??;
+        let mut files = ::fnv::FnvHashSet::with_capacity_and_hasher(n as usize, Default::default());
+        for e in fs::read_dir(self.sample_file_path)? {
+            let e = e?;
+            let f = e.file_name();
+            let s = match f.to_str() {
+                Some(s) => s,
+                None => continue,
+            };
+            let uuid = match Uuid::parse_str(s) {
+                Ok(u) => u,
+                Err(_) => continue,
+            };
+            if s != uuid.hyphenated().to_string() {  // non-canonical form.
+                continue;
+            }
+            files.insert(uuid);
+        }
+
+        // Iterate through the database and check that everything has a matching file.
+        let mut stmt = tx.prepare(r"select sample_file_uuid from recording_playback")?;
+        let mut rows = stmt.query(&[])?;
+        while let Some(row) = rows.next() {
+            let row = row?;
+            let uuid: ::db::FromSqlUuid = row.get_checked(0)?;
+            if !files.contains(&uuid.0) {
+                return Err(Error::new(format!("{} is missing from dir {}!",
+                                              uuid.0, self.sample_file_path)));
+            }
+        }
+        Ok(())
+    }
+}
+
+impl<'a> super::Upgrader for U<'a> {
+    fn in_tx(&mut self, tx: &rusqlite::Transaction) -> Result<(), Error> {
+        self.verify_sample_files(tx)?;
+
+        // These create statements match the schema.sql when version 2 was the latest.
+        tx.execute_batch(r#"
+            create table meta (
+              uuid blob not null check (length(uuid) = 16)
+            );
+            create table open (
+              id integer primary key,
+              uuid blob unique not null check (length(uuid) = 16)
+            );
+            create table sample_file_dir (
+              id integer primary key,
+              path text unique not null,
+              uuid blob unique not null check (length(uuid) = 16),
+              last_complete_open_id integer references open (id)
+            );
        "#)?;
-        let uuid = ::uuid::Uuid::new_v4();
-        let uuid_bytes = &uuid.as_bytes()[..];
-        let path = args.flag_sample_file_dir
-                       .as_ref()
-                       .ok_or_else(|| Error::new("--sample-file-dir required when upgrading from
-                                                  schema version 1 to 2.".to_owned()))?;
-        stmt.execute_named(&[
-            (":path", &path.as_str()),
-            (":uuid", &uuid_bytes),
-        ])?;
+        let db_uuid = ::uuid::Uuid::new_v4();
+        let db_uuid_bytes = &db_uuid.as_bytes()[..];
+        tx.execute("insert into meta (uuid) values (?)", &[&db_uuid_bytes])?;
+        let open_uuid = ::uuid::Uuid::new_v4();
+        let open_uuid_bytes = &open_uuid.as_bytes()[..];
+        tx.execute("insert into open (uuid) values (?)", &[&open_uuid_bytes])?;
+        let open_id = tx.last_insert_rowid() as u32;
+        let dir_uuid = ::uuid::Uuid::new_v4();
+        let dir_uuid_bytes = &dir_uuid.as_bytes()[..];
+
+        let mut meta = ::schema::DirMeta::default();
+        {
+            meta.db_uuid.extend_from_slice(db_uuid_bytes);
+            meta.dir_uuid.extend_from_slice(dir_uuid_bytes);
+            let open = meta.mut_in_progress_open();
+            open.id = open_id;
+            open.uuid.extend_from_slice(&open_uuid_bytes);
+        }
+
+        tx.execute(r#"
+            insert into sample_file_dir (path,  uuid, last_complete_open_id)
+                                 values (?,     ?,    ?)
+ "#, &[&self.sample_file_path, &dir_uuid_bytes, &open_id])?; + self.dir_meta = Some(meta); + + tx.execute_batch(r#" + alter table camera rename to old_camera; + alter table recording rename to old_recording; + alter table video_sample_entry rename to old_video_sample_entry; + drop index recording_cover; + + create table camera ( + id integer primary key, + uuid blob unique not null check (length(uuid) = 16), + short_name text not null, + description text, + host text, + username text, + password text + ); + + create table stream ( + id integer primary key, + camera_id integer not null references camera (id), + sample_file_dir_id integer references sample_file_dir (id), + type text not null check (type in ('main', 'sub')), + record integer not null check (record in (1, 0)), + rtsp_path text not null, + retain_bytes integer not null check (retain_bytes >= 0), + next_recording_id integer not null check (next_recording_id >= 0), + unique (camera_id, type) + ); + + create table recording ( + composite_id integer primary key, + stream_id integer not null references stream (id), + run_offset integer not null, + flags integer not null, + sample_file_bytes integer not null check (sample_file_bytes > 0), + start_time_90k integer not null check (start_time_90k > 0), + duration_90k integer not null + check (duration_90k >= 0 and duration_90k < 5*60*90000), + local_time_delta_90k integer not null, + video_samples integer not null check (video_samples > 0), + video_sync_samples integer not null check (video_sync_samples > 0), + video_sample_entry_id integer references video_sample_entry (id), + check (composite_id >> 32 = stream_id) + ); + + create index recording_cover on recording ( + stream_id, + start_time_90k, + duration_90k, + video_samples, + video_sync_samples, + video_sample_entry_id, + sample_file_bytes, + run_offset, + flags + ); + + create table video_sample_entry ( + id integer primary key, + sha1 blob unique not null check (length(sha1) = 20), + width integer not null check (width > 0), + height integer not null check (height > 0), + rfc6381_codec text not null, + data blob not null check (length(data) > 86) + ); + + insert into camera + select + id, + uuid, + short_name, + description, + host, + username, + password + from old_camera; + + -- Insert main streams using the same id as the camera, to ease changing recordings. + insert into stream + select + old_camera.id, + old_camera.id, + sample_file_dir.id, + 'main', + 1, + old_camera.main_rtsp_path, + old_camera.retain_bytes, + old_camera.next_recording_id + from + old_camera cross join sample_file_dir; + + -- Insert sub stream (if path is non-empty) using any id. 
+            insert into stream (camera_id, sample_file_dir_id, type, record, rtsp_path,
+                                retain_bytes, next_recording_id)
+            select
+              old_camera.id,
+              sample_file_dir.id,
+              'sub',
+              0,
+              old_camera.sub_rtsp_path,
+              0,
+              0
+            from
+              old_camera cross join sample_file_dir
+            where
+              old_camera.sub_rtsp_path != '';
+
+            insert into recording
+            select
+              composite_id,
+              camera_id,
+              run_offset,
+              flags,
+              sample_file_bytes,
+              start_time_90k,
+              duration_90k,
+              local_time_delta_90k,
+              video_samples,
+              video_sync_samples,
+              video_sample_entry_id
+            from
+              old_recording;
+        "#)?;
+
+        fix_video_sample_entry(tx)?;
+
+        tx.execute_batch(r#"
+            drop table old_camera;
+            drop table old_recording;
+            drop table old_video_sample_entry;
+        "#)?;
+
+        Ok(())
     }
-    tx.execute_batch(r#"
-        alter table camera rename to old_camera;
-        alter table recording rename to old_recording;
-        alter table video_sample_entry rename to old_video_sample_entry;
-        drop index recording_cover;
-
-        create table camera (
-          id integer primary key,
-          uuid blob unique not null check (length(uuid) = 16),
-          short_name text not null,
-          description text,
-          host text,
-          username text,
-          password text
-        );
-
-        create table stream (
-          id integer primary key,
-          camera_id integer not null references camera (id),
-          sample_file_dir_id integer references sample_file_dir (id),
-          type text not null check (type in ('main', 'sub')),
-          record integer not null check (record in (1, 0)),
-          rtsp_path text not null,
-          retain_bytes integer not null check (retain_bytes >= 0),
-          next_recording_id integer not null check (next_recording_id >= 0),
-          unique (camera_id, type)
-        );
-
-        create table recording (
-          composite_id integer primary key,
-          stream_id integer not null references stream (id),
-          run_offset integer not null,
-          flags integer not null,
-          sample_file_bytes integer not null check (sample_file_bytes > 0),
-          start_time_90k integer not null check (start_time_90k > 0),
-          duration_90k integer not null
-              check (duration_90k >= 0 and duration_90k < 5*60*90000),
-          local_time_delta_90k integer not null,
-          video_samples integer not null check (video_samples > 0),
-          video_sync_samples integer not null check (video_sync_samples > 0),
-          video_sample_entry_id integer references video_sample_entry (id),
-          check (composite_id >> 32 = stream_id)
-        );
-
-        create index recording_cover on recording (
-          stream_id,
-          start_time_90k,
-          duration_90k,
-          video_samples,
-          video_sync_samples,
-          video_sample_entry_id,
-          sample_file_bytes,
-          run_offset,
-          flags
-        );
-
-        create table video_sample_entry (
-          id integer primary key,
-          sha1 blob unique not null check (length(sha1) = 20),
-          width integer not null check (width > 0),
-          height integer not null check (height > 0),
-          rfc6381_codec text not null,
-          data blob not null check (length(data) > 86)
-        );
-
-        insert into camera
-        select
-          id,
-          uuid,
-          short_name,
-          description,
-          host,
-          username,
-          password
-        from old_camera;
-
-        -- Insert main streams using the same id as the camera, to ease changing recordings.
-        insert into stream
-        select
-          old_camera.id,
-          old_camera.id,
-          sample_file_dir.id,
-          'main',
-          1,
-          old_camera.main_rtsp_path,
-          old_camera.retain_bytes,
-          old_camera.next_recording_id
-        from
-          old_camera cross join sample_file_dir;
-
-        -- Insert sub stream (if path is non-empty) using any id.
-        insert into stream (camera_id, sample_file_dir_id, type, record, rtsp_path, retain_bytes,
-                            next_recording_id)
-        select
-          old_camera.id,
-          sample_file_dir.id,
-          'sub',
-          0,
-          old_camera.sub_rtsp_path,
-          0,
-          0
-        from
-          old_camera cross join sample_file_dir
-        where
-          old_camera.sub_rtsp_path != '';
-
-        insert into recording
-        select
-          composite_id,
-          camera_id,
-          run_offset,
-          flags,
-          sample_file_bytes,
-          start_time_90k,
-          duration_90k,
-          local_time_delta_90k,
-          video_samples,
-          video_sync_samples,
-          video_sample_entry_id
-        from
-          old_recording;
-    "#)?;
-
-    fix_video_sample_entry(tx)?;
-
-    tx.execute_batch(r#"
-        drop table old_camera;
-        drop table old_recording;
-        drop table old_video_sample_entry;
-    "#)?;
-
-    Ok(())
+    fn post_tx(&mut self) -> Result<(), Error> {
+        let mut meta = self.dir_meta.take().unwrap();
+        let d = ::dir::SampleFileDir::create(self.sample_file_path, &meta)?;
+        ::std::mem::swap(&mut meta.last_complete_open, &mut meta.in_progress_open);
+        d.write_meta(&meta)?;
+        Ok(())
+    }
 }
 
 fn fix_video_sample_entry(tx: &rusqlite::Transaction) -> Result<(), Error> {
diff --git a/src/db.rs b/src/db.rs
index c73530c..2cffb70 100644
--- a/src/db.rs
+++ b/src/db.rs
@@ -53,12 +53,13 @@
 use dir;
 use error::{Error, ResultExt};
-use fnv;
+use fnv::{self, FnvHashMap};
 use lru_cache::LruCache;
 use openssl::hash;
 use parking_lot::{Mutex,MutexGuard};
 use recording::{self, TIME_UNITS_PER_SEC};
 use rusqlite;
+use schema;
 use std::collections::BTreeMap;
 use std::collections::btree_map;
 use std::cell::RefCell;
@@ -194,7 +195,7 @@ const LIST_RECORDINGS_BY_ID_SQL: &'static str = r#"
         recording.composite_id
 "#;
 
-struct FromSqlUuid(Uuid);
+pub struct FromSqlUuid(pub Uuid);
 
 impl rusqlite::types::FromSql for FromSqlUuid {
     fn column_result(value: rusqlite::types::ValueRef) -> rusqlite::types::FromSqlResult<Self> {
@@ -358,18 +359,19 @@ pub struct SampleFileDir {
     pub id: i32,
     pub path: String,
     pub uuid: Uuid,
-    dir: RefCell<Option<Arc<dir::SampleFileDir>>>,
+    dir: Option<Arc<dir::SampleFileDir>>,
+    last_complete_open: Option<Open>,
 }
 
 impl SampleFileDir {
-    pub fn open(&self) -> Result<Arc<dir::SampleFileDir>, Error> {
-        let mut d = self.dir.borrow_mut();
-        if let Some(ref d) = *d {
-            return Ok(d.clone());
-        }
-        let dir = dir::SampleFileDir::open(&self.path)?;
-        *d = Some(dir.clone());
-        Ok(dir)
+    /// Returns a cloned copy of the directory, or Err if closed.
+    ///
+    /// Use `LockedDatabase::open_sample_file_dirs` prior to calling this method.
+    pub fn get(&self) -> Result<Arc<dir::SampleFileDir>, Error> {
+        Ok(self.dir
+               .as_ref()
+               .ok_or_else(|| Error::new(format!("sample file dir {} is closed", self.id)))?
+               .clone())
     }
 }
 
@@ -599,6 +601,10 @@ pub struct LockedDatabase {
 /// while its underlying `rusqlite::Transaction` is borrowing `conn`.
 #[derive(Debug)]
 struct State {
+    uuid: Uuid,
+
+    /// If the database is open in read-write mode, the information about the current Open row.
+    open: Option<Open>,
     sample_file_dirs_by_id: BTreeMap<i32, SampleFileDir>,
     cameras_by_id: BTreeMap<i32, Camera>,
     streams_by_id: BTreeMap<i32, Stream>,
@@ -608,11 +614,17 @@
     playback_cache: RefCell<LruCache<i64, Box<[u8]>, fnv::FnvBuildHasher>>,
 }
 
+#[derive(Copy, Clone, Debug)]
+struct Open {
+    id: u32,
+    uuid: Uuid,
+}
+
 /// A high-level transaction. This manages the SQLite transaction and the matching modification to
 /// be applied to the in-memory state on successful commit.
 pub struct Transaction<'a> {
     state: &'a mut State,
-    mods_by_stream: fnv::FnvHashMap<i32, StreamModification>,
+    mods_by_stream: FnvHashMap<i32, StreamModification>,
     tx: rusqlite::Transaction<'a>,
 
     /// True if due to an earlier error the transaction must be rolled back rather than committed.
@@ -862,7 +874,7 @@ impl<'a> Transaction<'a> {
     }
 
     /// Looks up an existing entry in `mods` for a given stream or makes+inserts an identity entry.
-    fn get_mods_by_stream(mods: &mut fnv::FnvHashMap<i32, StreamModification>, stream_id: i32)
+    fn get_mods_by_stream(mods: &mut FnvHashMap<i32, StreamModification>, stream_id: i32)
                           -> &mut StreamModification {
         mods.entry(stream_id).or_insert_with(StreamModification::default)
     }
@@ -1047,6 +1059,79 @@ impl LockedDatabase {
     pub fn sample_file_dirs_by_id(&self) -> &BTreeMap<i32, SampleFileDir> {
         &self.state.sample_file_dirs_by_id
     }
+
+    /// Opens the given sample file directories.
+    ///
+    /// `ids` is implicitly de-duplicated.
+    ///
+    /// When the database is in read-only mode, this simply opens all the directories after
+    /// locking and verifying their metadata matches the database state. In read-write mode, it
+    /// performs a single database transaction to update metadata for all dirs, then performs a like
+    /// update to the directories' on-disk metadata.
+    ///
+    /// Note this violates the principle of never accessing disk while holding the database lock.
+    /// Currently this only happens at startup (or during configuration), so this isn't a problem
+    /// in practice.
+    pub fn open_sample_file_dirs(&mut self, ids: &[i32]) -> Result<(), Error> {
+        let mut in_progress = FnvHashMap::with_capacity_and_hasher(ids.len(), Default::default());
+        let o = self.state.open.as_ref();
+        for &id in ids {
+            let e = in_progress.entry(id);
+            use ::std::collections::hash_map::Entry;
+            let e = match e {
+                Entry::Occupied(_) => continue,  // suppress duplicate.
+                Entry::Vacant(e) => e,
+            };
+            let dir = self.state
+                          .sample_file_dirs_by_id
+                          .get_mut(&id)
+                          .ok_or_else(|| Error::new(format!("no such dir {}", id)))?;
+            if dir.dir.is_some() { continue }
+            let mut meta = schema::DirMeta::default();
+            meta.db_uuid.extend_from_slice(&self.state.uuid.as_bytes()[..]);
+            meta.dir_uuid.extend_from_slice(&dir.uuid.as_bytes()[..]);
+            if let Some(o) = o {
+                let open = meta.mut_in_progress_open();
+                open.id = o.id;
+                open.uuid.extend_from_slice(&o.uuid.as_bytes()[..]);
+            }
+            let d = dir::SampleFileDir::open(&dir.path, &meta)?;
+            if o.is_none() {  // read-only mode; it's already fully opened.
+                dir.dir = Some(d);
+            } else {  // read-write mode; there are more steps to do.
+                e.insert((meta, d));
+            }
+        }
+
+        let o = match o {
+            None => return Ok(()),  // read-only mode; all done.
+            Some(o) => o,
+        };
+
+        let tx = self.conn.transaction()?;
+        {
+            let mut stmt = tx.prepare_cached(r#"
+                update sample_file_dir set last_complete_open_id = ? where id = ?
+            "#)?;
+            for &id in in_progress.keys() {
+                if stmt.execute(&[&o.id, &id])? != 1 {
+                    return Err(Error::new(format!("unable to update dir {}", id)));
+                }
+            }
+        }
+        tx.commit()?;
+
+        for (id, (mut meta, d)) in in_progress.drain() {
+            let dir = self.state.sample_file_dirs_by_id.get_mut(&id).unwrap();
+            meta.last_complete_open.clear();
+            mem::swap(&mut meta.last_complete_open, &mut meta.in_progress_open);
+            d.write_meta(&meta)?;
+            dir.dir = Some(d);
+        }
+
+        Ok(())
+    }
+
     pub fn streams_by_id(&self) -> &BTreeMap<i32, Stream> { &self.state.streams_by_id }
 
     /// Returns an immutable view of the video sample entries.
@@ -1061,7 +1146,7 @@ impl LockedDatabase {
     pub fn tx(&mut self) -> Result<Transaction, Error> {
         Ok(Transaction{
             state: &mut self.state,
-            mods_by_stream: fnv::FnvHashMap::default(),
+            mods_by_stream: FnvHashMap::default(),
             tx: self.conn.transaction()?,
             must_rollback: false,
             bypass_reservation_for_testing: false,
@@ -1325,22 +1410,32 @@ impl LockedDatabase {
         info!("Loading sample file dirs");
         let mut stmt = self.conn.prepare(r#"
             select
-                id,
-                path,
-                uuid
+                d.id,
+                d.path,
+                d.uuid,
+                d.last_complete_open_id,
+                o.uuid
             from
-                sample_file_dir;
+                sample_file_dir d left join open o on (d.last_complete_open_id = o.id);
         "#)?;
         let mut rows = stmt.query(&[])?;
         while let Some(row) = rows.next() {
             let row = row?;
             let id = row.get_checked(0)?;
-            let uuid: FromSqlUuid = row.get_checked(2)?;
+            let dir_uuid: FromSqlUuid = row.get_checked(2)?;
+            let open_id: Option<u32> = row.get_checked(3)?;
+            let open_uuid: Option<FromSqlUuid> = row.get_checked(4)?;
+            let last_complete_open = match (open_id, open_uuid) {
+                (Some(id), Some(uuid)) => Some(Open { id, uuid: uuid.0, }),
+                (None, None) => None,
+                _ => return Err(Error::new(format!("open table missing id {}", id))),
+            };
             self.state.sample_file_dirs_by_id.insert(id, SampleFileDir {
                 id,
-                uuid: uuid.0,
+                uuid: dir_uuid.0,
                 path: row.get_checked(1)?,
-                dir: RefCell::new(None),
+                dir: None,
+                last_complete_open,
             });
         }
         info!("Loaded {} sample file dirs", self.state.sample_file_dirs_by_id.len());
@@ -1478,27 +1573,45 @@ impl LockedDatabase {
     }
 
     pub fn add_sample_file_dir(&mut self, path: String) -> Result<i32, Error> {
-        let dir = dir::SampleFileDir::create(&path)?;
+        let mut meta = schema::DirMeta::default();
         let uuid = Uuid::new_v4();
         let uuid_bytes = &uuid.as_bytes()[..];
-        let tx = self.conn.transaction()?;
+        let o = self.state
+                    .open
+                    .as_ref()
+                    .ok_or_else(|| Error::new("database is read-only".to_owned()))?;
+
+        // Populate meta.
         {
-            let mut stmt = tx.prepare_cached(r#"
-                insert into sample_file_dir (path, uuid) values (:path, :uuid)
-            "#)?;
-            stmt.execute_named(&[
-                (":uuid", &uuid_bytes),
-                (":path", &path),
-            ])?;
+            meta.db_uuid.extend_from_slice(&self.state.uuid.as_bytes()[..]);
+            meta.dir_uuid.extend_from_slice(uuid_bytes);
+            let open = meta.mut_in_progress_open();
+            open.id = o.id;
+            open.uuid.extend_from_slice(&o.uuid.as_bytes()[..]);
        }
-        let id = tx.last_insert_rowid() as i32;
-        tx.commit()?;
-        self.state.sample_file_dirs_by_id.insert(id, SampleFileDir {
-            id,
-            path,
-            uuid,
-            dir: RefCell::new(Some(dir)),
-        });
+
+        let dir = dir::SampleFileDir::create(&path, &meta)?;
+        let uuid = Uuid::new_v4();
+        self.conn.execute(r#"
+            insert into sample_file_dir (path,  uuid, last_complete_open_id)
+                                 values (?,     ?,    ?)
+        "#, &[&path, &uuid_bytes, &o.id])?;
+        let id = self.conn.last_insert_rowid() as i32;
+        use ::std::collections::btree_map::Entry;
+        let e = self.state.sample_file_dirs_by_id.entry(id);
+        let d = match e {
+            Entry::Vacant(e) => e.insert(SampleFileDir {
+                id,
+                path,
+                uuid,
+                dir: Some(dir),
+                last_complete_open: None,
+            }),
+            Entry::Occupied(_) => Err(Error::new(format!("duplicate sample file dir id {}", id)))?,
+        };
+        d.last_complete_open = Some(*o);
+        mem::swap(&mut meta.last_complete_open, &mut meta.in_progress_open);
+        d.dir.as_ref().unwrap().write_meta(&meta)?;
         Ok(id)
     }
 
@@ -1508,16 +1621,11 @@
                 return Err(Error::new(format!("can't delete dir referenced by stream {}", id)));
             }
         }
-        let tx = self.conn.transaction()?;
-        {
-            let mut stmt = tx.prepare_cached(r#"
-                delete from sample_file_dir where id = ?
-            "#)?;
-            if stmt.execute(&[&dir_id])? != 1 {
-                return Err(Error::new(format!("no such dir {} to remove", dir_id)));
-            }
+        // TODO: remove/update metadata stored in the directory? at present this will have to
+        // be manually deleted before the dir can be reused.
+        if self.conn.execute("delete from sample_file_dir where id = ?", &[&dir_id])? != 1 {
+            return Err(Error::new(format!("no such dir {} to remove", dir_id)));
         }
-        tx.commit()?;
         self.state.sample_file_dirs_by_id.remove(&dir_id).expect("sample file dir should exist!");
         Ok(())
     }
@@ -1664,29 +1772,8 @@ pub struct Database(Mutex<LockedDatabase>);
 
 impl Database {
     /// Creates the database from a caller-supplied SQLite connection.
-    pub fn new(conn: rusqlite::Connection) -> Result<Database, Error> {
+    pub fn new(conn: rusqlite::Connection, read_write: bool) -> Result<Database, Error> {
         conn.execute("pragma foreign_keys = on", &[])?;
-        let list_recordings_by_time_sql = format!(r#"
-            select
-                recording.composite_id,
-                recording.run_offset,
-                recording.flags,
-                recording.start_time_90k,
-                recording.duration_90k,
-                recording.sample_file_bytes,
-                recording.video_samples,
-                recording.video_sync_samples,
-                recording.video_sample_entry_id
-            from
-                recording
-            where
-                stream_id = :stream_id and
-                recording.start_time_90k > :start_time_90k - {} and
-                recording.start_time_90k < :end_time_90k and
-                recording.start_time_90k + recording.duration_90k > :start_time_90k
-            order by
-                recording.start_time_90k
-        "#, recording::MAX_RECORDING_DURATION);
         {
             let ver = get_schema_version(&conn)?.ok_or_else(|| Error::new(
                 "no such table: version. \
@@ -1709,9 +1796,49 @@
             }
         }
 
+        // Note: the meta check comes after the version check to improve the error message when
+        // trying to open a version 0 or version 1 database (which lacked the meta table).
+        let uuid = conn.query_row("select uuid from meta", &[], |row| -> Result<Uuid, Error> {
+            let uuid: FromSqlUuid = row.get_checked(0)?;
+            Ok(uuid.0)
+        })??;
+        let list_recordings_by_time_sql = format!(r#"
+            select
+                recording.composite_id,
+                recording.run_offset,
+                recording.flags,
+                recording.start_time_90k,
+                recording.duration_90k,
+                recording.sample_file_bytes,
+                recording.video_samples,
+                recording.video_sync_samples,
+                recording.video_sample_entry_id
+            from
+                recording
+            where
+                stream_id = :stream_id and
+                recording.start_time_90k > :start_time_90k - {} and
+                recording.start_time_90k < :end_time_90k and
+                recording.start_time_90k + recording.duration_90k > :start_time_90k
+            order by
+                recording.start_time_90k
+        "#, recording::MAX_RECORDING_DURATION);
+        let open = if read_write {
+            let mut stmt = conn.prepare(" insert into open (uuid) values (?)")?;
+            let uuid = Uuid::new_v4();
+            let uuid_bytes = &uuid.as_bytes()[..];
+            stmt.execute(&[&uuid_bytes])?;
+            Some(Open {
+                id: conn.last_insert_rowid() as u32,
+                uuid,
+            })
+        } else { None };
        let db = Database(Mutex::new(LockedDatabase{
            conn: conn,
            state: State {
+                uuid,
+                open,
                sample_file_dirs_by_id: BTreeMap::new(),
                cameras_by_id: BTreeMap::new(),
                cameras_by_uuid: BTreeMap::new(),
@@ -1737,6 +1864,21 @@
         Ok(db)
     }
 
+    /// Initializes a database.
+    /// Note this doesn't set journal options, so that it can be used on in-memory databases for
+    /// test code.
+    pub fn init(conn: &mut rusqlite::Connection) -> Result<(), Error> {
+        let tx = conn.transaction()?;
+        tx.execute_batch(include_str!("schema.sql"))?;
+        {
+            let uuid = ::uuid::Uuid::new_v4();
+            let uuid_bytes = &uuid.as_bytes()[..];
+            tx.execute("insert into meta (uuid) values (?)", &[&uuid_bytes])?;
+        }
+        tx.commit()?;
+        Ok(())
+    }
+
     /// Locks the database; the returned reference is the only way to perform (read or write)
     /// operations.
     pub fn lock(&self) -> MutexGuard<LockedDatabase> { self.0.lock() }
@@ -1765,9 +1907,8 @@ mod tests {
     use uuid::Uuid;
 
     fn setup_conn() -> Connection {
-        let conn = Connection::open_in_memory().unwrap();
-        let schema = include_str!("schema.sql");
-        conn.execute_batch(schema).unwrap();
+        let mut conn = Connection::open_in_memory().unwrap();
+        Database::init(&mut conn).unwrap();
         conn
     }
 
@@ -1925,10 +2066,10 @@
     }
 
     #[test]
-    fn test_no_version() {
+    fn test_no_meta_or_version() {
         testutil::init();
-        let e = Database::new(Connection::open_in_memory().unwrap()).unwrap_err();
-        assert!(e.description().starts_with("no such table: version"));
+        let e = Database::new(Connection::open_in_memory().unwrap(), false).unwrap_err();
+        assert!(e.description().starts_with("no such table"), "{}", e);
     }
 
     #[test]
@@ -1936,7 +2077,7 @@
         testutil::init();
         let c = setup_conn();
         c.execute_batch("delete from version; insert into version values (1, 0, '');").unwrap();
-        let e = Database::new(c).unwrap_err();
+        let e = Database::new(c, false).unwrap_err();
         assert!(e.description().starts_with(
                 "Database schema version 1 is too old (expected 2)"), "got: {:?}",
                 e.description());
 
@@ -1947,7 +2088,7 @@
         testutil::init();
         let c = setup_conn();
         c.execute_batch("delete from version; insert into version values (3, 0, '');").unwrap();
-        let e = Database::new(c).unwrap_err();
+        let e = Database::new(c, false).unwrap_err();
         assert!(e.description().starts_with(
                 "Database schema version 3 is too new (expected 2)"), "got: {:?}",
                 e.description());
     }
@@ -1957,7 +2098,7 @@
     fn test_fresh_db() {
         testutil::init();
         let conn = setup_conn();
-        let db = Database::new(conn).unwrap();
+        let db = Database::new(conn, true).unwrap();
         let db = db.lock();
         assert_eq!(0, db.cameras_by_id().values().count());
     }
@@ -1967,7 +2108,7 @@
     fn test_full_lifecycle() {
         testutil::init();
         let conn = setup_conn();
-        let db = Database::new(conn).unwrap();
+        let db = Database::new(conn, true).unwrap();
         let tmpdir = tempdir::TempDir::new("moonfire-nvr-test").unwrap();
         let path = tmpdir.path().to_str().unwrap().to_owned();
         let sample_file_dir_id = Some({ db.lock() }.add_sample_file_dir(path).unwrap());
@@ -1994,7 +2135,7 @@
         // Closing and reopening the database should present the same contents.
         let conn = db.close();
-        let db = Database::new(conn).unwrap();
+        let db = Database::new(conn, true).unwrap();
         assert_no_recordings(&db, camera_uuid);
         assert_eq!(db.lock().list_reserved_sample_files().unwrap(), &[]);
@@ -2048,7 +2189,7 @@
         // Queries on a fresh database should return the correct result (with caches populated from
         // existing database contents rather than built on insert).
         let conn = db.close();
-        let db = Database::new(conn).unwrap();
+        let db = Database::new(conn, true).unwrap();
         assert_single_recording(&db, stream_id, &recording);
 
         // Deleting a recording should succeed, update the min/max times, and re-reserve the uuid.
@@ -2070,7 +2211,7 @@
     fn test_drop_tx() {
         testutil::init();
         let conn = setup_conn();
-        let db = Database::new(conn).unwrap();
+        let db = Database::new(conn, true).unwrap();
         let mut db = db.lock();
         {
             let mut tx = db.tx().unwrap();
diff --git a/src/dir.rs b/src/dir.rs
index 54350db..cb4f1af 100644
--- a/src/dir.rs
+++ b/src/dir.rs
@@ -34,13 +34,15 @@
 use db;
 use error::Error;
-use libc;
+use libc::{self, c_char};
+use protobuf::{self, Message};
 use recording;
 use openssl::hash;
+use schema;
 use std::cmp;
 use std::ffi;
 use std::fs;
-use std::io::{self, Write};
+use std::io::{self, Read, Write};
 use std::mem;
 use std::os::unix::io::FromRawFd;
 use std::sync::{Arc, Mutex};
@@ -95,6 +97,24 @@ impl Fd {
         Ok(Fd(fd))
     }
 
+    /// Opens a sample file within this directory with the given flags and (if creating) mode.
+    unsafe fn openat(&self, p: *const c_char, flags: libc::c_int, mode: libc::c_int)
+                     -> Result<fs::File, io::Error> {
+        let fd = libc::openat(self.0, p, flags, mode);
+        if fd < 0 {
+            return Err(io::Error::last_os_error())
+        }
+        Ok(fs::File::from_raw_fd(fd))
+    }
+
+    unsafe fn renameat(&self, from: *const c_char, to: *const c_char) -> Result<(), io::Error> {
+        let result = libc::renameat(self.0, from, self.0, to);
+        if result < 0 {
+            return Err(io::Error::last_os_error())
+        }
+        Ok(())
+    }
+
     /// Locks the directory with the specified `flock` operation.
     pub fn lock(&self, operation: libc::c_int) -> Result<(), io::Error> {
         let ret = unsafe { libc::flock(self.0, operation) };
@@ -116,12 +136,60 @@
 }
 
 impl SampleFileDir {
-    pub fn open(path: &str) -> Result<Arc<SampleFileDir>, Error> {
-        SampleFileDir::open_self(path, false)
+    /// Opens the directory using the given metadata.
+    ///
+    /// `db_meta.in_progress_open` should be filled if the directory should be opened in read/write
+    /// mode; absent in read-only mode.
+    pub fn open(path: &str, db_meta: &schema::DirMeta)
+                -> Result<Arc<SampleFileDir>, Error> {
+        let read_write = db_meta.in_progress_open.is_some();
+        let s = SampleFileDir::open_self(path, false)?;
+        s.fd.lock(if read_write { libc::LOCK_EX } else { libc::LOCK_SH } | libc::LOCK_NB)?;
+        let dir_meta = s.read_meta()?;
+        if !SampleFileDir::consistent(db_meta, &dir_meta) {
+            return Err(Error::new(format!("metadata mismatch. db: {:?} dir: {:?}",
+                                          db_meta, &dir_meta)));
+        }
+        if db_meta.in_progress_open.is_some() {
+            s.write_meta(db_meta)?;
+        }
+        Ok(s)
     }
 
-    pub fn create(path: &str) -> Result<Arc<SampleFileDir>, Error> {
-        SampleFileDir::open_self(path, true)
+    /// Returns true if the existing directory and database metadata are consistent; the directory
+    /// is then openable.
+    fn consistent(db_meta: &schema::DirMeta, dir_meta: &schema::DirMeta) -> bool {
+        if dir_meta.db_uuid != db_meta.db_uuid { return false; }
+        if dir_meta.dir_uuid != db_meta.dir_uuid { return false; }
+
+        if db_meta.last_complete_open.is_some() &&
+           (db_meta.last_complete_open != dir_meta.last_complete_open &&
+            db_meta.last_complete_open != dir_meta.in_progress_open) {
+            return false;
+        }
+
+        if db_meta.last_complete_open.is_none() && dir_meta.last_complete_open.is_some() {
+            return false;
+        }
+
+        true
+    }
+
+    pub fn create(path: &str, db_meta: &schema::DirMeta) -> Result<Arc<SampleFileDir>, Error> {
+        let s = SampleFileDir::open_self(path, true)?;
+        s.fd.lock(libc::LOCK_EX | libc::LOCK_NB)?;
+        let old_meta = s.read_meta()?;
+
+        // Verify metadata. We only care that it hasn't been completely opened.
+        // Partial opening by this or another database is fine; we won't overwrite anything.
+        // TODO: consider one exception: if the version 2 upgrade fails at the post_tx step.
+        if old_meta.last_complete_open.is_some() {
+            return Err(Error::new(format!("Can't create dir at path {}: is already in use:\n{:?}",
+                                          path, old_meta)));
+        }
+
+        s.write_meta(db_meta)?;
+        Ok(s)
     }
 
     fn open_self(path: &str, create: bool) -> Result<Arc<SampleFileDir>, Error> {
@@ -137,7 +205,50 @@
     /// Opens the given sample file for reading.
     pub fn open_sample_file(&self, uuid: Uuid) -> Result<fs::File, io::Error> {
-        self.open_int(uuid, libc::O_RDONLY, 0)
+        let p = SampleFileDir::get_rel_pathname(uuid);
+        unsafe { self.fd.openat(p.as_ptr(), libc::O_RDONLY, 0) }
+    }
+
+    /// Reads the directory metadata. If none is found, returns an empty proto.
+    fn read_meta(&self) -> Result<schema::DirMeta, Error> {
+        let mut meta = schema::DirMeta::default();
+        let p = unsafe { ffi::CStr::from_ptr("meta\0".as_ptr() as *const c_char) };
+        let mut f = match unsafe { self.fd.openat(p.as_ptr(), libc::O_RDONLY, 0) } {
+            Err(e) => {
+                if e.kind() == ::std::io::ErrorKind::NotFound {
+                    return Ok(meta);
+                }
+                return Err(e.into());
+            },
+            Ok(f) => f,
+        };
+        let mut data = Vec::new();
+        f.read_to_end(&mut data)?;
+        let mut s = protobuf::CodedInputStream::from_bytes(&data);
+        meta.merge_from(&mut s).map_err(|e| Error {
+            description: format!("Unable to parse proto: {:?}", e),
+            cause: Some(Box::new(e)),
+        })?;
+        Ok(meta)
+    }
+
+    // TODO: this should be exposed only to the db layer.
+    pub fn write_meta(&self, meta: &schema::DirMeta) -> Result<(), Error> {
+        let (tmp_path, final_path) = unsafe {
+            (ffi::CStr::from_ptr("meta.tmp\0".as_ptr() as *const c_char),
+             ffi::CStr::from_ptr("meta\0".as_ptr() as *const c_char))
+        };
+        let mut f = unsafe { self.fd.openat(tmp_path.as_ptr(),
+                                            libc::O_CREAT | libc::O_TRUNC | libc::O_WRONLY,
+                                            0o600)? };
+        meta.write_to_writer(&mut f).map_err(|e| Error {
+            description: format!("Unable to write metadata proto: {:?}", e),
+            cause: Some(Box::new(e)),
+        })?;
+        f.sync_all()?;
+        unsafe { self.fd.renameat(tmp_path.as_ptr(), final_path.as_ptr())? };
+        self.sync()?;
+        Ok(())
     }
 
     /// Creates a new writer.
@@ -166,7 +277,10 @@
             },
         };
 
-        let f = match self.open_int(uuid, libc::O_WRONLY | libc::O_EXCL | libc::O_CREAT, 0o600) {
+        let p = SampleFileDir::get_rel_pathname(uuid);
+        let f = match unsafe { self.fd.openat(p.as_ptr(),
+                                              libc::O_WRONLY | libc::O_EXCL | libc::O_CREAT,
+                                              0o600) } {
            Ok(f) => f,
            Err(e) => {
                self.mutable.lock().unwrap().next_uuid = Some(uuid);
@@ -178,17 +292,6 @@
     pub fn statfs(&self) -> Result<libc::statvfs, io::Error> { self.fd.statfs() }
 
-    /// Opens a sample file within this directory with the given flags and (if creating) mode.
-    fn open_int(&self, uuid: Uuid, flags: libc::c_int, mode: libc::c_int)
-                -> Result<fs::File, io::Error> {
-        let p = SampleFileDir::get_rel_pathname(uuid);
-        let fd = unsafe { libc::openat(self.fd.0, p.as_ptr(), flags, mode) };
-        if fd < 0 {
-            return Err(io::Error::last_os_error())
-        }
-        unsafe { Ok(fs::File::from_raw_fd(fd)) }
-    }
-
     /// Gets a pathname for a sample file suitable for passing to open or unlink.
     fn get_rel_pathname(uuid: Uuid) -> [libc::c_char; 37] {
         let mut buf = [0u8; 37];
diff --git a/src/main.rs b/src/main.rs
index 16d2fb8..a9c085e 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -50,6 +50,7 @@ extern crate moonfire_ffmpeg;
 extern crate mylog;
 extern crate openssl;
 extern crate parking_lot;
+extern crate protobuf;
 extern crate regex;
 extern crate serde;
 #[macro_use] extern crate serde_derive;
@@ -71,6 +72,7 @@ mod h264;
 mod json;
 mod mp4;
 mod recording;
+mod schema;
 mod slices;
 mod stream;
 mod streamer;
diff --git a/src/schema.proto b/src/schema.proto
new file mode 100644
index 0000000..a2e4f1d
--- /dev/null
+++ b/src/schema.proto
@@ -0,0 +1,73 @@
+// This file is part of Moonfire NVR, a security camera digital video recorder.
+// Copyright (C) 2018 Scott Lamb <slamb@slamb.org>
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// In addition, as a special exception, the copyright holders give
+// permission to link the code of portions of this program with the
+// OpenSSL library under certain conditions as described in each
+// individual source file, and distribute linked combinations including
+// the two.
+//
+// You must obey the GNU General Public License in all respects for all
+// of the code used other than OpenSSL. If you modify file(s) with this
+// exception, you may extend this exception to your version of the
+// file(s), but you are not obligated to do so. If you do not wish to do
+// so, delete this exception statement from your version. If you delete
+// this exception statement from all source files in the program, then
+// also delete it here.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+syntax = "proto3";
+
+// Metadata stored in sample file dirs as "<dir>/meta". This is checked
+// against the metadata stored within the database to detect inconsistencies
+// between the directory and database, including the following:
+//
+// * sample file directory's disk not being mounted.
+// * mixing up mount points of two sample file directories belonging to the
+//   same database.
+// * directory renames not properly recorded in the database.
+// * restoration of the database from backup but not the sample file
+//   directory.
+// * restoration of the sample file directory but not the database.
+// * two sample file directory paths pointed at the same inode via symlinks
+//   or non-canonical paths. (Note that flock(2) has a design flaw in which
+//   multiple file descriptors can share a lock, so the current locking scheme
+//   is not sufficient to detect this otherwise.)
+// * database and sample file directories forked from the same version, opened
+//   the same number of times, then crossed.
+message DirMeta {
+  // A uuid associated with the database, in binary form. dir_uuid is strictly
+  // more powerful, but it improves diagnostics to know if the directory
+  // belongs to the expected database at all or not.
+  bytes db_uuid = 1;
+
+  // A uuid associated with the directory itself.
+  bytes dir_uuid = 2;
+
+  // Corresponds to an entry in the `open` database table.
+ message Open { + uint32 id = 1; + bytes uuid = 2; + } + + // The last open that was known to be recorded in the database as completed. + // Absent if this has never happened. + Open last_complete_open = 3; + + // The last run which is in progress, if different from last_complete_open. + // This may or may not have been recorded in the database, but it's + // guaranteed that no data has yet been written by this open. + Open in_progress_open = 4; +} diff --git a/src/schema.rs b/src/schema.rs new file mode 100644 index 0000000..fd685c3 --- /dev/null +++ b/src/schema.rs @@ -0,0 +1,712 @@ +// This file is generated. Do not edit +// @generated + +// https://github.com/Manishearth/rust-clippy/issues/702 +#![allow(unknown_lints)] +#![allow(clippy)] + +#![cfg_attr(rustfmt, rustfmt_skip)] + +#![allow(box_pointers)] +#![allow(dead_code)] +#![allow(missing_docs)] +#![allow(non_camel_case_types)] +#![allow(non_snake_case)] +#![allow(non_upper_case_globals)] +#![allow(trivial_casts)] +#![allow(unsafe_code)] +#![allow(unused_imports)] +#![allow(unused_results)] + +use protobuf::Message as Message_imported_for_functions; +use protobuf::ProtobufEnum as ProtobufEnum_imported_for_functions; + +#[derive(PartialEq,Clone,Default)] +pub struct DirMeta { + // message fields + pub db_uuid: ::std::vec::Vec, + pub dir_uuid: ::std::vec::Vec, + pub last_complete_open: ::protobuf::SingularPtrField, + pub in_progress_open: ::protobuf::SingularPtrField, + // special fields + unknown_fields: ::protobuf::UnknownFields, + cached_size: ::protobuf::CachedSize, +} + +// see codegen.rs for the explanation why impl Sync explicitly +unsafe impl ::std::marker::Sync for DirMeta {} + +impl DirMeta { + pub fn new() -> DirMeta { + ::std::default::Default::default() + } + + pub fn default_instance() -> &'static DirMeta { + static mut instance: ::protobuf::lazy::Lazy = ::protobuf::lazy::Lazy { + lock: ::protobuf::lazy::ONCE_INIT, + ptr: 0 as *const DirMeta, + }; + unsafe { + instance.get(DirMeta::new) + } + } + + // bytes db_uuid = 1; + + pub fn clear_db_uuid(&mut self) { + self.db_uuid.clear(); + } + + // Param is passed by value, moved + pub fn set_db_uuid(&mut self, v: ::std::vec::Vec) { + self.db_uuid = v; + } + + // Mutable pointer to the field. + // If field is not initialized, it is initialized with default value first. + pub fn mut_db_uuid(&mut self) -> &mut ::std::vec::Vec { + &mut self.db_uuid + } + + // Take field + pub fn take_db_uuid(&mut self) -> ::std::vec::Vec { + ::std::mem::replace(&mut self.db_uuid, ::std::vec::Vec::new()) + } + + pub fn get_db_uuid(&self) -> &[u8] { + &self.db_uuid + } + + fn get_db_uuid_for_reflect(&self) -> &::std::vec::Vec { + &self.db_uuid + } + + fn mut_db_uuid_for_reflect(&mut self) -> &mut ::std::vec::Vec { + &mut self.db_uuid + } + + // bytes dir_uuid = 2; + + pub fn clear_dir_uuid(&mut self) { + self.dir_uuid.clear(); + } + + // Param is passed by value, moved + pub fn set_dir_uuid(&mut self, v: ::std::vec::Vec) { + self.dir_uuid = v; + } + + // Mutable pointer to the field. + // If field is not initialized, it is initialized with default value first. 
+ pub fn mut_dir_uuid(&mut self) -> &mut ::std::vec::Vec { + &mut self.dir_uuid + } + + // Take field + pub fn take_dir_uuid(&mut self) -> ::std::vec::Vec { + ::std::mem::replace(&mut self.dir_uuid, ::std::vec::Vec::new()) + } + + pub fn get_dir_uuid(&self) -> &[u8] { + &self.dir_uuid + } + + fn get_dir_uuid_for_reflect(&self) -> &::std::vec::Vec { + &self.dir_uuid + } + + fn mut_dir_uuid_for_reflect(&mut self) -> &mut ::std::vec::Vec { + &mut self.dir_uuid + } + + // .DirMeta.Open last_complete_open = 3; + + pub fn clear_last_complete_open(&mut self) { + self.last_complete_open.clear(); + } + + pub fn has_last_complete_open(&self) -> bool { + self.last_complete_open.is_some() + } + + // Param is passed by value, moved + pub fn set_last_complete_open(&mut self, v: DirMeta_Open) { + self.last_complete_open = ::protobuf::SingularPtrField::some(v); + } + + // Mutable pointer to the field. + // If field is not initialized, it is initialized with default value first. + pub fn mut_last_complete_open(&mut self) -> &mut DirMeta_Open { + if self.last_complete_open.is_none() { + self.last_complete_open.set_default(); + } + self.last_complete_open.as_mut().unwrap() + } + + // Take field + pub fn take_last_complete_open(&mut self) -> DirMeta_Open { + self.last_complete_open.take().unwrap_or_else(|| DirMeta_Open::new()) + } + + pub fn get_last_complete_open(&self) -> &DirMeta_Open { + self.last_complete_open.as_ref().unwrap_or_else(|| DirMeta_Open::default_instance()) + } + + fn get_last_complete_open_for_reflect(&self) -> &::protobuf::SingularPtrField { + &self.last_complete_open + } + + fn mut_last_complete_open_for_reflect(&mut self) -> &mut ::protobuf::SingularPtrField { + &mut self.last_complete_open + } + + // .DirMeta.Open in_progress_open = 4; + + pub fn clear_in_progress_open(&mut self) { + self.in_progress_open.clear(); + } + + pub fn has_in_progress_open(&self) -> bool { + self.in_progress_open.is_some() + } + + // Param is passed by value, moved + pub fn set_in_progress_open(&mut self, v: DirMeta_Open) { + self.in_progress_open = ::protobuf::SingularPtrField::some(v); + } + + // Mutable pointer to the field. + // If field is not initialized, it is initialized with default value first. + pub fn mut_in_progress_open(&mut self) -> &mut DirMeta_Open { + if self.in_progress_open.is_none() { + self.in_progress_open.set_default(); + } + self.in_progress_open.as_mut().unwrap() + } + + // Take field + pub fn take_in_progress_open(&mut self) -> DirMeta_Open { + self.in_progress_open.take().unwrap_or_else(|| DirMeta_Open::new()) + } + + pub fn get_in_progress_open(&self) -> &DirMeta_Open { + self.in_progress_open.as_ref().unwrap_or_else(|| DirMeta_Open::default_instance()) + } + + fn get_in_progress_open_for_reflect(&self) -> &::protobuf::SingularPtrField { + &self.in_progress_open + } + + fn mut_in_progress_open_for_reflect(&mut self) -> &mut ::protobuf::SingularPtrField { + &mut self.in_progress_open + } +} + +impl ::protobuf::Message for DirMeta { + fn is_initialized(&self) -> bool { + for v in &self.last_complete_open { + if !v.is_initialized() { + return false; + } + }; + for v in &self.in_progress_open { + if !v.is_initialized() { + return false; + } + }; + true + } + + fn merge_from(&mut self, is: &mut ::protobuf::CodedInputStream) -> ::protobuf::ProtobufResult<()> { + while !is.eof()? 
+            let (field_number, wire_type) = is.read_tag_unpack()?;
+            match field_number {
+                1 => {
+                    ::protobuf::rt::read_singular_proto3_bytes_into(wire_type, is, &mut self.db_uuid)?;
+                },
+                2 => {
+                    ::protobuf::rt::read_singular_proto3_bytes_into(wire_type, is, &mut self.dir_uuid)?;
+                },
+                3 => {
+                    ::protobuf::rt::read_singular_message_into(wire_type, is, &mut self.last_complete_open)?;
+                },
+                4 => {
+                    ::protobuf::rt::read_singular_message_into(wire_type, is, &mut self.in_progress_open)?;
+                },
+                _ => {
+                    ::protobuf::rt::read_unknown_or_skip_group(field_number, wire_type, is, self.mut_unknown_fields())?;
+                },
+            };
+        }
+        ::std::result::Result::Ok(())
+    }
+
+    // Compute sizes of nested messages
+    #[allow(unused_variables)]
+    fn compute_size(&self) -> u32 {
+        let mut my_size = 0;
+        if !self.db_uuid.is_empty() {
+            my_size += ::protobuf::rt::bytes_size(1, &self.db_uuid);
+        }
+        if !self.dir_uuid.is_empty() {
+            my_size += ::protobuf::rt::bytes_size(2, &self.dir_uuid);
+        }
+        if let Some(ref v) = self.last_complete_open.as_ref() {
+            let len = v.compute_size();
+            my_size += 1 + ::protobuf::rt::compute_raw_varint32_size(len) + len;
+        }
+        if let Some(ref v) = self.in_progress_open.as_ref() {
+            let len = v.compute_size();
+            my_size += 1 + ::protobuf::rt::compute_raw_varint32_size(len) + len;
+        }
+        my_size += ::protobuf::rt::unknown_fields_size(self.get_unknown_fields());
+        self.cached_size.set(my_size);
+        my_size
+    }
+
+    fn write_to_with_cached_sizes(&self, os: &mut ::protobuf::CodedOutputStream) -> ::protobuf::ProtobufResult<()> {
+        if !self.db_uuid.is_empty() {
+            os.write_bytes(1, &self.db_uuid)?;
+        }
+        if !self.dir_uuid.is_empty() {
+            os.write_bytes(2, &self.dir_uuid)?;
+        }
+        if let Some(ref v) = self.last_complete_open.as_ref() {
+            os.write_tag(3, ::protobuf::wire_format::WireTypeLengthDelimited)?;
+            os.write_raw_varint32(v.get_cached_size())?;
+            v.write_to_with_cached_sizes(os)?;
+        }
+        if let Some(ref v) = self.in_progress_open.as_ref() {
+            os.write_tag(4, ::protobuf::wire_format::WireTypeLengthDelimited)?;
+            os.write_raw_varint32(v.get_cached_size())?;
+            v.write_to_with_cached_sizes(os)?;
+        }
+        os.write_unknown_fields(self.get_unknown_fields())?;
+        ::std::result::Result::Ok(())
+    }
+
+    fn get_cached_size(&self) -> u32 {
+        self.cached_size.get()
+    }
+
+    fn get_unknown_fields(&self) -> &::protobuf::UnknownFields {
+        &self.unknown_fields
+    }
+
+    fn mut_unknown_fields(&mut self) -> &mut ::protobuf::UnknownFields {
+        &mut self.unknown_fields
+    }
+
+    fn as_any(&self) -> &::std::any::Any {
+        self as &::std::any::Any
+    }
+    fn as_any_mut(&mut self) -> &mut ::std::any::Any {
+        self as &mut ::std::any::Any
+    }
+    fn into_any(self: Box<Self>) -> ::std::boxed::Box<::std::any::Any> {
+        self
+    }
+
+    fn descriptor(&self) -> &'static ::protobuf::reflect::MessageDescriptor {
+        ::protobuf::MessageStatic::descriptor_static(None::<DirMeta>)
+    }
+}
+
+impl ::protobuf::MessageStatic for DirMeta {
+    fn new() -> DirMeta {
+        DirMeta::new()
+    }
+
+    fn descriptor_static(_: ::std::option::Option<DirMeta>) -> &'static ::protobuf::reflect::MessageDescriptor {
+        static mut descriptor: ::protobuf::lazy::Lazy<::protobuf::reflect::MessageDescriptor> = ::protobuf::lazy::Lazy {
+            lock: ::protobuf::lazy::ONCE_INIT,
+            ptr: 0 as *const ::protobuf::reflect::MessageDescriptor,
+        };
+        unsafe {
+            descriptor.get(|| {
+                let mut fields = ::std::vec::Vec::new();
+                fields.push(::protobuf::reflect::accessor::make_simple_field_accessor::<_, ::protobuf::types::ProtobufTypeBytes>(
+                    "db_uuid",
+                    DirMeta::get_db_uuid_for_reflect,
+                    DirMeta::mut_db_uuid_for_reflect,
+                ));
+                fields.push(::protobuf::reflect::accessor::make_simple_field_accessor::<_, ::protobuf::types::ProtobufTypeBytes>(
+                    "dir_uuid",
+                    DirMeta::get_dir_uuid_for_reflect,
+                    DirMeta::mut_dir_uuid_for_reflect,
+                ));
+                fields.push(::protobuf::reflect::accessor::make_singular_ptr_field_accessor::<_, ::protobuf::types::ProtobufTypeMessage<DirMeta_Open>>(
+                    "last_complete_open",
+                    DirMeta::get_last_complete_open_for_reflect,
+                    DirMeta::mut_last_complete_open_for_reflect,
+                ));
+                fields.push(::protobuf::reflect::accessor::make_singular_ptr_field_accessor::<_, ::protobuf::types::ProtobufTypeMessage<DirMeta_Open>>(
+                    "in_progress_open",
+                    DirMeta::get_in_progress_open_for_reflect,
+                    DirMeta::mut_in_progress_open_for_reflect,
+                ));
+                ::protobuf::reflect::MessageDescriptor::new::<DirMeta>(
+                    "DirMeta",
+                    fields,
+                    file_descriptor_proto()
+                )
+            })
+        }
+    }
+}
+
+impl ::protobuf::Clear for DirMeta {
+    fn clear(&mut self) {
+        self.clear_db_uuid();
+        self.clear_dir_uuid();
+        self.clear_last_complete_open();
+        self.clear_in_progress_open();
+        self.unknown_fields.clear();
+    }
+}
+
+impl ::std::fmt::Debug for DirMeta {
+    fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result {
+        ::protobuf::text_format::fmt(self, f)
+    }
+}
+
+impl ::protobuf::reflect::ProtobufValue for DirMeta {
+    fn as_ref(&self) -> ::protobuf::reflect::ProtobufValueRef {
+        ::protobuf::reflect::ProtobufValueRef::Message(self)
+    }
+}
+
+#[derive(PartialEq,Clone,Default)]
+pub struct DirMeta_Open {
+    // message fields
+    pub id: u32,
+    pub uuid: ::std::vec::Vec<u8>,
+    // special fields
+    unknown_fields: ::protobuf::UnknownFields,
+    cached_size: ::protobuf::CachedSize,
+}
+
+// see codegen.rs for the explanation why impl Sync explicitly
+unsafe impl ::std::marker::Sync for DirMeta_Open {}
+
+impl DirMeta_Open {
+    pub fn new() -> DirMeta_Open {
+        ::std::default::Default::default()
+    }
+
+    pub fn default_instance() -> &'static DirMeta_Open {
+        static mut instance: ::protobuf::lazy::Lazy<DirMeta_Open> = ::protobuf::lazy::Lazy {
+            lock: ::protobuf::lazy::ONCE_INIT,
+            ptr: 0 as *const DirMeta_Open,
+        };
+        unsafe {
+            instance.get(DirMeta_Open::new)
+        }
+    }
+
+    // uint32 id = 1;
+
+    pub fn clear_id(&mut self) {
+        self.id = 0;
+    }
+
+    // Param is passed by value, moved
+    pub fn set_id(&mut self, v: u32) {
+        self.id = v;
+    }
+
+    pub fn get_id(&self) -> u32 {
+        self.id
+    }
+
+    fn get_id_for_reflect(&self) -> &u32 {
+        &self.id
+    }
+
+    fn mut_id_for_reflect(&mut self) -> &mut u32 {
+        &mut self.id
+    }
+
+    // bytes uuid = 2;
+
+    pub fn clear_uuid(&mut self) {
+        self.uuid.clear();
+    }
+
+    // Param is passed by value, moved
+    pub fn set_uuid(&mut self, v: ::std::vec::Vec<u8>) {
+        self.uuid = v;
+    }
+
+    // Mutable pointer to the field.
+    // If field is not initialized, it is initialized with default value first.
+    pub fn mut_uuid(&mut self) -> &mut ::std::vec::Vec<u8> {
+        &mut self.uuid
+    }
+
+    // Take field
+    pub fn take_uuid(&mut self) -> ::std::vec::Vec<u8> {
+        ::std::mem::replace(&mut self.uuid, ::std::vec::Vec::new())
+    }
+
+    pub fn get_uuid(&self) -> &[u8] {
+        &self.uuid
+    }
+
+    fn get_uuid_for_reflect(&self) -> &::std::vec::Vec<u8> {
+        &self.uuid
+    }
+
+    fn mut_uuid_for_reflect(&mut self) -> &mut ::std::vec::Vec<u8> {
+        &mut self.uuid
+    }
+}
+
+impl ::protobuf::Message for DirMeta_Open {
+    fn is_initialized(&self) -> bool {
+        true
+    }
+
+    fn merge_from(&mut self, is: &mut ::protobuf::CodedInputStream) -> ::protobuf::ProtobufResult<()> {
+        while !is.eof()? {
+            let (field_number, wire_type) = is.read_tag_unpack()?;
+            match field_number {
+                1 => {
+                    if wire_type != ::protobuf::wire_format::WireTypeVarint {
+                        return ::std::result::Result::Err(::protobuf::rt::unexpected_wire_type(wire_type));
+                    }
+                    let tmp = is.read_uint32()?;
+                    self.id = tmp;
+                },
+                2 => {
+                    ::protobuf::rt::read_singular_proto3_bytes_into(wire_type, is, &mut self.uuid)?;
+                },
+                _ => {
+                    ::protobuf::rt::read_unknown_or_skip_group(field_number, wire_type, is, self.mut_unknown_fields())?;
+                },
+            };
+        }
+        ::std::result::Result::Ok(())
+    }
+
+    // Compute sizes of nested messages
+    #[allow(unused_variables)]
+    fn compute_size(&self) -> u32 {
+        let mut my_size = 0;
+        if self.id != 0 {
+            my_size += ::protobuf::rt::value_size(1, self.id, ::protobuf::wire_format::WireTypeVarint);
+        }
+        if !self.uuid.is_empty() {
+            my_size += ::protobuf::rt::bytes_size(2, &self.uuid);
+        }
+        my_size += ::protobuf::rt::unknown_fields_size(self.get_unknown_fields());
+        self.cached_size.set(my_size);
+        my_size
+    }
+
+    fn write_to_with_cached_sizes(&self, os: &mut ::protobuf::CodedOutputStream) -> ::protobuf::ProtobufResult<()> {
+        if self.id != 0 {
+            os.write_uint32(1, self.id)?;
+        }
+        if !self.uuid.is_empty() {
+            os.write_bytes(2, &self.uuid)?;
+        }
+        os.write_unknown_fields(self.get_unknown_fields())?;
+        ::std::result::Result::Ok(())
+    }
+
+    fn get_cached_size(&self) -> u32 {
+        self.cached_size.get()
+    }
+
+    fn get_unknown_fields(&self) -> &::protobuf::UnknownFields {
+        &self.unknown_fields
+    }
+
+    fn mut_unknown_fields(&mut self) -> &mut ::protobuf::UnknownFields {
+        &mut self.unknown_fields
+    }
+
+    fn as_any(&self) -> &::std::any::Any {
+        self as &::std::any::Any
+    }
+    fn as_any_mut(&mut self) -> &mut ::std::any::Any {
+        self as &mut ::std::any::Any
+    }
+    fn into_any(self: Box<Self>) -> ::std::boxed::Box<::std::any::Any> {
+        self
+    }
+
+    fn descriptor(&self) -> &'static ::protobuf::reflect::MessageDescriptor {
+        ::protobuf::MessageStatic::descriptor_static(None::<DirMeta_Open>)
+    }
+}
+
+impl ::protobuf::MessageStatic for DirMeta_Open {
+    fn new() -> DirMeta_Open {
+        DirMeta_Open::new()
+    }
+
+    fn descriptor_static(_: ::std::option::Option<DirMeta_Open>) -> &'static ::protobuf::reflect::MessageDescriptor {
+        static mut descriptor: ::protobuf::lazy::Lazy<::protobuf::reflect::MessageDescriptor> = ::protobuf::lazy::Lazy {
+            lock: ::protobuf::lazy::ONCE_INIT,
+            ptr: 0 as *const ::protobuf::reflect::MessageDescriptor,
+        };
+        unsafe {
+            descriptor.get(|| {
+                let mut fields = ::std::vec::Vec::new();
+                fields.push(::protobuf::reflect::accessor::make_simple_field_accessor::<_, ::protobuf::types::ProtobufTypeUint32>(
+                    "id",
+                    DirMeta_Open::get_id_for_reflect,
+                    DirMeta_Open::mut_id_for_reflect,
+                ));
+                fields.push(::protobuf::reflect::accessor::make_simple_field_accessor::<_, ::protobuf::types::ProtobufTypeBytes>(
+                    "uuid",
+                    DirMeta_Open::get_uuid_for_reflect,
+                    DirMeta_Open::mut_uuid_for_reflect,
+                ));
+                ::protobuf::reflect::MessageDescriptor::new::<DirMeta_Open>(
+                    "DirMeta_Open",
+                    fields,
+                    file_descriptor_proto()
+                )
+            })
+        }
+    }
+}
+
+impl ::protobuf::Clear for DirMeta_Open {
+    fn clear(&mut self) {
+        self.clear_id();
+        self.clear_uuid();
+        self.unknown_fields.clear();
+    }
+}
+
+impl ::std::fmt::Debug for DirMeta_Open {
+    fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result {
+        ::protobuf::text_format::fmt(self, f)
+    }
+}
+
+impl ::protobuf::reflect::ProtobufValue for DirMeta_Open {
+    fn as_ref(&self) -> ::protobuf::reflect::ProtobufValueRef {
+        ::protobuf::reflect::ProtobufValueRef::Message(self)
+    }
+}
+
+static file_descriptor_proto_data: &'static [u8] = b"\
+    \n\x0cschema.proto\"\xdf\x01\n\x07DirMeta\x12\x17\n\x07db_uuid\x18\x01\
+    \x20\x01(\x0cR\x06dbUuid\x12\x19\n\x08dir_uuid\x18\x02\x20\x01(\x0cR\x07\
+    dirUuid\x12;\n\x12last_complete_open\x18\x03\x20\x01(\x0b2\r.DirMeta.Ope\
+    nR\x10lastCompleteOpen\x127\n\x10in_progress_open\x18\x04\x20\x01(\x0b2\
+    \r.DirMeta.OpenR\x0einProgressOpen\x1a*\n\x04Open\x12\x0e\n\x02id\x18\
+    \x01\x20\x01(\rR\x02id\x12\x12\n\x04uuid\x18\x02\x20\x01(\x0cR\x04uuidJ\
+    \xdf\x1b\n\x06\x12\x04\x1e\0H\x01\n\xc2\x0b\n\x01\x0c\x12\x03\x1e\0\x122\
+    \xb7\x0b\x20This\x20file\x20is\x20part\x20of\x20Moonfire\x20NVR,\x20a\
+    \x20security\x20camera\x20digital\x20video\x20recorder.\n\x20Copyright\
+    \x20(C)\x202018\x20Scott\x20Lamb\x20<slamb@slamb.org>\n\n\x20This\x20pro\
+    gram\x20is\x20free\x20software:\x20you\x20can\x20redistribute\x20it\x20a\
+    nd/or\x20modify\n\x20it\x20under\x20the\x20terms\x20of\x20the\x20GNU\x20\
+    General\x20Public\x20License\x20as\x20published\x20by\n\x20the\x20Free\
+    \x20Software\x20Foundation,\x20either\x20version\x203\x20of\x20the\x20Li\
+    cense,\x20or\n\x20(at\x20your\x20option)\x20any\x20later\x20version.\n\n\
+    \x20In\x20addition,\x20as\x20a\x20special\x20exception,\x20the\x20copyri\
+    ght\x20holders\x20give\n\x20permission\x20to\x20link\x20the\x20code\x20o\
+    f\x20portions\x20of\x20this\x20program\x20with\x20the\n\x20OpenSSL\x20li\
+    brary\x20under\x20certain\x20conditions\x20as\x20described\x20in\x20each\
+    \n\x20individual\x20source\x20file,\x20and\x20distribute\x20linked\x20co\
+    mbinations\x20including\n\x20the\x20two.\n\n\x20You\x20must\x20obey\x20t\
+    he\x20GNU\x20General\x20Public\x20License\x20in\x20all\x20respects\x20fo\
+    r\x20all\n\x20of\x20the\x20code\x20used\x20other\x20than\x20OpenSSL.\x20\
+    If\x20you\x20modify\x20file(s)\x20with\x20this\n\x20exception,\x20you\
+    \x20may\x20extend\x20this\x20exception\x20to\x20your\x20version\x20of\
+    \x20the\n\x20file(s),\x20but\x20you\x20are\x20not\x20obligated\x20to\x20\
+    do\x20so.\x20If\x20you\x20do\x20not\x20wish\x20to\x20do\n\x20so,\x20dele\
+    te\x20this\x20exception\x20statement\x20from\x20your\x20version.\x20If\
+    \x20you\x20delete\n\x20this\x20exception\x20statement\x20from\x20all\x20\
+    source\x20files\x20in\x20the\x20program,\x20then\n\x20also\x20delete\x20\
+    it\x20here.\n\n\x20This\x20program\x20is\x20distributed\x20in\x20the\x20\
+    hope\x20that\x20it\x20will\x20be\x20useful,\n\x20but\x20WITHOUT\x20ANY\
+    \x20WARRANTY;\x20without\x20even\x20the\x20implied\x20warranty\x20of\n\
+    \x20MERCHANTABILITY\x20or\x20FITNESS\x20FOR\x20A\x20PARTICULAR\x20PURPOS\
+    E.\x20\x20See\x20the\n\x20GNU\x20General\x20Public\x20License\x20for\x20\
+    more\x20details.\n\n\x20You\x20should\x20have\x20received\x20a\x20copy\
+    \x20of\x20the\x20GNU\x20General\x20Public\x20License\n\x20along\x20with\
+    \x20this\x20program.\x20\x20If\x20not,\x20see\x20<http://www.gnu.org/li\
+    censes/>.\n\n\xc4\x07\n\x02\x04\0\x12\x041\0H\x01\x1a\xb7\x07\x20Metadata\
+    \x20stored\x20in\x20sample\x20file\x20dirs\x20as\x20\"<dir>/meta\".\x20T\
+    his\x20is\x20checked\n\x20against\x20the\x20metadata\x20stored\x20within\
+    \x20the\x20database\x20to\x20detect\x20inconsistencies\n\x20between\x20t\
+    he\x20directory\x20and\x20database,\x20including\x20the\x20following:\n\
+    \n\x20*\x20sample\x20file\x20directory's\x20disk\x20not\x20being\x20moun\
+    ted.\n\x20*\x20mixing\x20up\x20mount\x20points\x20of\x20two\x20sample\
+    \x20file\x20directories\x20belonging\x20to\x20the\n\x20\x20\x20same\x20d\
+    atabase.\n\x20*\x20directory\x20renames\x20not\x20properly\x20recorded\
+    \x20in\x20the\x20database.\n\x20*\x20restoration\x20of\x20the\x20databas\
+    e\x20from\x20backup\x20but\x20not\x20the\x20sample\x20file\n\x20\x20\x20\
+    directory.\n\x20*\x20restoration\x20of\x20the\x20sample\x20file\x20direc\
+    tory\x20but\x20not\x20the\x20database.\n\x20*\x20two\x20sample\x20file\
+    \x20directory\x20paths\x20pointed\x20at\x20the\x20same\x20inode\x20via\
+    \x20symlinks\n\x20\x20\x20or\x20non-canonical\x20paths.\x20(Note\x20that\
+    \x20flock(2)\x20has\x20a\x20design\x20flaw\x20in\x20which\n\x20\x20\x20m\
+    ultiple\x20file\x20descriptors\x20can\x20share\x20a\x20lock,\x20so\x20th\
+    e\x20current\x20locking\x20scheme\n\x20\x20\x20is\x20not\x20sufficient\
+    \x20to\x20detect\x20this\x20otherwise.)\n\x20*\x20database\x20and\x20sam\
+    ple\x20file\x20directories\x20forked\x20from\x20the\x20same\x20version,\
+    \x20opened\n\x20\x20\x20the\x20same\x20number\x20of\x20times,\x20then\
+    \x20crossed.\n\n\n\n\x03\x04\0\x01\x12\x031\x08\x0f\n\xcf\x01\n\x04\x04\
+    \0\x02\0\x12\x035\x02\x14\x1a\xc1\x01\x20A\x20uuid\x20associated\x20with\
+    \x20the\x20database,\x20in\x20binary\x20form.\x20dir_uuid\x20is\x20stric\
+    tly\n\x20more\x20powerful,\x20but\x20it\x20improves\x20diagnostics\x20to\
+    \x20know\x20if\x20the\x20directory\n\x20belongs\x20to\x20the\x20expected\
+    \x20database\x20at\x20all\x20or\x20not.\n\n\r\n\x05\x04\0\x02\0\x04\x12\
+    \x045\x021\x11\n\x0c\n\x05\x04\0\x02\0\x05\x12\x035\x02\x07\n\x0c\n\x05\
+    \x04\0\x02\0\x01\x12\x035\x08\x0f\n\x0c\n\x05\x04\0\x02\0\x03\x12\x035\
+    \x12\x13\n;\n\x04\x04\0\x02\x01\x12\x038\x02\x15\x1a.\x20A\x20uuid\x20as\
+    sociated\x20with\x20the\x20directory\x20itself.\n\n\r\n\x05\x04\0\x02\
+    \x01\x04\x12\x048\x025\x14\n\x0c\n\x05\x04\0\x02\x01\x05\x12\x038\x02\
+    \x07\n\x0c\n\x05\x04\0\x02\x01\x01\x12\x038\x08\x10\n\x0c\n\x05\x04\0\
+    \x02\x01\x03\x12\x038\x13\x14\nE\n\x04\x04\0\x03\0\x12\x04;\x02>\x03\x1a\
+    7\x20Corresponds\x20to\x20an\x20entry\x20in\x20the\x20`open`\x20database\
+    \x20table.\n\n\x0c\n\x05\x04\0\x03\0\x01\x12\x03;\n\x0e\n\r\n\x06\x04\0\
+    \x03\0\x02\0\x12\x03<\x04\x12\n\x0f\n\x07\x04\0\x03\0\x02\0\x04\x12\x04<\
+    \x04;\x10\n\x0e\n\x07\x04\0\x03\0\x02\0\x05\x12\x03<\x04\n\n\x0e\n\x07\
+    \x04\0\x03\0\x02\0\x01\x12\x03<\x0b\r\n\x0e\n\x07\x04\0\x03\0\x02\0\x03\
+    \x12\x03<\x10\x11\n\r\n\x06\x04\0\x03\0\x02\x01\x12\x03=\x04\x13\n\x0f\n\
+    \x07\x04\0\x03\0\x02\x01\x04\x12\x04=\x04<\x12\n\x0e\n\x07\x04\0\x03\0\
+    \x02\x01\x05\x12\x03=\x04\t\n\x0e\n\x07\x04\0\x03\0\x02\x01\x01\x12\x03=\
+    \n\x0e\n\x0e\n\x07\x04\0\x03\0\x02\x01\x03\x12\x03=\x11\x12\n|\n\x04\x04\
+    \0\x02\x02\x12\x03B\x02\x1e\x1ao\x20The\x20last\x20open\x20that\x20was\
+    \x20known\x20to\x20be\x20recorded\x20in\x20the\x20database\x20as\x20comp\
+    leted.\n\x20Absent\x20if\x20this\x20has\x20never\x20happened.\n\n\r\n\
+    \x05\x04\0\x02\x02\x04\x12\x04B\x02>\x03\n\x0c\n\x05\x04\0\x02\x02\x06\
+    \x12\x03B\x02\x06\n\x0c\n\x05\x04\0\x02\x02\x01\x12\x03B\x07\x19\n\x0c\n\
+    \x05\x04\0\x02\x02\x03\x12\x03B\x1c\x1d\n\xd6\x01\n\x04\x04\0\x02\x03\
+    \x12\x03G\x02\x1c\x1a\xc8\x01\x20The\x20last\x20run\x20which\x20is\x20in\
+    \x20progress,\x20if\x20different\x20from\x20last_complete_open.\n\x20Thi\
+    s\x20may\x20or\x20may\x20not\x20have\x20been\x20recorded\x20in\x20the\
+    \x20database,\x20but\x20it's\n\x20guaranteed\x20that\x20no\x20data\x20ha\
+    s\x20yet\x20been\x20written\x20by\x20this\x20open.\n\n\r\n\x05\x04\0\x02\
+    \x03\x04\x12\x04G\x02B\x1e\n\x0c\n\x05\x04\0\x02\x03\x06\
+    \x12\x03G\x02\
+    \x06\n\x0c\n\x05\x04\0\x02\x03\x01\x12\x03G\x07\x17\n\x0c\n\x05\x04\0\
+    \x02\x03\x03\x12\x03G\x1a\x1bb\x06proto3\
+";

+static mut file_descriptor_proto_lazy: ::protobuf::lazy::Lazy<::protobuf::descriptor::FileDescriptorProto> = ::protobuf::lazy::Lazy {
+    lock: ::protobuf::lazy::ONCE_INIT,
+    ptr: 0 as *const ::protobuf::descriptor::FileDescriptorProto,
+};
+
+fn parse_descriptor_proto() -> ::protobuf::descriptor::FileDescriptorProto {
+    ::protobuf::parse_from_bytes(file_descriptor_proto_data).unwrap()
+}
+
+pub fn file_descriptor_proto() -> &'static ::protobuf::descriptor::FileDescriptorProto {
+    unsafe {
+        file_descriptor_proto_lazy.get(|| {
+            parse_descriptor_proto()
+        })
+    }
+}
diff --git a/src/schema.sql b/src/schema.sql
index 222b6db..0350eff 100644
--- a/src/schema.sql
+++ b/src/schema.sql
@@ -31,6 +31,11 @@
 -- schema.sql: SQLite3 database schema for Moonfire NVR.
 -- See also design/schema.md.
 
+-- Database metadata. There should be exactly one row in this table.
+create table meta (
+  uuid blob not null check (length(uuid) = 16)
+);
+
 -- This table tracks the schema version.
 -- There is one row for the initial database creation (inserted below, after the
 -- create statements) and one for each upgrade procedure (if any).
@@ -45,10 +50,23 @@ create table version (
   notes text
 );
 
+-- Tracks every time the database has been opened in read/write mode.
+-- This is used to ensure directories are in sync with the database (see
+-- schema.proto:DirMeta). It may be used in the API for etags and such in the
+-- future.
+create table open (
+  id integer primary key,
+  uuid blob unique not null check (length(uuid) = 16)
+);
+
 create table sample_file_dir (
   id integer primary key,
   path text unique not null,
-  uuid blob unique not null check (length(uuid) = 16)
+  uuid blob unique not null check (length(uuid) = 16),
+
+  -- The last (read/write) open of this directory which fully completed.
+  -- See schema.proto:DirMeta for a more complete description.
+  last_complete_open_id integer references open (id)
 );
 
 create table camera (
diff --git a/src/testutil.rs b/src/testutil.rs
index 8f9d207..1673165 100644
--- a/src/testutil.rs
+++ b/src/testutil.rs
@@ -78,10 +78,9 @@ impl TestDb {
     pub fn new() -> TestDb {
         let tmpdir = tempdir::TempDir::new("moonfire-nvr-test").unwrap();
 
-        let conn = rusqlite::Connection::open_in_memory().unwrap();
-        let schema = include_str!("schema.sql");
-        conn.execute_batch(schema).unwrap();
-        let db = Arc::new(db::Database::new(conn).unwrap());
+        let mut conn = rusqlite::Connection::open_in_memory().unwrap();
+        db::Database::init(&mut conn).unwrap();
+        let db = Arc::new(db::Database::new(conn, true).unwrap());
         let (test_camera_uuid, sample_file_dir_id);
         let path = tmpdir.path().to_str().unwrap().to_owned();
         let dir;
@@ -109,7 +108,7 @@ impl TestDb {
                 tx.update_retention(TEST_STREAM_ID, true, 1048576).unwrap();
                 tx.commit().unwrap();
             }
-            dir = l.sample_file_dirs_by_id().get(&sample_file_dir_id).unwrap().open().unwrap();
+            dir = l.sample_file_dirs_by_id().get(&sample_file_dir_id).unwrap().get().unwrap();
         }
         let mut dirs_by_stream_id = FnvHashMap::default();
         dirs_by_stream_id.insert(TEST_STREAM_ID, dir.clone());
diff --git a/src/web.rs b/src/web.rs
index e080f6e..f6ff503 100644
--- a/src/web.rs
+++ b/src/web.rs
@@ -419,7 +419,7 @@ impl Service {
                 d.insert(id, l.sample_file_dirs_by_id()
                               .get(&dir_id)
                               .unwrap()
-                              .open()?);
+                              .get()?);
             }
             Arc::new(d)
         };
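
How the pieces above fit together: `Database::init` now sets up the schema (including the new `meta` and `open` tables), each read/write open of the database records a row in `open`, and each sample file directory carries a `DirMeta` proto in its `<dir>/meta` file naming the database and the opens it has seen, so a directory can be verified before use. Below is a minimal sketch of that verification; the helper name `check_dir_meta`, the `String` error type, and the direct file I/O are assumptions for illustration, not the PR's actual `dir.rs` code. It assumes it lives in the same crate as the generated `src/schema.rs` above (`mod schema;`).

```rust
extern crate protobuf;

use std::fs::File;
use std::io::Read;

// Illustrative sketch only: verify a sample file directory's metadata against
// the uuids the database expects (from its `meta` and `sample_file_dir` rows).
fn check_dir_meta(path: &str, expected_db_uuid: &[u8; 16], expected_dir_uuid: &[u8; 16])
                  -> Result<schema::DirMeta, String> {
    // Read and parse "<path>/meta", written earlier via Message::write_to_bytes.
    let mut buf = Vec::new();
    File::open(format!("{}/meta", path))
        .and_then(|mut f| f.read_to_end(&mut buf))
        .map_err(|e| format!("unable to read {}/meta: {}", path, e))?;
    let meta: schema::DirMeta = protobuf::parse_from_bytes(&buf)
        .map_err(|e| format!("unable to parse {}/meta: {}", path, e))?;

    // A db_uuid mismatch means the directory belongs to some other database;
    // a dir_uuid mismatch means two directories' mount points or paths have
    // been mixed up. Either way, refuse to open rather than risk the data
    // integrity problems listed in the schema.proto comment.
    if meta.get_db_uuid() != &expected_db_uuid[..] {
        return Err("dir belongs to a different database".to_owned());
    }
    if meta.get_dir_uuid() != &expected_dir_uuid[..] {
        return Err("dir uuid mismatch; directories may be swapped".to_owned());
    }
    Ok(meta)
}
```

A fuller check would also reconcile `last_complete_open` and `in_progress_open` against `sample_file_dir.last_complete_open_id` and the `open` table; that comparison is what catches the subtler mixups (forked copies opened the same number of times, stale restores) even when both uuids match.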