refine 1->3 upgrade process

In hindsight, the "post_tx" step in the upgrade process introduced in
e7f5733 doesn't make sense. If the procedure fails at this stage, nothing says
it still needs to be completed. If the sample file dirs have to be updated
after the database, then there should be another database version to mark that
it's fully completed, and indeed that's the purpose version 3 serves. So get
rid of the Upgrader trait and just go back to a simple run function per
version.

In the case of the sample file dir metadata, it actually can happen before the
database transaction; the stuff written to the database later just needs to be
consistent with what it finds if there's an existing metadata file from a
half-completed update.

For safety, ensure there are no unexpected directory contents before
upgrading 1->2, and ensure the metadata matches before upgrading 2->3.
This commit is contained in:
Scott Lamb 2018-03-01 09:26:03 -08:00
parent bcf42fe02c
commit f01f523c2c
6 changed files with 449 additions and 417 deletions

104
db/dir.rs
View File

@ -62,7 +62,7 @@ use std::thread;
pub struct SampleFileDir {
/// The open file descriptor for the directory. The worker uses it to create files and sync the
/// directory. Other threads use it to open sample files for reading during video serving.
fd: Fd,
pub(crate) fd: Fd,
}
/// A file descriptor associated with a directory (not necessarily the sample file dir).
@ -80,24 +80,30 @@ impl Drop for Fd {
impl Fd {
/// Opens the given path as a directory.
pub fn open(fd: Option<&Fd>, path: &str, mkdir: bool) -> Result<Fd, io::Error> {
let fd = fd.map(|fd| fd.0).unwrap_or(libc::AT_FDCWD);
pub fn open(path: &str, mkdir: bool) -> Result<Fd, io::Error> {
let cstring = ffi::CString::new(path)
.map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))?;
if mkdir && unsafe { libc::mkdirat(fd, cstring.as_ptr(), 0o700) } != 0 {
if mkdir && unsafe { libc::mkdir(cstring.as_ptr(), 0o700) } != 0 {
let e = io::Error::last_os_error();
if e.kind() != io::ErrorKind::AlreadyExists {
return Err(e.into());
}
}
let fd = unsafe { libc::openat(fd, cstring.as_ptr(), libc::O_DIRECTORY | libc::O_RDONLY,
0) };
let fd = unsafe { libc::open(cstring.as_ptr(), libc::O_DIRECTORY | libc::O_RDONLY, 0) };
if fd < 0 {
return Err(io::Error::last_os_error().into());
}
Ok(Fd(fd))
}
pub(crate) fn sync(&self) -> Result<(), io::Error> {
let res = unsafe { libc::fsync(self.0) };
if res < 0 {
return Err(io::Error::last_os_error())
}
Ok(())
}
/// Opens a sample file within this directory with the given flags and (if creating) mode.
unsafe fn openat(&self, p: *const c_char, flags: libc::c_int, mode: libc::c_int)
-> Result<fs::File, io::Error> {
@ -128,7 +134,7 @@ impl Fd {
}
}
pub unsafe fn renameat(from_fd: &Fd, from_path: *const c_char,
pub(crate) unsafe fn renameat(from_fd: &Fd, from_path: *const c_char,
to_fd: &Fd, to_path: *const c_char) -> Result<(), io::Error> {
let result = libc::renameat(from_fd.0, from_path, to_fd.0, to_path);
if result < 0 {
@ -137,6 +143,41 @@ pub unsafe fn renameat(from_fd: &Fd, from_path: *const c_char,
Ok(())
}
/// Reads `dir`'s metadata. If none is found, returns an empty proto.
pub(crate) fn read_meta(dir: &Fd) -> Result<schema::DirMeta, Error> {
let mut meta = schema::DirMeta::default();
let p = unsafe { ffi::CStr::from_ptr("meta\0".as_ptr() as *const c_char) };
let mut f = match unsafe { dir.openat(p.as_ptr(), libc::O_RDONLY, 0) } {
Err(e) => {
if e.kind() == ::std::io::ErrorKind::NotFound {
return Ok(meta);
}
return Err(e.into());
},
Ok(f) => f,
};
let mut data = Vec::new();
f.read_to_end(&mut data)?;
let mut s = protobuf::CodedInputStream::from_bytes(&data);
meta.merge_from(&mut s).map_err(|e| e.context("Unable to parse metadata proto: {}"))?;
Ok(meta)
}
/// Write `dir`'s metadata, clobbering existing data.
pub(crate) fn write_meta(dir: &Fd, meta: &schema::DirMeta) -> Result<(), Error> {
let (tmp_path, final_path) = unsafe {
(ffi::CStr::from_ptr("meta.tmp\0".as_ptr() as *const c_char),
ffi::CStr::from_ptr("meta\0".as_ptr() as *const c_char))
};
let mut f = unsafe { dir.openat(tmp_path.as_ptr(),
libc::O_CREAT | libc::O_TRUNC | libc::O_WRONLY, 0o600)? };
meta.write_to_writer(&mut f)?;
f.sync_all()?;
unsafe { renameat(&dir, tmp_path.as_ptr(), &dir, final_path.as_ptr())? };
dir.sync()?;
Ok(())
}
impl SampleFileDir {
/// Opens the directory using the given metadata.
///
@ -147,7 +188,7 @@ impl SampleFileDir {
let read_write = db_meta.in_progress_open.is_some();
let s = SampleFileDir::open_self(path, false)?;
s.fd.lock(if read_write { libc::LOCK_EX } else { libc::LOCK_SH } | libc::LOCK_NB)?;
let dir_meta = s.read_meta()?;
let dir_meta = read_meta(&s.fd)?;
if !SampleFileDir::consistent(db_meta, &dir_meta) {
bail!("metadata mismatch.\ndb: {:#?}\ndir: {:#?}", db_meta, &dir_meta);
}
@ -176,14 +217,14 @@ impl SampleFileDir {
true
}
pub fn create(path: &str, db_meta: &schema::DirMeta) -> Result<Arc<SampleFileDir>, Error> {
pub(crate) fn create(path: &str, db_meta: &schema::DirMeta)
-> Result<Arc<SampleFileDir>, Error> {
let s = SampleFileDir::open_self(path, true)?;
s.fd.lock(libc::LOCK_EX | libc::LOCK_NB)?;
let old_meta = s.read_meta()?;
let old_meta = read_meta(&s.fd)?;
// Verify metadata. We only care that it hasn't been completely opened.
// Partial opening by this or another database is fine; we won't overwrite anything.
// TODO: consider one exception: if the version 2 upgrade fails at the post_tx step.
if old_meta.last_complete_open.is_some() {
bail!("Can't create dir at path {}: is already in use:\n{:?}", path, old_meta);
}
@ -193,7 +234,7 @@ impl SampleFileDir {
}
fn open_self(path: &str, create: bool) -> Result<Arc<SampleFileDir>, Error> {
let fd = Fd::open(None, path, create)
let fd = Fd::open(path, create)
.map_err(|e| format_err!("unable to open sample file dir {}: {}", path, e))?;
Ok(Arc::new(SampleFileDir {
fd,
@ -206,39 +247,8 @@ impl SampleFileDir {
unsafe { self.fd.openat(p.as_ptr(), libc::O_RDONLY, 0) }
}
/// Reads the directory metadata. If none is found, returns an empty proto.
fn read_meta(&self) -> Result<schema::DirMeta, Error> {
let mut meta = schema::DirMeta::default();
let p = unsafe { ffi::CStr::from_ptr("meta\0".as_ptr() as *const c_char) };
let mut f = match unsafe { self.fd.openat(p.as_ptr(), libc::O_RDONLY, 0) } {
Err(e) => {
if e.kind() == ::std::io::ErrorKind::NotFound {
return Ok(meta);
}
return Err(e.into());
},
Ok(f) => f,
};
let mut data = Vec::new();
f.read_to_end(&mut data)?;
let mut s = protobuf::CodedInputStream::from_bytes(&data);
meta.merge_from(&mut s).map_err(|e| e.context("Unable to parse metadata proto: {}"))?;
Ok(meta)
}
pub(crate) fn write_meta(&self, meta: &schema::DirMeta) -> Result<(), Error> {
let (tmp_path, final_path) = unsafe {
(ffi::CStr::from_ptr("meta.tmp\0".as_ptr() as *const c_char),
ffi::CStr::from_ptr("meta\0".as_ptr() as *const c_char))
};
let mut f = unsafe { self.fd.openat(tmp_path.as_ptr(),
libc::O_CREAT | libc::O_TRUNC | libc::O_WRONLY,
0o600)? };
meta.write_to_writer(&mut f)?;
f.sync_all()?;
unsafe { renameat(&self.fd, tmp_path.as_ptr(), &self.fd, final_path.as_ptr())? };
self.sync()?;
Ok(())
write_meta(&self.fd, meta)
}
pub fn statfs(&self) -> Result<libc::statvfs, io::Error> { self.fd.statfs() }
@ -264,11 +274,7 @@ impl SampleFileDir {
/// Syncs the directory itself.
fn sync(&self) -> Result<(), io::Error> {
let res = unsafe { libc::fsync(self.fd.0) };
if res < 0 {
return Err(io::Error::last_os_error())
}
Ok(())
self.fd.sync()
}
}

View File

@ -43,11 +43,6 @@ mod v2_to_v3;
const UPGRADE_NOTES: &'static str =
concat!("upgraded using moonfire-db ", env!("CARGO_PKG_VERSION"));
pub trait Upgrader {
fn in_tx(&mut self, &rusqlite::Transaction) -> Result<(), Error> { Ok(()) }
fn post_tx(&mut self) -> Result<(), Error> { Ok(()) }
}
#[derive(Debug)]
pub struct Args<'a> {
pub flag_sample_file_dir: Option<&'a str>,
@ -65,9 +60,9 @@ fn set_journal_mode(conn: &rusqlite::Connection, requested: &str) -> Result<(),
pub fn run(args: &Args, conn: &mut rusqlite::Connection) -> Result<(), Error> {
let upgraders = [
v0_to_v1::new,
v1_to_v2::new,
v2_to_v3::new,
v0_to_v1::run,
v1_to_v2::run,
v2_to_v3::run,
];
{
@ -84,15 +79,13 @@ pub fn run(args: &Args, conn: &mut rusqlite::Connection) -> Result<(), Error> {
set_journal_mode(&conn, args.flag_preset_journal).unwrap();
for ver in old_ver .. db::EXPECTED_VERSION {
info!("...from version {} to version {}", ver, ver + 1);
let mut u = upgraders[ver as usize](&args)?;
let tx = conn.transaction()?;
u.in_tx(&tx)?;
upgraders[ver as usize](&args, &tx)?;
tx.execute(r#"
insert into version (id, unix_time, notes)
values (?, cast(strftime('%s', 'now') as int32), ?)
"#, &[&(ver + 1), &UPGRADE_NOTES])?;
tx.commit()?;
u.post_tx()?;
}
}

View File

@ -36,14 +36,7 @@ use recording;
use rusqlite;
use std::collections::HashMap;
pub struct U;
pub fn new<'a>(_args: &'a super::Args) -> Result<Box<super::Upgrader + 'a>, Error> {
Ok(Box::new(U))
}
impl super::Upgrader for U {
fn in_tx(&mut self, tx: &rusqlite::Transaction) -> Result<(), Error> {
pub fn run(_args: &super::Args, tx: &rusqlite::Transaction) -> Result<(), Error> {
// These create statements match the schema.sql when version 1 was the latest.
tx.execute_batch(r#"
alter table camera rename to old_camera;
@ -103,7 +96,6 @@ impl super::Upgrader for U {
"#)?;
Ok(())
}
}
struct CameraState {
/// tuple of (run_start_id, next_start_90k).

View File

@ -32,65 +32,22 @@
use dir;
use failure::Error;
use libc;
use rusqlite;
use schema::DirMeta;
use std::fs;
use std::os::unix::ffi::OsStrExt;
use uuid::Uuid;
pub struct U<'a> {
sample_file_path: &'a str,
dir_meta: Option<DirMeta>,
}
pub fn new<'a>(args: &'a super::Args) -> Result<Box<super::Upgrader + 'a>, Error> {
pub fn run(args: &super::Args, tx: &rusqlite::Transaction) -> Result<(), Error> {
let sample_file_path =
args.flag_sample_file_dir
.ok_or_else(|| format_err!("--sample-file-dir required when upgrading from \
schema version 1 to 2."))?;
Ok(Box::new(U { sample_file_path, dir_meta: None }))
}
impl<'a> U<'a> {
/// Ensures there are sample files in the directory for all listed recordings.
/// Among other problems, this catches a fat-fingered `--sample-file-dir`.
fn verify_sample_files(&self, tx: &rusqlite::Transaction) -> Result<(), Error> {
// Build a hash of the uuids found in sample_file_path. Ignore other files.
let n: i64 = tx.query_row("select count(*) from recording", &[], |r| r.get_checked(0))??;
let mut files = ::fnv::FnvHashSet::with_capacity_and_hasher(n as usize, Default::default());
for e in fs::read_dir(self.sample_file_path)? {
let e = e?;
let f = e.file_name();
let s = match f.to_str() {
Some(s) => s,
None => continue,
};
let uuid = match Uuid::parse_str(s) {
Ok(u) => u,
Err(_) => continue,
};
if s != uuid.hyphenated().to_string() { // non-canonical form.
continue;
}
files.insert(uuid);
}
// Iterate through the database and check that everything has a matching file.
let mut stmt = tx.prepare(r"select sample_file_uuid from recording_playback")?;
let mut rows = stmt.query(&[])?;
while let Some(row) = rows.next() {
let row = row?;
let uuid: ::db::FromSqlUuid = row.get_checked(0)?;
if !files.contains(&uuid.0) {
bail!("{} is missing from dir {}!", uuid.0, self.sample_file_path);
}
}
Ok(())
}
}
impl<'a> super::Upgrader for U<'a> {
fn in_tx(&mut self, tx: &rusqlite::Transaction) -> Result<(), Error> {
self.verify_sample_files(tx)?;
let d = dir::Fd::open(sample_file_path, false)?;
d.lock(libc::LOCK_EX | libc::LOCK_NB)?;
verify_dir_contents(sample_file_path, tx)?;
// These create statements match the schema.sql when version 2 was the latest.
tx.execute_batch(r#"
@ -118,20 +75,21 @@ impl<'a> super::Upgrader for U<'a> {
let dir_uuid = ::uuid::Uuid::new_v4();
let dir_uuid_bytes = &dir_uuid.as_bytes()[..];
// Write matching metadata to the directory.
let mut meta = DirMeta::default();
{
meta.db_uuid.extend_from_slice(db_uuid_bytes);
meta.dir_uuid.extend_from_slice(dir_uuid_bytes);
let open = meta.mut_in_progress_open();
let open = meta.mut_last_complete_open();
open.id = open_id;
open.uuid.extend_from_slice(&open_uuid_bytes);
}
dir::write_meta(&d, &meta)?;
tx.execute(r#"
insert into sample_file_dir (path, uuid, last_complete_open_id)
values (?, ?, ?)
"#, &[&self.sample_file_path, &dir_uuid_bytes, &open_id])?;
self.dir_meta = Some(meta);
"#, &[&sample_file_path, &dir_uuid_bytes, &open_id])?;
tx.execute_batch(r#"
drop table reserved_sample_files;
@ -255,7 +213,7 @@ impl<'a> super::Upgrader for U<'a> {
select
r.composite_id,
r.camera_id,
o.open_id,
o.id,
r.run_offset,
r.flags,
r.sample_file_bytes,
@ -280,13 +238,76 @@ impl<'a> super::Upgrader for U<'a> {
Ok(())
}
fn post_tx(&mut self) -> Result<(), Error> {
let mut meta = self.dir_meta.take().unwrap();
let d = dir::SampleFileDir::create(self.sample_file_path, &meta)?;
::std::mem::swap(&mut meta.last_complete_open, &mut meta.in_progress_open);
d.write_meta(&meta)?;
Ok(())
/// Ensures the sample file directory has the expected contents.
/// Among other problems, this catches a fat-fingered `--sample-file-dir`.
/// The expected contents are:
///
/// * required: recording uuids.
/// * optional: reserved sample file uuids.
/// * optional: meta and meta-tmp from half-completed update attempts.
/// * forbidden: anything else.
fn verify_dir_contents(sample_file_path: &str, tx: &rusqlite::Transaction) -> Result<(), Error> {
// Build a hash of the uuids found in the directory.
let n: i64 = tx.query_row(r#"
select
a.c + b.c
from
(select count(*) as c from recording) a,
(select count(*) as c from reserved_sample_files) b;
"#, &[], |r| r.get_checked(0))??;
let mut files = ::fnv::FnvHashSet::with_capacity_and_hasher(n as usize, Default::default());
for e in fs::read_dir(sample_file_path)? {
let e = e?;
let f = e.file_name();
match f.as_bytes() {
b"." | b".." => continue,
b"meta" | b"meta-tmp" => {
// Ignore metadata files. These might from a half-finished update attempt.
// If the directory is actually an in-use >v3 format, other contents won't match.
continue;
},
_ => {},
};
let s = match f.to_str() {
Some(s) => s,
None => bail!("unexpected file {:?} in {:?}", f, sample_file_path),
};
let uuid = match Uuid::parse_str(s) {
Ok(u) => u,
Err(_) => bail!("unexpected file {:?} in {:?}", f, sample_file_path),
};
if s != uuid.hyphenated().to_string() { // non-canonical form.
bail!("unexpected file {:?} in {:?}", f, sample_file_path);
}
files.insert(uuid);
}
// Iterate through the database and check that everything has a matching file.
{
let mut stmt = tx.prepare(r"select sample_file_uuid from recording_playback")?;
let mut rows = stmt.query(&[])?;
while let Some(row) = rows.next() {
let row = row?;
let uuid: ::db::FromSqlUuid = row.get_checked(0)?;
if !files.remove(&uuid.0) {
bail!("{} is missing from dir {}!", uuid.0, sample_file_path);
}
}
}
let mut stmt = tx.prepare(r"select uuid from reserved_sample_files")?;
let mut rows = stmt.query(&[])?;
while let Some(row) = rows.next() {
let row = row?;
let uuid: ::db::FromSqlUuid = row.get_checked(0)?;
files.remove(&uuid.0);
}
if !files.is_empty() {
bail!("{} unexpected sample file uuids in dir {}: {:?}!",
files.len(), sample_file_path, files);
}
Ok(())
}
fn fix_video_sample_entry(tx: &rusqlite::Transaction) -> Result<(), Error> {

View File

@ -29,47 +29,54 @@
// along with this program. If not, see <http://www.gnu.org/licenses/>.
/// Upgrades a version 2 schema to a version 3 schema.
/// Note that a version 2 schema is never actually used; so we know the upgrade from version 1 was
/// completed, and possibly an upgrade from 2 to 3 is half-finished.
use db::{self, FromSqlUuid};
use dir;
use failure::Error;
use libc;
use schema;
use std::io::{self, Write};
use std::mem;
use std::sync::Arc;
use rusqlite;
use uuid::Uuid;
pub struct U;
pub fn new<'a>(_args: &'a super::Args) -> Result<Box<super::Upgrader + 'a>, Error> {
Ok(Box::new(U))
/// Opens the sample file dir.
///
/// Makes a couple simplifying assumptions valid for version 2:
/// * there's only one dir.
/// * it has a last completed open.
fn open_sample_file_dir(tx: &rusqlite::Transaction) -> Result<Arc<dir::SampleFileDir>, Error> {
let (p, s_uuid, o_id, o_uuid, db_uuid): (String, FromSqlUuid, i32, FromSqlUuid, FromSqlUuid) =
tx.query_row(r#"
select
s.path, s.uuid, s.last_complete_open_id, o.uuid, m.uuid
from
sample_file_dir s
join open o on (s.last_complete_open_id = o.id)
cross join meta m
"#, &[], |row| {
(row.get_checked(0).unwrap(),
row.get_checked(1).unwrap(),
row.get_checked(2).unwrap(),
row.get_checked(3).unwrap(),
row.get_checked(4).unwrap())
})?;
let mut meta = schema::DirMeta::default();
meta.db_uuid.extend_from_slice(&db_uuid.0.as_bytes()[..]);
meta.dir_uuid.extend_from_slice(&s_uuid.0.as_bytes()[..]);
{
let open = meta.mut_last_complete_open();
open.id = o_id as u32;
open.uuid.extend_from_slice(&o_uuid.0.as_bytes()[..]);
}
dir::SampleFileDir::open(&p, &meta)
}
/// Gets a pathname for a sample file suitable for passing to open or unlink.
fn get_uuid_pathname(uuid: Uuid) -> [libc::c_char; 37] {
let mut buf = [0u8; 37];
write!(&mut buf[..36], "{}", uuid.hyphenated()).expect("can't format uuid to pathname buf");
// libc::c_char seems to be i8 on some platforms (Linux/arm) and u8 on others (Linux/amd64).
unsafe { mem::transmute::<[u8; 37], [libc::c_char; 37]>(buf) }
}
fn get_id_pathname(id: db::CompositeId) -> [libc::c_char; 17] {
let mut buf = [0u8; 17];
write!(&mut buf[..16], "{:016x}", id.0).expect("can't format id to pathname buf");
unsafe { mem::transmute::<[u8; 17], [libc::c_char; 17]>(buf) }
}
impl super::Upgrader for U {
fn in_tx(&mut self, tx: &rusqlite::Transaction) -> Result<(), Error> {
let path: String = tx.query_row(r#"
select path from sample_file_dir
"#, &[], |row| { row.get_checked(0) })??;
// Build map of stream -> dirname.
let d = dir::Fd::open(None, &path, false)?;
//let stream_to_dir = build_stream_to_dir(&d, tx)?;
pub fn run(_args: &super::Args, tx: &rusqlite::Transaction) -> Result<(), Error> {
let d = open_sample_file_dir(&tx)?;
let mut stmt = tx.prepare(r#"
select
composite_id,
@ -84,8 +91,7 @@ impl super::Upgrader for U {
let sample_file_uuid: FromSqlUuid = row.get_checked(1)?;
let from_path = get_uuid_pathname(sample_file_uuid.0);
let to_path = get_id_pathname(id);
//let to_dir: &dir::Fd = stream_to_dir[stream_id as usize].as_ref().unwrap();
let r = unsafe { dir::renameat(&d, from_path.as_ptr(), &d, to_path.as_ptr()) };
let r = unsafe { dir::renameat(&d.fd, from_path.as_ptr(), &d.fd, to_path.as_ptr()) };
if let Err(e) = r {
if e.kind() == io::ErrorKind::NotFound {
continue; // assume it was already moved.
@ -113,4 +119,18 @@ impl super::Upgrader for U {
"#)?;
Ok(())
}
/// Gets a pathname for a sample file suitable for passing to open or unlink.
fn get_uuid_pathname(uuid: Uuid) -> [libc::c_char; 37] {
let mut buf = [0u8; 37];
write!(&mut buf[..36], "{}", uuid.hyphenated()).expect("can't format uuid to pathname buf");
// libc::c_char seems to be i8 on some platforms (Linux/arm) and u8 on others (Linux/amd64).
unsafe { mem::transmute::<[u8; 37], [libc::c_char; 37]>(buf) }
}
fn get_id_pathname(id: db::CompositeId) -> [libc::c_char; 17] {
let mut buf = [0u8; 17];
write!(&mut buf[..16], "{:016x}", id.0).expect("can't format id to pathname buf");
unsafe { mem::transmute::<[u8; 17], [libc::c_char; 17]>(buf) }
}

View File

@ -75,7 +75,7 @@ enum OpenMode {
/// Locks and opens the database.
/// The returned `dir::Fd` holds the lock and should be kept open as long as the `Connection` is.
fn open_conn(db_dir: &str, mode: OpenMode) -> Result<(dir::Fd, rusqlite::Connection), Error> {
let dir = dir::Fd::open(None, db_dir, mode == OpenMode::Create)?;
let dir = dir::Fd::open(db_dir, mode == OpenMode::Create)?;
let ro = mode == OpenMode::ReadOnly;
dir.lock(if ro { libc::LOCK_SH } else { libc::LOCK_EX } | libc::LOCK_NB)
.map_err(|e| e.context(format!("db dir {:?} already in use; can't get {} lock",