diff --git a/db/dir.rs b/db/dir.rs index 695d0d8..c8d6049 100644 --- a/db/dir.rs +++ b/db/dir.rs @@ -62,7 +62,7 @@ use std::thread; pub struct SampleFileDir { /// The open file descriptor for the directory. The worker uses it to create files and sync the /// directory. Other threads use it to open sample files for reading during video serving. - fd: Fd, + pub(crate) fd: Fd, } /// A file descriptor associated with a directory (not necessarily the sample file dir). @@ -80,24 +80,30 @@ impl Drop for Fd { impl Fd { /// Opens the given path as a directory. - pub fn open(fd: Option<&Fd>, path: &str, mkdir: bool) -> Result { - let fd = fd.map(|fd| fd.0).unwrap_or(libc::AT_FDCWD); + pub fn open(path: &str, mkdir: bool) -> Result { let cstring = ffi::CString::new(path) .map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))?; - if mkdir && unsafe { libc::mkdirat(fd, cstring.as_ptr(), 0o700) } != 0 { + if mkdir && unsafe { libc::mkdir(cstring.as_ptr(), 0o700) } != 0 { let e = io::Error::last_os_error(); if e.kind() != io::ErrorKind::AlreadyExists { return Err(e.into()); } } - let fd = unsafe { libc::openat(fd, cstring.as_ptr(), libc::O_DIRECTORY | libc::O_RDONLY, - 0) }; + let fd = unsafe { libc::open(cstring.as_ptr(), libc::O_DIRECTORY | libc::O_RDONLY, 0) }; if fd < 0 { return Err(io::Error::last_os_error().into()); } Ok(Fd(fd)) } + pub(crate) fn sync(&self) -> Result<(), io::Error> { + let res = unsafe { libc::fsync(self.0) }; + if res < 0 { + return Err(io::Error::last_os_error()) + } + Ok(()) + } + /// Opens a sample file within this directory with the given flags and (if creating) mode. unsafe fn openat(&self, p: *const c_char, flags: libc::c_int, mode: libc::c_int) -> Result { @@ -128,7 +134,7 @@ impl Fd { } } -pub unsafe fn renameat(from_fd: &Fd, from_path: *const c_char, +pub(crate) unsafe fn renameat(from_fd: &Fd, from_path: *const c_char, to_fd: &Fd, to_path: *const c_char) -> Result<(), io::Error> { let result = libc::renameat(from_fd.0, from_path, to_fd.0, to_path); if result < 0 { @@ -137,6 +143,41 @@ pub unsafe fn renameat(from_fd: &Fd, from_path: *const c_char, Ok(()) } +/// Reads `dir`'s metadata. If none is found, returns an empty proto. +pub(crate) fn read_meta(dir: &Fd) -> Result { + let mut meta = schema::DirMeta::default(); + let p = unsafe { ffi::CStr::from_ptr("meta\0".as_ptr() as *const c_char) }; + let mut f = match unsafe { dir.openat(p.as_ptr(), libc::O_RDONLY, 0) } { + Err(e) => { + if e.kind() == ::std::io::ErrorKind::NotFound { + return Ok(meta); + } + return Err(e.into()); + }, + Ok(f) => f, + }; + let mut data = Vec::new(); + f.read_to_end(&mut data)?; + let mut s = protobuf::CodedInputStream::from_bytes(&data); + meta.merge_from(&mut s).map_err(|e| e.context("Unable to parse metadata proto: {}"))?; + Ok(meta) +} + +/// Write `dir`'s metadata, clobbering existing data. +pub(crate) fn write_meta(dir: &Fd, meta: &schema::DirMeta) -> Result<(), Error> { + let (tmp_path, final_path) = unsafe { + (ffi::CStr::from_ptr("meta.tmp\0".as_ptr() as *const c_char), + ffi::CStr::from_ptr("meta\0".as_ptr() as *const c_char)) + }; + let mut f = unsafe { dir.openat(tmp_path.as_ptr(), + libc::O_CREAT | libc::O_TRUNC | libc::O_WRONLY, 0o600)? }; + meta.write_to_writer(&mut f)?; + f.sync_all()?; + unsafe { renameat(&dir, tmp_path.as_ptr(), &dir, final_path.as_ptr())? }; + dir.sync()?; + Ok(()) +} + impl SampleFileDir { /// Opens the directory using the given metadata. /// @@ -147,7 +188,7 @@ impl SampleFileDir { let read_write = db_meta.in_progress_open.is_some(); let s = SampleFileDir::open_self(path, false)?; s.fd.lock(if read_write { libc::LOCK_EX } else { libc::LOCK_SH } | libc::LOCK_NB)?; - let dir_meta = s.read_meta()?; + let dir_meta = read_meta(&s.fd)?; if !SampleFileDir::consistent(db_meta, &dir_meta) { bail!("metadata mismatch.\ndb: {:#?}\ndir: {:#?}", db_meta, &dir_meta); } @@ -176,14 +217,14 @@ impl SampleFileDir { true } - pub fn create(path: &str, db_meta: &schema::DirMeta) -> Result, Error> { + pub(crate) fn create(path: &str, db_meta: &schema::DirMeta) + -> Result, Error> { let s = SampleFileDir::open_self(path, true)?; s.fd.lock(libc::LOCK_EX | libc::LOCK_NB)?; - let old_meta = s.read_meta()?; + let old_meta = read_meta(&s.fd)?; // Verify metadata. We only care that it hasn't been completely opened. // Partial opening by this or another database is fine; we won't overwrite anything. - // TODO: consider one exception: if the version 2 upgrade fails at the post_tx step. if old_meta.last_complete_open.is_some() { bail!("Can't create dir at path {}: is already in use:\n{:?}", path, old_meta); } @@ -193,7 +234,7 @@ impl SampleFileDir { } fn open_self(path: &str, create: bool) -> Result, Error> { - let fd = Fd::open(None, path, create) + let fd = Fd::open(path, create) .map_err(|e| format_err!("unable to open sample file dir {}: {}", path, e))?; Ok(Arc::new(SampleFileDir { fd, @@ -206,39 +247,8 @@ impl SampleFileDir { unsafe { self.fd.openat(p.as_ptr(), libc::O_RDONLY, 0) } } - /// Reads the directory metadata. If none is found, returns an empty proto. - fn read_meta(&self) -> Result { - let mut meta = schema::DirMeta::default(); - let p = unsafe { ffi::CStr::from_ptr("meta\0".as_ptr() as *const c_char) }; - let mut f = match unsafe { self.fd.openat(p.as_ptr(), libc::O_RDONLY, 0) } { - Err(e) => { - if e.kind() == ::std::io::ErrorKind::NotFound { - return Ok(meta); - } - return Err(e.into()); - }, - Ok(f) => f, - }; - let mut data = Vec::new(); - f.read_to_end(&mut data)?; - let mut s = protobuf::CodedInputStream::from_bytes(&data); - meta.merge_from(&mut s).map_err(|e| e.context("Unable to parse metadata proto: {}"))?; - Ok(meta) - } - pub(crate) fn write_meta(&self, meta: &schema::DirMeta) -> Result<(), Error> { - let (tmp_path, final_path) = unsafe { - (ffi::CStr::from_ptr("meta.tmp\0".as_ptr() as *const c_char), - ffi::CStr::from_ptr("meta\0".as_ptr() as *const c_char)) - }; - let mut f = unsafe { self.fd.openat(tmp_path.as_ptr(), - libc::O_CREAT | libc::O_TRUNC | libc::O_WRONLY, - 0o600)? }; - meta.write_to_writer(&mut f)?; - f.sync_all()?; - unsafe { renameat(&self.fd, tmp_path.as_ptr(), &self.fd, final_path.as_ptr())? }; - self.sync()?; - Ok(()) + write_meta(&self.fd, meta) } pub fn statfs(&self) -> Result { self.fd.statfs() } @@ -264,11 +274,7 @@ impl SampleFileDir { /// Syncs the directory itself. fn sync(&self) -> Result<(), io::Error> { - let res = unsafe { libc::fsync(self.fd.0) }; - if res < 0 { - return Err(io::Error::last_os_error()) - } - Ok(()) + self.fd.sync() } } diff --git a/db/upgrade/mod.rs b/db/upgrade/mod.rs index 2687948..082d339 100644 --- a/db/upgrade/mod.rs +++ b/db/upgrade/mod.rs @@ -43,11 +43,6 @@ mod v2_to_v3; const UPGRADE_NOTES: &'static str = concat!("upgraded using moonfire-db ", env!("CARGO_PKG_VERSION")); -pub trait Upgrader { - fn in_tx(&mut self, &rusqlite::Transaction) -> Result<(), Error> { Ok(()) } - fn post_tx(&mut self) -> Result<(), Error> { Ok(()) } -} - #[derive(Debug)] pub struct Args<'a> { pub flag_sample_file_dir: Option<&'a str>, @@ -65,9 +60,9 @@ fn set_journal_mode(conn: &rusqlite::Connection, requested: &str) -> Result<(), pub fn run(args: &Args, conn: &mut rusqlite::Connection) -> Result<(), Error> { let upgraders = [ - v0_to_v1::new, - v1_to_v2::new, - v2_to_v3::new, + v0_to_v1::run, + v1_to_v2::run, + v2_to_v3::run, ]; { @@ -84,15 +79,13 @@ pub fn run(args: &Args, conn: &mut rusqlite::Connection) -> Result<(), Error> { set_journal_mode(&conn, args.flag_preset_journal).unwrap(); for ver in old_ver .. db::EXPECTED_VERSION { info!("...from version {} to version {}", ver, ver + 1); - let mut u = upgraders[ver as usize](&args)?; let tx = conn.transaction()?; - u.in_tx(&tx)?; + upgraders[ver as usize](&args, &tx)?; tx.execute(r#" insert into version (id, unix_time, notes) values (?, cast(strftime('%s', 'now') as int32), ?) "#, &[&(ver + 1), &UPGRADE_NOTES])?; tx.commit()?; - u.post_tx()?; } } diff --git a/db/upgrade/v0_to_v1.rs b/db/upgrade/v0_to_v1.rs index a9f0ba4..bafbfbb 100644 --- a/db/upgrade/v0_to_v1.rs +++ b/db/upgrade/v0_to_v1.rs @@ -36,73 +36,65 @@ use recording; use rusqlite; use std::collections::HashMap; -pub struct U; - -pub fn new<'a>(_args: &'a super::Args) -> Result, Error> { - Ok(Box::new(U)) -} - -impl super::Upgrader for U { - fn in_tx(&mut self, tx: &rusqlite::Transaction) -> Result<(), Error> { - // These create statements match the schema.sql when version 1 was the latest. - tx.execute_batch(r#" - alter table camera rename to old_camera; - create table camera ( - id integer primary key, - uuid blob unique, - short_name text not null, - description text, - host text, - username text, - password text, - main_rtsp_path text, - sub_rtsp_path text, - retain_bytes integer not null check (retain_bytes >= 0), - next_recording_id integer not null check (next_recording_id >= 0) - ); - alter table recording rename to old_recording; - drop index recording_cover; - create table recording ( - composite_id integer primary key, - camera_id integer not null references camera (id), - run_offset integer not null, - flags integer not null, - sample_file_bytes integer not null check (sample_file_bytes > 0), - start_time_90k integer not null check (start_time_90k > 0), - duration_90k integer not null - check (duration_90k >= 0 and duration_90k < 5*60*90000), - local_time_delta_90k integer not null, - video_samples integer not null check (video_samples > 0), - video_sync_samples integer not null check (video_samples > 0), - video_sample_entry_id integer references video_sample_entry (id), - check (composite_id >> 32 = camera_id) - ); - create index recording_cover on recording ( - camera_id, - start_time_90k, - duration_90k, - video_samples, - video_sync_samples, - video_sample_entry_id, - sample_file_bytes, - run_offset, - flags - ); - create table recording_playback ( - composite_id integer primary key references recording (composite_id), - sample_file_uuid blob not null check (length(sample_file_uuid) = 16), - sample_file_sha1 blob not null check (length(sample_file_sha1) = 20), - video_index blob not null check (length(video_index) > 0) - ); - "#)?; - let camera_state = fill_recording(tx).unwrap(); - fill_camera(tx, camera_state).unwrap(); - tx.execute_batch(r#" - drop table old_camera; - drop table old_recording; - "#)?; - Ok(()) - } +pub fn run(_args: &super::Args, tx: &rusqlite::Transaction) -> Result<(), Error> { + // These create statements match the schema.sql when version 1 was the latest. + tx.execute_batch(r#" + alter table camera rename to old_camera; + create table camera ( + id integer primary key, + uuid blob unique, + short_name text not null, + description text, + host text, + username text, + password text, + main_rtsp_path text, + sub_rtsp_path text, + retain_bytes integer not null check (retain_bytes >= 0), + next_recording_id integer not null check (next_recording_id >= 0) + ); + alter table recording rename to old_recording; + drop index recording_cover; + create table recording ( + composite_id integer primary key, + camera_id integer not null references camera (id), + run_offset integer not null, + flags integer not null, + sample_file_bytes integer not null check (sample_file_bytes > 0), + start_time_90k integer not null check (start_time_90k > 0), + duration_90k integer not null + check (duration_90k >= 0 and duration_90k < 5*60*90000), + local_time_delta_90k integer not null, + video_samples integer not null check (video_samples > 0), + video_sync_samples integer not null check (video_samples > 0), + video_sample_entry_id integer references video_sample_entry (id), + check (composite_id >> 32 = camera_id) + ); + create index recording_cover on recording ( + camera_id, + start_time_90k, + duration_90k, + video_samples, + video_sync_samples, + video_sample_entry_id, + sample_file_bytes, + run_offset, + flags + ); + create table recording_playback ( + composite_id integer primary key references recording (composite_id), + sample_file_uuid blob not null check (length(sample_file_uuid) = 16), + sample_file_sha1 blob not null check (length(sample_file_sha1) = 20), + video_index blob not null check (length(video_index) > 0) + ); + "#)?; + let camera_state = fill_recording(tx).unwrap(); + fill_camera(tx, camera_state).unwrap(); + tx.execute_batch(r#" + drop table old_camera; + drop table old_recording; + "#)?; + Ok(()) } struct CameraState { diff --git a/db/upgrade/v1_to_v2.rs b/db/upgrade/v1_to_v2.rs index b3686b8..ce36c80 100644 --- a/db/upgrade/v1_to_v2.rs +++ b/db/upgrade/v1_to_v2.rs @@ -32,261 +32,282 @@ use dir; use failure::Error; +use libc; use rusqlite; use schema::DirMeta; use std::fs; +use std::os::unix::ffi::OsStrExt; use uuid::Uuid; -pub struct U<'a> { - sample_file_path: &'a str, - dir_meta: Option, -} - -pub fn new<'a>(args: &'a super::Args) -> Result, Error> { +pub fn run(args: &super::Args, tx: &rusqlite::Transaction) -> Result<(), Error> { let sample_file_path = args.flag_sample_file_dir .ok_or_else(|| format_err!("--sample-file-dir required when upgrading from \ schema version 1 to 2."))?; - Ok(Box::new(U { sample_file_path, dir_meta: None })) + + let d = dir::Fd::open(sample_file_path, false)?; + d.lock(libc::LOCK_EX | libc::LOCK_NB)?; + verify_dir_contents(sample_file_path, tx)?; + + // These create statements match the schema.sql when version 2 was the latest. + tx.execute_batch(r#" + create table meta ( + uuid blob not null check (length(uuid) = 16) + ); + create table open ( + id integer primary key, + uuid blob unique not null check (length(uuid) = 16) + ); + create table sample_file_dir ( + id integer primary key, + path text unique not null, + uuid blob unique not null check (length(uuid) = 16), + last_complete_open_id integer references open (id) + ); + "#)?; + let db_uuid = ::uuid::Uuid::new_v4(); + let db_uuid_bytes = &db_uuid.as_bytes()[..]; + tx.execute("insert into meta (uuid) values (?)", &[&db_uuid_bytes])?; + let open_uuid = ::uuid::Uuid::new_v4(); + let open_uuid_bytes = &open_uuid.as_bytes()[..]; + tx.execute("insert into open (uuid) values (?)", &[&open_uuid_bytes])?; + let open_id = tx.last_insert_rowid() as u32; + let dir_uuid = ::uuid::Uuid::new_v4(); + let dir_uuid_bytes = &dir_uuid.as_bytes()[..]; + + // Write matching metadata to the directory. + let mut meta = DirMeta::default(); + { + meta.db_uuid.extend_from_slice(db_uuid_bytes); + meta.dir_uuid.extend_from_slice(dir_uuid_bytes); + let open = meta.mut_last_complete_open(); + open.id = open_id; + open.uuid.extend_from_slice(&open_uuid_bytes); + } + dir::write_meta(&d, &meta)?; + + tx.execute(r#" + insert into sample_file_dir (path, uuid, last_complete_open_id) + values (?, ?, ?) + "#, &[&sample_file_path, &dir_uuid_bytes, &open_id])?; + + tx.execute_batch(r#" + drop table reserved_sample_files; + alter table camera rename to old_camera; + alter table recording rename to old_recording; + alter table video_sample_entry rename to old_video_sample_entry; + drop index recording_cover; + + create table camera ( + id integer primary key, + uuid blob unique not null check (length(uuid) = 16), + short_name text not null, + description text, + host text, + username text, + password text + ); + + create table stream ( + id integer primary key, + camera_id integer not null references camera (id), + sample_file_dir_id integer references sample_file_dir (id), + type text not null check (type in ('main', 'sub')), + record integer not null check (record in (1, 0)), + rtsp_path text not null, + retain_bytes integer not null check (retain_bytes >= 0), + flush_if_sec integer not null, + next_recording_id integer not null check (next_recording_id >= 0), + unique (camera_id, type) + ); + + create table recording ( + composite_id integer primary key, + stream_id integer not null references stream (id), + open_id integer not null, + run_offset integer not null, + flags integer not null, + sample_file_bytes integer not null check (sample_file_bytes > 0), + start_time_90k integer not null check (start_time_90k > 0), + duration_90k integer not null + check (duration_90k >= 0 and duration_90k < 5*60*90000), + local_time_delta_90k integer not null, + video_samples integer not null check (video_samples > 0), + video_sync_samples integer not null check (video_sync_samples > 0), + video_sample_entry_id integer references video_sample_entry (id), + check (composite_id >> 32 = stream_id) + ); + + create index recording_cover on recording ( + stream_id, + start_time_90k, + open_id, + duration_90k, + video_samples, + video_sync_samples, + video_sample_entry_id, + sample_file_bytes, + run_offset, + flags + ); + + create table video_sample_entry ( + id integer primary key, + sha1 blob unique not null check (length(sha1) = 20), + width integer not null check (width > 0), + height integer not null check (height > 0), + rfc6381_codec text not null, + data blob not null check (length(data) > 86) + ); + + create table garbage ( + sample_file_dir_id integer references sample_file_dir (id), + composite_id integer, + primary key (sample_file_dir_id, composite_id) + ) without rowid; + + insert into camera + select + id, + uuid, + short_name, + description, + host, + username, + password + from old_camera; + + -- Insert main streams using the same id as the camera, to ease changing recordings. + insert into stream + select + old_camera.id, + old_camera.id, + sample_file_dir.id, + 'main', + 1, + old_camera.main_rtsp_path, + old_camera.retain_bytes, + 0, + old_camera.next_recording_id + from + old_camera cross join sample_file_dir; + + -- Insert sub stream (if path is non-empty) using any id. + insert into stream (camera_id, sample_file_dir_id, type, record, rtsp_path, + retain_bytes, flush_if_sec, next_recording_id) + select + old_camera.id, + sample_file_dir.id, + 'sub', + 0, + old_camera.sub_rtsp_path, + 0, + 90, + 1 + from + old_camera cross join sample_file_dir + where + old_camera.sub_rtsp_path != ''; + + insert into recording + select + r.composite_id, + r.camera_id, + o.id, + r.run_offset, + r.flags, + r.sample_file_bytes, + r.start_time_90k, + r.duration_90k, + r.local_time_delta_90k, + r.video_samples, + r.video_sync_samples, + r.video_sample_entry_id + from + old_recording r cross join open o; + "#)?; + + fix_video_sample_entry(tx)?; + + tx.execute_batch(r#" + drop table old_camera; + drop table old_recording; + drop table old_video_sample_entry; + "#)?; + + Ok(()) } -impl<'a> U<'a> { - /// Ensures there are sample files in the directory for all listed recordings. - /// Among other problems, this catches a fat-fingered `--sample-file-dir`. - fn verify_sample_files(&self, tx: &rusqlite::Transaction) -> Result<(), Error> { - // Build a hash of the uuids found in sample_file_path. Ignore other files. - let n: i64 = tx.query_row("select count(*) from recording", &[], |r| r.get_checked(0))??; - let mut files = ::fnv::FnvHashSet::with_capacity_and_hasher(n as usize, Default::default()); - for e in fs::read_dir(self.sample_file_path)? { - let e = e?; - let f = e.file_name(); - let s = match f.to_str() { - Some(s) => s, - None => continue, - }; - let uuid = match Uuid::parse_str(s) { - Ok(u) => u, - Err(_) => continue, - }; - if s != uuid.hyphenated().to_string() { // non-canonical form. +/// Ensures the sample file directory has the expected contents. +/// Among other problems, this catches a fat-fingered `--sample-file-dir`. +/// The expected contents are: +/// +/// * required: recording uuids. +/// * optional: reserved sample file uuids. +/// * optional: meta and meta-tmp from half-completed update attempts. +/// * forbidden: anything else. +fn verify_dir_contents(sample_file_path: &str, tx: &rusqlite::Transaction) -> Result<(), Error> { + // Build a hash of the uuids found in the directory. + let n: i64 = tx.query_row(r#" + select + a.c + b.c + from + (select count(*) as c from recording) a, + (select count(*) as c from reserved_sample_files) b; + "#, &[], |r| r.get_checked(0))??; + let mut files = ::fnv::FnvHashSet::with_capacity_and_hasher(n as usize, Default::default()); + for e in fs::read_dir(sample_file_path)? { + let e = e?; + let f = e.file_name(); + match f.as_bytes() { + b"." | b".." => continue, + b"meta" | b"meta-tmp" => { + // Ignore metadata files. These might from a half-finished update attempt. + // If the directory is actually an in-use >v3 format, other contents won't match. continue; - } - files.insert(uuid); + }, + _ => {}, + }; + let s = match f.to_str() { + Some(s) => s, + None => bail!("unexpected file {:?} in {:?}", f, sample_file_path), + }; + let uuid = match Uuid::parse_str(s) { + Ok(u) => u, + Err(_) => bail!("unexpected file {:?} in {:?}", f, sample_file_path), + }; + if s != uuid.hyphenated().to_string() { // non-canonical form. + bail!("unexpected file {:?} in {:?}", f, sample_file_path); } + files.insert(uuid); + } - // Iterate through the database and check that everything has a matching file. + // Iterate through the database and check that everything has a matching file. + { let mut stmt = tx.prepare(r"select sample_file_uuid from recording_playback")?; let mut rows = stmt.query(&[])?; while let Some(row) = rows.next() { let row = row?; let uuid: ::db::FromSqlUuid = row.get_checked(0)?; - if !files.contains(&uuid.0) { - bail!("{} is missing from dir {}!", uuid.0, self.sample_file_path); + if !files.remove(&uuid.0) { + bail!("{} is missing from dir {}!", uuid.0, sample_file_path); } } - Ok(()) - } -} - -impl<'a> super::Upgrader for U<'a> { - fn in_tx(&mut self, tx: &rusqlite::Transaction) -> Result<(), Error> { - self.verify_sample_files(tx)?; - - // These create statements match the schema.sql when version 2 was the latest. - tx.execute_batch(r#" - create table meta ( - uuid blob not null check (length(uuid) = 16) - ); - create table open ( - id integer primary key, - uuid blob unique not null check (length(uuid) = 16) - ); - create table sample_file_dir ( - id integer primary key, - path text unique not null, - uuid blob unique not null check (length(uuid) = 16), - last_complete_open_id integer references open (id) - ); - "#)?; - let db_uuid = ::uuid::Uuid::new_v4(); - let db_uuid_bytes = &db_uuid.as_bytes()[..]; - tx.execute("insert into meta (uuid) values (?)", &[&db_uuid_bytes])?; - let open_uuid = ::uuid::Uuid::new_v4(); - let open_uuid_bytes = &open_uuid.as_bytes()[..]; - tx.execute("insert into open (uuid) values (?)", &[&open_uuid_bytes])?; - let open_id = tx.last_insert_rowid() as u32; - let dir_uuid = ::uuid::Uuid::new_v4(); - let dir_uuid_bytes = &dir_uuid.as_bytes()[..]; - - let mut meta = DirMeta::default(); - { - meta.db_uuid.extend_from_slice(db_uuid_bytes); - meta.dir_uuid.extend_from_slice(dir_uuid_bytes); - let open = meta.mut_in_progress_open(); - open.id = open_id; - open.uuid.extend_from_slice(&open_uuid_bytes); - } - - tx.execute(r#" - insert into sample_file_dir (path, uuid, last_complete_open_id) - values (?, ?, ?) - "#, &[&self.sample_file_path, &dir_uuid_bytes, &open_id])?; - self.dir_meta = Some(meta); - - tx.execute_batch(r#" - drop table reserved_sample_files; - alter table camera rename to old_camera; - alter table recording rename to old_recording; - alter table video_sample_entry rename to old_video_sample_entry; - drop index recording_cover; - - create table camera ( - id integer primary key, - uuid blob unique not null check (length(uuid) = 16), - short_name text not null, - description text, - host text, - username text, - password text - ); - - create table stream ( - id integer primary key, - camera_id integer not null references camera (id), - sample_file_dir_id integer references sample_file_dir (id), - type text not null check (type in ('main', 'sub')), - record integer not null check (record in (1, 0)), - rtsp_path text not null, - retain_bytes integer not null check (retain_bytes >= 0), - flush_if_sec integer not null, - next_recording_id integer not null check (next_recording_id >= 0), - unique (camera_id, type) - ); - - create table recording ( - composite_id integer primary key, - stream_id integer not null references stream (id), - open_id integer not null, - run_offset integer not null, - flags integer not null, - sample_file_bytes integer not null check (sample_file_bytes > 0), - start_time_90k integer not null check (start_time_90k > 0), - duration_90k integer not null - check (duration_90k >= 0 and duration_90k < 5*60*90000), - local_time_delta_90k integer not null, - video_samples integer not null check (video_samples > 0), - video_sync_samples integer not null check (video_sync_samples > 0), - video_sample_entry_id integer references video_sample_entry (id), - check (composite_id >> 32 = stream_id) - ); - - create index recording_cover on recording ( - stream_id, - start_time_90k, - open_id, - duration_90k, - video_samples, - video_sync_samples, - video_sample_entry_id, - sample_file_bytes, - run_offset, - flags - ); - - create table video_sample_entry ( - id integer primary key, - sha1 blob unique not null check (length(sha1) = 20), - width integer not null check (width > 0), - height integer not null check (height > 0), - rfc6381_codec text not null, - data blob not null check (length(data) > 86) - ); - - create table garbage ( - sample_file_dir_id integer references sample_file_dir (id), - composite_id integer, - primary key (sample_file_dir_id, composite_id) - ) without rowid; - - insert into camera - select - id, - uuid, - short_name, - description, - host, - username, - password - from old_camera; - - -- Insert main streams using the same id as the camera, to ease changing recordings. - insert into stream - select - old_camera.id, - old_camera.id, - sample_file_dir.id, - 'main', - 1, - old_camera.main_rtsp_path, - old_camera.retain_bytes, - 0, - old_camera.next_recording_id - from - old_camera cross join sample_file_dir; - - -- Insert sub stream (if path is non-empty) using any id. - insert into stream (camera_id, sample_file_dir_id, type, record, rtsp_path, - retain_bytes, flush_if_sec, next_recording_id) - select - old_camera.id, - sample_file_dir.id, - 'sub', - 0, - old_camera.sub_rtsp_path, - 0, - 90, - 1 - from - old_camera cross join sample_file_dir - where - old_camera.sub_rtsp_path != ''; - - insert into recording - select - r.composite_id, - r.camera_id, - o.open_id, - r.run_offset, - r.flags, - r.sample_file_bytes, - r.start_time_90k, - r.duration_90k, - r.local_time_delta_90k, - r.video_samples, - r.video_sync_samples, - r.video_sample_entry_id - from - old_recording r cross join open o; - "#)?; - - fix_video_sample_entry(tx)?; - - tx.execute_batch(r#" - drop table old_camera; - drop table old_recording; - drop table old_video_sample_entry; - "#)?; - - Ok(()) } - fn post_tx(&mut self) -> Result<(), Error> { - let mut meta = self.dir_meta.take().unwrap(); - let d = dir::SampleFileDir::create(self.sample_file_path, &meta)?; - ::std::mem::swap(&mut meta.last_complete_open, &mut meta.in_progress_open); - d.write_meta(&meta)?; - Ok(()) + let mut stmt = tx.prepare(r"select uuid from reserved_sample_files")?; + let mut rows = stmt.query(&[])?; + while let Some(row) = rows.next() { + let row = row?; + let uuid: ::db::FromSqlUuid = row.get_checked(0)?; + files.remove(&uuid.0); } + + if !files.is_empty() { + bail!("{} unexpected sample file uuids in dir {}: {:?}!", + files.len(), sample_file_path, files); + } + Ok(()) } fn fix_video_sample_entry(tx: &rusqlite::Transaction) -> Result<(), Error> { diff --git a/db/upgrade/v2_to_v3.rs b/db/upgrade/v2_to_v3.rs index b86eda5..e36eb35 100644 --- a/db/upgrade/v2_to_v3.rs +++ b/db/upgrade/v2_to_v3.rs @@ -29,20 +29,95 @@ // along with this program. If not, see . /// Upgrades a version 2 schema to a version 3 schema. +/// Note that a version 2 schema is never actually used; so we know the upgrade from version 1 was +/// completed, and possibly an upgrade from 2 to 3 is half-finished. use db::{self, FromSqlUuid}; use dir; use failure::Error; use libc; +use schema; use std::io::{self, Write}; use std::mem; +use std::sync::Arc; use rusqlite; use uuid::Uuid; -pub struct U; +/// Opens the sample file dir. +/// +/// Makes a couple simplifying assumptions valid for version 2: +/// * there's only one dir. +/// * it has a last completed open. +fn open_sample_file_dir(tx: &rusqlite::Transaction) -> Result, Error> { + let (p, s_uuid, o_id, o_uuid, db_uuid): (String, FromSqlUuid, i32, FromSqlUuid, FromSqlUuid) = + tx.query_row(r#" + select + s.path, s.uuid, s.last_complete_open_id, o.uuid, m.uuid + from + sample_file_dir s + join open o on (s.last_complete_open_id = o.id) + cross join meta m + "#, &[], |row| { + (row.get_checked(0).unwrap(), + row.get_checked(1).unwrap(), + row.get_checked(2).unwrap(), + row.get_checked(3).unwrap(), + row.get_checked(4).unwrap()) + })?; + let mut meta = schema::DirMeta::default(); + meta.db_uuid.extend_from_slice(&db_uuid.0.as_bytes()[..]); + meta.dir_uuid.extend_from_slice(&s_uuid.0.as_bytes()[..]); + { + let open = meta.mut_last_complete_open(); + open.id = o_id as u32; + open.uuid.extend_from_slice(&o_uuid.0.as_bytes()[..]); + } + dir::SampleFileDir::open(&p, &meta) +} -pub fn new<'a>(_args: &'a super::Args) -> Result, Error> { - Ok(Box::new(U)) +pub fn run(_args: &super::Args, tx: &rusqlite::Transaction) -> Result<(), Error> { + let d = open_sample_file_dir(&tx)?; + let mut stmt = tx.prepare(r#" + select + composite_id, + sample_file_uuid + from + recording_playback + "#)?; + let mut rows = stmt.query(&[])?; + while let Some(row) = rows.next() { + let row = row?; + let id = db::CompositeId(row.get_checked(0)?); + let sample_file_uuid: FromSqlUuid = row.get_checked(1)?; + let from_path = get_uuid_pathname(sample_file_uuid.0); + let to_path = get_id_pathname(id); + let r = unsafe { dir::renameat(&d.fd, from_path.as_ptr(), &d.fd, to_path.as_ptr()) }; + if let Err(e) = r { + if e.kind() == io::ErrorKind::NotFound { + continue; // assume it was already moved. + } + Err(e)?; + } + } + + // These create statements match the schema.sql when version 3 was the latest. + tx.execute_batch(r#" + alter table recording_playback rename to old_recording_playback; + create table recording_playback ( + composite_id integer primary key references recording (composite_id), + sample_file_sha1 blob not null check (length(sample_file_sha1) = 20), + video_index blob not null check (length(video_index) > 0) + ); + insert into recording_playback + select + composite_id, + sample_file_sha1, + video_index + from + old_recording_playback; + drop table old_recording_playback; + "#)?; + Ok(()) } /// Gets a pathname for a sample file suitable for passing to open or unlink. @@ -59,58 +134,3 @@ fn get_id_pathname(id: db::CompositeId) -> [libc::c_char; 17] { write!(&mut buf[..16], "{:016x}", id.0).expect("can't format id to pathname buf"); unsafe { mem::transmute::<[u8; 17], [libc::c_char; 17]>(buf) } } - -impl super::Upgrader for U { - fn in_tx(&mut self, tx: &rusqlite::Transaction) -> Result<(), Error> { - let path: String = tx.query_row(r#" - select path from sample_file_dir - "#, &[], |row| { row.get_checked(0) })??; - - // Build map of stream -> dirname. - let d = dir::Fd::open(None, &path, false)?; - //let stream_to_dir = build_stream_to_dir(&d, tx)?; - - let mut stmt = tx.prepare(r#" - select - composite_id, - sample_file_uuid - from - recording_playback - "#)?; - let mut rows = stmt.query(&[])?; - while let Some(row) = rows.next() { - let row = row?; - let id = db::CompositeId(row.get_checked(0)?); - let sample_file_uuid: FromSqlUuid = row.get_checked(1)?; - let from_path = get_uuid_pathname(sample_file_uuid.0); - let to_path = get_id_pathname(id); - //let to_dir: &dir::Fd = stream_to_dir[stream_id as usize].as_ref().unwrap(); - let r = unsafe { dir::renameat(&d, from_path.as_ptr(), &d, to_path.as_ptr()) }; - if let Err(e) = r { - if e.kind() == io::ErrorKind::NotFound { - continue; // assume it was already moved. - } - Err(e)?; - } - } - - // These create statements match the schema.sql when version 3 was the latest. - tx.execute_batch(r#" - alter table recording_playback rename to old_recording_playback; - create table recording_playback ( - composite_id integer primary key references recording (composite_id), - sample_file_sha1 blob not null check (length(sample_file_sha1) = 20), - video_index blob not null check (length(video_index) > 0) - ); - insert into recording_playback - select - composite_id, - sample_file_sha1, - video_index - from - old_recording_playback; - drop table old_recording_playback; - "#)?; - Ok(()) - } -} diff --git a/src/cmds/mod.rs b/src/cmds/mod.rs index 591fea3..a605a2c 100644 --- a/src/cmds/mod.rs +++ b/src/cmds/mod.rs @@ -75,7 +75,7 @@ enum OpenMode { /// Locks and opens the database. /// The returned `dir::Fd` holds the lock and should be kept open as long as the `Connection` is. fn open_conn(db_dir: &str, mode: OpenMode) -> Result<(dir::Fd, rusqlite::Connection), Error> { - let dir = dir::Fd::open(None, db_dir, mode == OpenMode::Create)?; + let dir = dir::Fd::open(db_dir, mode == OpenMode::Create)?; let ro = mode == OpenMode::ReadOnly; dir.lock(if ro { libc::LOCK_SH } else { libc::LOCK_EX } | libc::LOCK_NB) .map_err(|e| e.context(format!("db dir {:?} already in use; can't get {} lock",