flexible config for sample_file_dir

This commit is contained in:
Scott Lamb 2021-10-26 11:47:13 -07:00
parent dad349840d
commit 721141770f
13 changed files with 114 additions and 43 deletions

View File

@ -7,6 +7,7 @@
use crate::compare;
use crate::db::{self, CompositeId, SqlUuid};
use crate::dir;
use crate::json::SampleFileDirConfig;
use crate::raw;
use crate::recording;
use crate::schema;
@ -89,7 +90,7 @@ pub fn run(conn: &mut rusqlite::Connection, opts: &Options) -> Result<i32, Error
while let Some(row) = rows.next()? {
let mut meta = schema::DirMeta::default();
let dir_id: i32 = row.get(0)?;
let dir_path: String = row.get(1)?;
let config: SampleFileDirConfig = row.get(1)?;
let dir_uuid: SqlUuid = row.get(2)?;
let open_id = row.get(3)?;
let open_uuid: SqlUuid = row.get(4)?;
@ -102,8 +103,8 @@ pub fn run(conn: &mut rusqlite::Connection, opts: &Options) -> Result<i32, Error
}
// Open the directory (checking its metadata) and hold it open (for the lock).
let dir = dir::SampleFileDir::open(&dir_path, &meta)
.map_err(|e| e.context(format!("unable to open dir {}", dir_path)))?;
let dir = dir::SampleFileDir::open(&config.path, &meta)
.map_err(|e| e.context(format!("unable to open dir {}", config.path.display())))?;
let mut streams = read_dir(&dir, opts)?;
let mut rows = garbage_stmt.query(params![dir_id])?;
while let Some(row) = rows.next()? {

View File

@ -29,6 +29,7 @@
use crate::auth;
use crate::days;
use crate::dir;
use crate::json::SampleFileDirConfig;
use crate::raw;
use crate::recording;
use crate::schema;
@ -52,6 +53,7 @@ use std::convert::TryInto;
use std::fmt::Write as _;
use std::mem;
use std::ops::Range;
use std::path::PathBuf;
use std::str;
use std::string::String;
use std::sync::Arc;
@ -317,7 +319,7 @@ pub(crate) struct ListOldestRecordingsRow {
#[derive(Debug)]
pub struct SampleFileDir {
pub id: i32,
pub path: String,
pub path: PathBuf,
pub uuid: Uuid,
dir: Option<Arc<dir::SampleFileDir>>,
last_complete_open: Option<Open>,
@ -1114,7 +1116,7 @@ impl LockedDatabase {
&mut log_msg,
"\n{}: added {}B in {} recordings ({}), deleted {}B in {} ({}), \
GCed {} recordings ({}).",
&dir.path,
dir.path.display(),
&encode_size(log.added_bytes),
log.added.len(),
log.added.iter().join(", "),
@ -1183,7 +1185,7 @@ impl LockedDatabase {
open.uuid.extend_from_slice(&o.uuid.as_bytes()[..]);
}
let d = dir::SampleFileDir::open(&dir.path, &expected_meta)
.map_err(|e| e.context(format!("Failed to open dir {}", dir.path)))?;
.map_err(|e| e.context(format!("Failed to open dir {}", dir.path.display())))?;
if self.open.is_none() {
// read-only mode; it's already fully opened.
dir.dir = Some(d);
@ -1544,7 +1546,7 @@ impl LockedDatabase {
r#"
select
d.id,
d.path,
d.config,
d.uuid,
d.last_complete_open_id,
o.uuid
@ -1555,6 +1557,7 @@ impl LockedDatabase {
let mut rows = stmt.query(params![])?;
while let Some(row) = rows.next()? {
let id = row.get(0)?;
let config: SampleFileDirConfig = row.get(1)?;
let dir_uuid: SqlUuid = row.get(2)?;
let open_id: Option<u32> = row.get(3)?;
let open_uuid: Option<SqlUuid> = row.get(4)?;
@ -1568,7 +1571,7 @@ impl LockedDatabase {
SampleFileDir {
id,
uuid: dir_uuid.0,
path: row.get(1)?,
path: config.path,
dir: None,
last_complete_open,
garbage_needs_unlink: raw::list_garbage(&self.conn, id)?,
@ -1735,7 +1738,7 @@ impl LockedDatabase {
Ok(id)
}
pub fn add_sample_file_dir(&mut self, path: String) -> Result<i32, Error> {
pub fn add_sample_file_dir(&mut self, path: PathBuf) -> Result<i32, Error> {
let mut meta = schema::DirMeta::default();
let uuid = Uuid::new_v4();
let uuid_bytes = &uuid.as_bytes()[..];
@ -1754,12 +1757,16 @@ impl LockedDatabase {
}
let dir = dir::SampleFileDir::create(&path, &meta)?;
let config = SampleFileDirConfig {
path: path.clone(),
..Default::default()
};
self.conn.execute(
r#"
insert into sample_file_dir (path, uuid, last_complete_open_id)
values (?, ?, ?)
insert into sample_file_dir (config, uuid, last_complete_open_id)
values (?, ?, ?)
"#,
params![&path, uuid_bytes, o.id],
params![&config, uuid_bytes, o.id],
)?;
let id = self.conn.last_insert_rowid() as i32;
use ::std::collections::btree_map::Entry;
@ -1794,7 +1801,7 @@ impl LockedDatabase {
if !d.get().garbage_needs_unlink.is_empty() || !d.get().garbage_unlinked.is_empty() {
bail!(
"must collect garbage before deleting directory {}",
d.get().path
d.get().path.display()
);
}
let dir = match d.get_mut().dir.take() {
@ -1810,7 +1817,7 @@ impl LockedDatabase {
if !dir.is_empty()? {
bail!(
"Can't delete sample file directory {} which still has files",
&d.get().path
&d.get().path.display()
);
}
let mut meta = d.get().expected_meta(&self.uuid);
@ -2547,7 +2554,7 @@ mod tests {
.prefix("moonfire-nvr-test")
.tempdir()
.unwrap();
let path = tmpdir.path().to_str().unwrap().to_owned();
let path = tmpdir.path().to_owned();
let sample_file_dir_id = { db.lock() }.add_sample_file_dir(path).unwrap();
let mut c = CameraChange {
short_name: "testcam".to_owned(),

View File

@ -27,6 +27,7 @@ use std::fs;
use std::io::{Read, Write};
use std::ops::Range;
use std::os::unix::io::{AsRawFd, RawFd};
use std::path::Path;
use std::sync::Arc;
/// The fixed length of a directory's `meta` file.
@ -212,7 +213,7 @@ impl SampleFileDir {
///
/// `db_meta.in_progress_open` should be filled if the directory should be opened in read/write
/// mode; absent in read-only mode.
pub fn open(path: &str, expected_meta: &schema::DirMeta) -> Result<Arc<SampleFileDir>, Error> {
pub fn open(path: &Path, expected_meta: &schema::DirMeta) -> Result<Arc<SampleFileDir>, Error> {
let read_write = expected_meta.in_progress_open.is_some();
let s = SampleFileDir::open_self(path, false)?;
s.fd.lock(if read_write {
@ -220,7 +221,7 @@ impl SampleFileDir {
} else {
FlockArg::LockSharedNonblock
})
.map_err(|e| e.context(format!("unable to lock dir {}", path)))?;
.map_err(|e| e.context(format!("unable to lock dir {}", path.display())))?;
let dir_meta = read_meta(&s.fd).map_err(|e| e.context("unable to read meta file"))?;
if let Err(e) = SampleFileDir::check_consistent(expected_meta, &dir_meta) {
bail!(
@ -269,12 +270,12 @@ impl SampleFileDir {
}
pub(crate) fn create(
path: &str,
path: &Path,
db_meta: &schema::DirMeta,
) -> Result<Arc<SampleFileDir>, Error> {
let s = SampleFileDir::open_self(path, true)?;
s.fd.lock(FlockArg::LockExclusiveNonblock)
.map_err(|e| e.context(format!("unable to lock dir {}", path)))?;
.map_err(|e| e.context(format!("unable to lock dir {}", path.display())))?;
let old_meta = read_meta(&s.fd)?;
// Verify metadata. We only care that it hasn't been completely opened.
@ -282,12 +283,15 @@ impl SampleFileDir {
if old_meta.last_complete_open.is_some() {
bail!(
"Can't create dir at path {}: is already in use:\n{:?}",
path,
path.display(),
old_meta
);
}
if !s.is_empty()? {
bail!("Can't create dir at path {} with existing files", path);
bail!(
"Can't create dir at path {} with existing files",
path.display()
);
}
s.write_meta(db_meta)?;
Ok(s)
@ -316,7 +320,7 @@ impl SampleFileDir {
Ok(true)
}
fn open_self(path: &str, create: bool) -> Result<Arc<SampleFileDir>, Error> {
fn open_self(path: &Path, create: bool) -> Result<Arc<SampleFileDir>, Error> {
let fd = Arc::new(Fd::open(path, create)?);
let reader = reader::Reader::spawn(path, fd.clone());
Ok(Arc::new(SampleFileDir { fd, reader }))

View File

@ -23,6 +23,7 @@
use std::convert::TryFrom;
use std::future::Future;
use std::os::unix::prelude::AsRawFd;
use std::path::Path;
use std::{
ops::Range,
pin::Pin,
@ -44,7 +45,7 @@ use crate::CompositeId;
pub(super) struct Reader(tokio::sync::mpsc::UnboundedSender<ReaderCommand>);
impl Reader {
pub(super) fn spawn(path: &str, dir: Arc<super::Fd>) -> Self {
pub(super) fn spawn(path: &Path, dir: Arc<super::Fd>) -> Self {
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
let page_size = usize::try_from(
nix::unistd::sysconf(nix::unistd::SysconfVar::PAGE_SIZE)
@ -54,7 +55,7 @@ impl Reader {
.expect("PAGE_SIZE fits in usize");
assert_eq!(page_size.count_ones(), 1, "invalid page size {}", page_size);
std::thread::Builder::new()
.name(format!("r-{}", path))
.name(format!("r-{}", path.display()))
.spawn(move || ReaderInt { dir, page_size }.run(rx))
.expect("unable to create reader thread");
Self(tx)
@ -407,7 +408,7 @@ mod tests {
.tempdir()
.unwrap();
let fd = std::sync::Arc::new(super::super::Fd::open(tmpdir.path(), false).unwrap());
let reader = super::Reader::spawn("/path/goes/here", fd);
let reader = super::Reader::spawn(tmpdir.path(), fd);
std::fs::write(tmpdir.path().join("0123456789abcdef"), b"blah blah").unwrap();
let f = reader.open_file(crate::CompositeId(0x01234567_89abcdef), 1..8);
assert_eq!(f.try_concat().await.unwrap(), b"lah bla");

View File

@ -31,7 +31,7 @@
//! so that they can be removed in a future version if they no longer make
//! sense. It also makes sense to avoid serializing them when empty.
use std::collections::BTreeMap;
use std::{collections::BTreeMap, path::PathBuf};
use rusqlite::types::{FromSqlError, ValueRef};
use serde::{Deserialize, Serialize};
@ -89,6 +89,17 @@ pub struct GlobalConfig {
}
sql!(GlobalConfig);
/// Sample file directory configuration, used in the `config` column of the `sample_file_dir` table.
#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct SampleFileDirConfig {
pub path: PathBuf,
#[serde(flatten)]
pub unknown: Map<String, Value>,
}
sql!(SampleFileDirConfig);
#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct SignalTypeConfig {

View File

@ -55,9 +55,11 @@ create table open (
create table sample_file_dir (
id integer primary key,
path text unique not null,
uuid blob unique not null check (length(uuid) = 16),
-- See json.SampleFileDirConfig.
config text,
-- The last (read/write) open of this directory which fully completed.
-- See schema.proto:DirMeta for a more complete description.
last_complete_open_id integer references open (id)

View File

@ -75,7 +75,7 @@ impl<C: Clocks + Clone> TestDb<C> {
db::init(&mut conn).unwrap();
let db = Arc::new(db::Database::new(clocks, conn, true).unwrap());
let (test_camera_uuid, sample_file_dir_id);
let path = tmpdir.path().to_str().unwrap().to_owned();
let path = tmpdir.path().to_owned();
let dir;
{
let mut l = db.lock();

View File

@ -1,5 +1,5 @@
// This file is part of Moonfire NVR, a security camera network video recorder.
// Copyright (C) 2018 The Moonfire NVR Authors; see AUTHORS and LICENSE.txt.
// Copyright (C) 2021 The Moonfire NVR Authors; see AUTHORS and LICENSE.txt.
// SPDX-License-Identifier: GPL-v3.0-or-later WITH GPL-3.0-linking-exception.
/// Upgrades a version 2 schema to a version 3 schema.
@ -10,7 +10,9 @@ use crate::dir;
use crate::schema;
use failure::Error;
use rusqlite::params;
use std::convert::TryFrom;
use std::os::unix::io::AsRawFd;
use std::path::PathBuf;
use std::sync::Arc;
/// Opens the sample file dir.
@ -48,6 +50,7 @@ fn open_sample_file_dir(tx: &rusqlite::Transaction) -> Result<Arc<dir::SampleFil
open.id = o_id as u32;
open.uuid.extend_from_slice(&o_uuid.0.as_bytes()[..]);
}
let p = PathBuf::try_from(p)?;
dir::SampleFileDir::open(&p, &meta)
}

View File

@ -6,12 +6,12 @@
use failure::{format_err, Error};
use fnv::FnvHashMap;
use rusqlite::{named_params, params};
use std::convert::TryFrom;
use std::{convert::TryFrom, path::PathBuf};
use url::Url;
use uuid::Uuid;
use crate::{
json::{CameraConfig, GlobalConfig, SignalConfig, SignalTypeConfig},
json::{CameraConfig, GlobalConfig, SampleFileDirConfig, SignalConfig, SignalTypeConfig},
SqlUuid,
};
@ -39,6 +39,36 @@ fn copy_meta(tx: &rusqlite::Transaction) -> Result<(), Error> {
Ok(())
}
fn copy_sample_file_dir(tx: &rusqlite::Transaction) -> Result<(), Error> {
let mut stmt =
tx.prepare("select id, uuid, path, last_complete_open_id from old_sample_file_dir")?;
let mut insert = tx.prepare(
r#"
insert into sample_file_dir (id, uuid, config, last_complete_open_id)
values (:id, :uuid, :config, :last_complete_open_id)
"#,
)?;
let mut rows = stmt.query(params![])?;
while let Some(row) = rows.next()? {
let id: i32 = row.get(0)?;
let path: String = row.get(2)?;
let uuid: SqlUuid = row.get(1)?;
let config = SampleFileDirConfig {
path: PathBuf::try_from(path)?,
..Default::default()
};
let last_complete_open_id: Option<i64> = row.get(3)?;
insert.execute(named_params! {
":id": id,
":uuid": uuid,
":config": &config,
":last_complete_open_id": &last_complete_open_id,
})?;
}
Ok(())
}
fn copy_signal_types(tx: &rusqlite::Transaction) -> Result<(), Error> {
let mut types_ = FnvHashMap::default();
let mut stmt = tx.prepare("select type_uuid, value, name from signal_type_enum")?;
@ -262,6 +292,7 @@ pub fn run(_args: &super::Args, tx: &rusqlite::Transaction) -> Result<(), Error>
alter table camera rename to old_camera;
alter table stream rename to old_stream;
alter table signal rename to old_signal;
alter table sample_file_dir rename to old_sample_file_dir;
alter table meta rename to old_meta;
create table meta (
@ -269,6 +300,13 @@ pub fn run(_args: &super::Args, tx: &rusqlite::Transaction) -> Result<(), Error>
config text
);
create table sample_file_dir (
id integer primary key,
uuid blob unique not null check (length(uuid) = 16),
config text,
last_complete_open_id integer references open (id)
);
create table camera (
id integer primary key,
uuid blob unique not null check (length(uuid) = 16),
@ -303,6 +341,7 @@ pub fn run(_args: &super::Args, tx: &rusqlite::Transaction) -> Result<(), Error>
"#,
)?;
copy_meta(tx)?;
copy_sample_file_dir(tx)?;
copy_cameras(tx)?;
copy_signal_types(tx)?;
copy_signals(tx)?;
@ -374,6 +413,7 @@ pub fn run(_args: &super::Args, tx: &rusqlite::Transaction) -> Result<(), Error>
drop table old_recording;
drop table old_stream;
drop table old_camera;
drop table old_sample_file_dir;
drop table old_meta;
drop table old_signal;
drop table signal_type_enum;

View File

@ -17,6 +17,7 @@ use std::cmp::{self, Ordering};
use std::convert::TryFrom;
use std::io;
use std::mem;
use std::path::PathBuf;
use std::sync::{mpsc, Arc};
use std::thread;
use std::time::Duration as StdDuration;
@ -178,7 +179,7 @@ where
Ok((
SyncerChannel(snd),
thread::Builder::new()
.name(format!("sync-{}", path))
.name(format!("sync-{}", path.display()))
.spawn(move || while syncer.iter(&rcv) {})
.unwrap(),
))
@ -312,7 +313,7 @@ impl<C: Clocks + Clone> Syncer<C, Arc<dir::SampleFileDir>> {
shutdown_rx: base::shutdown::Receiver,
db: Arc<db::Database<C>>,
dir_id: i32,
) -> Result<(Self, String), Error> {
) -> Result<(Self, PathBuf), Error> {
let d = l
.sample_file_dirs_by_id()
.get(&dir_id)

View File

@ -415,12 +415,12 @@ fn edit_camera_dialog(db: &Arc<db::Database>, siv: &mut Cursive, item: &Option<i
.min_height(3),
);
let dirs: Vec<_> = ::std::iter::once(("<none>".to_owned(), None))
let dirs: Vec<_> = ::std::iter::once(("<none>".into(), None))
.chain(
db.lock()
.sample_file_dirs_by_id()
.iter()
.map(|(&id, d)| (d.path.as_str().to_owned(), Some(id))),
.map(|(&id, d)| (d.path.to_owned(), Some(id))),
)
.collect();
for &type_ in &db::ALL_STREAM_TYPES {
@ -449,7 +449,7 @@ fn edit_camera_dialog(db: &Arc<db::Database>, siv: &mut Cursive, item: &Option<i
.child(
"sample file dir",
views::SelectView::<Option<i32>>::new()
.with_all(dirs.iter().cloned())
.with_all(dirs.iter().map(|(p, id)| (p.display().to_string(), *id)))
.popup()
.with_name(format!("{}_sample_file_dir", type_.as_str())),
)

View File

@ -11,6 +11,7 @@ use failure::Error;
use log::{debug, trace};
use std::cell::RefCell;
use std::collections::BTreeMap;
use std::path::Path;
use std::rc::Rc;
use std::sync::Arc;
@ -206,7 +207,7 @@ pub fn top_dialog(db: &Arc<db::Database>, siv: &mut Cursive) {
db.lock()
.sample_file_dirs_by_id()
.iter()
.map(|(&id, d)| (d.path.to_string(), Some(id))),
.map(|(&id, d)| (d.path.display().to_string(), Some(id))),
)
.full_width(),
)
@ -224,7 +225,7 @@ fn add_dir_dialog(db: &Arc<db::Database>, siv: &mut Cursive) {
views::EditView::new()
.on_submit({
let db = db.clone();
move |siv, path| add_dir(&db, siv, path)
move |siv, path| add_dir(&db, siv, path.as_ref())
})
.with_name("path")
.fixed_width(60),
@ -237,7 +238,7 @@ fn add_dir_dialog(db: &Arc<db::Database>, siv: &mut Cursive) {
.find_name::<views::EditView>("path")
.unwrap()
.get_content();
add_dir(&db, siv, &path)
add_dir(&db, siv, path.as_ref().as_ref())
}
})
.button("Cancel", |siv| {
@ -247,10 +248,10 @@ fn add_dir_dialog(db: &Arc<db::Database>, siv: &mut Cursive) {
);
}
fn add_dir(db: &Arc<db::Database>, siv: &mut Cursive, path: &str) {
fn add_dir(db: &Arc<db::Database>, siv: &mut Cursive, path: &Path) {
if let Err(e) = db.lock().add_sample_file_dir(path.to_owned()) {
siv.add_layer(
views::Dialog::text(format!("Unable to add path {}: {}", path, e))
views::Dialog::text(format!("Unable to add path {}: {}", path.display(), e))
.dismiss_button("Back")
.title("Error"),
);
@ -426,6 +427,6 @@ fn edit_dir_dialog(db: &Arc<db::Database>, siv: &mut Cursive, dir_id: i32) {
.child(views::DummyView)
.child(buttons),
)
.title(format!("Edit retention for {}", path)),
.title(format!("Edit retention for {}", path.display())),
);
}

View File

@ -277,7 +277,7 @@ async fn inner(args: Args, shutdown_rx: base::shutdown::Receiver) -> Result<i32,
if let Some(id) = stream.sample_file_dir_id {
dirs.entry(id).or_insert_with(|| {
let d = l.sample_file_dirs_by_id().get(&id).unwrap();
info!("Starting syncer for path {}", d.path);
info!("Starting syncer for path {}", d.path.display());
d.get().unwrap()
});
} else {