mirror of
https://github.com/scottlamb/moonfire-nvr.git
synced 2025-01-23 04:33:16 -05:00
s/std::fs::read_dir/nix::dir::Dir/ in a few spots
This is nicer in a few ways: * I can use openat so there's no possibility of any kind of a race involving scanning a different directory than the one used in other ways (locking, metadata file, adding/removing sample files) * filename() doesn't need to allocate memory, so it's a bit more efficient * dogfooding - I wrote nix::dir.
This commit is contained in:
parent
bb227491b6
commit
e52e725958
25
db/check.rs
25
db/check.rs
@ -38,11 +38,11 @@ use crate::recording;
|
||||
use failure::Error;
|
||||
use fnv::FnvHashMap;
|
||||
use log::error;
|
||||
use nix::fcntl::AtFlags;
|
||||
use protobuf::prelude::MessageField;
|
||||
use rusqlite::types::ToSql;
|
||||
use crate::schema;
|
||||
use std::os::unix::ffi::OsStrExt;
|
||||
use std::fs;
|
||||
use std::os::unix::io::AsRawFd;
|
||||
|
||||
pub struct Options {
|
||||
pub compare_lens: bool,
|
||||
@ -86,8 +86,8 @@ pub fn run(conn: &rusqlite::Connection, opts: &Options) -> Result<(), Error> {
|
||||
}
|
||||
|
||||
// Open the directory (checking its metadata) and hold it open (for the lock).
|
||||
let _dir = dir::SampleFileDir::open(&dir_path, &meta)?;
|
||||
let mut streams = read_dir(&dir_path, opts)?;
|
||||
let dir = dir::SampleFileDir::open(&dir_path, &meta)?;
|
||||
let mut streams = read_dir(&dir, opts)?;
|
||||
let mut rows = garbage_stmt.query(&[&dir_id])?;
|
||||
while let Some(row) = rows.next()? {
|
||||
let id = CompositeId(row.get(0)?);
|
||||
@ -186,24 +186,27 @@ fn summarize_index(video_index: &[u8]) -> Result<RecordingSummary, Error> {
|
||||
/// Reads through the given sample file directory.
|
||||
/// Logs unexpected files and creates a hash map of the files found there.
|
||||
/// If `opts.compare_lens` is set, the values are lengths; otherwise they're insignificant.
|
||||
fn read_dir(path: &str, opts: &Options) -> Result<Dir, Error> {
|
||||
fn read_dir(d: &dir::SampleFileDir, opts: &Options) -> Result<Dir, Error> {
|
||||
let mut dir = Dir::default();
|
||||
for e in fs::read_dir(path)? {
|
||||
let mut d = d.opendir()?;
|
||||
let fd = d.as_raw_fd();
|
||||
for e in d.iter() {
|
||||
let e = e?;
|
||||
let f = e.file_name();
|
||||
let f = f.as_bytes();
|
||||
match f {
|
||||
b"meta" => continue,
|
||||
match f.to_bytes() {
|
||||
b"." | b".." | b"meta" => continue,
|
||||
_ => {},
|
||||
};
|
||||
let id = match dir::parse_id(f) {
|
||||
let id = match dir::parse_id(f.to_bytes()) {
|
||||
Ok(id) => id,
|
||||
Err(_) => {
|
||||
error!("sample file directory contains file {:?} which isn't an id", f);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
let len = if opts.compare_lens { e.metadata()?.len() } else { 0 };
|
||||
let len = if opts.compare_lens {
|
||||
nix::sys::stat::fstatat(fd, f, AtFlags::empty())?.st_size as u64
|
||||
} else { 0 };
|
||||
let stream = dir.entry(id.stream()).or_insert_with(Stream::default);
|
||||
stream.entry(id.recording()).or_insert_with(Recording::default).file = Some(len);
|
||||
}
|
||||
|
2
db/db.rs
2
db/db.rs
@ -1590,7 +1590,7 @@ impl LockedDatabase {
|
||||
_ => arc,
|
||||
},
|
||||
};
|
||||
if !dir::SampleFileDir::is_empty(&d.get().path)? {
|
||||
if !dir.is_empty()? {
|
||||
bail!("Can't delete sample file directory {} which still has files", &d.get().path);
|
||||
}
|
||||
let mut meta = d.get().meta(&self.uuid);
|
||||
|
42
db/dir.rs
42
db/dir.rs
@ -44,8 +44,7 @@ use nix::sys::statvfs::Statvfs;
|
||||
use std::ffi::{CStr, CString};
|
||||
use std::fs;
|
||||
use std::io::{Read, Write};
|
||||
use std::os::unix::ffi::OsStrExt;
|
||||
use std::os::unix::io::FromRawFd;
|
||||
use std::os::unix::io::{AsRawFd, RawFd};
|
||||
use std::sync::Arc;
|
||||
|
||||
/// The fixed length of a directory's `meta` file.
|
||||
@ -121,13 +120,6 @@ impl Fd {
|
||||
nix::unistd::fsync(self.0)
|
||||
}
|
||||
|
||||
/// Opens a sample file within this directory with the given flags and (if creating) mode.
|
||||
pub(crate) fn openat<P: ?Sized + NixPath>(&self, p: &P, oflag: OFlag, mode: Mode)
|
||||
-> Result<fs::File, nix::Error> {
|
||||
let fd = nix::fcntl::openat(self.0, p, oflag, mode)?;
|
||||
Ok(unsafe { fs::File::from_raw_fd(fd) })
|
||||
}
|
||||
|
||||
/// Locks the directory with the specified `flock` operation.
|
||||
pub fn lock(&self, arg: FlockArg) -> Result<(), nix::Error> {
|
||||
nix::fcntl::flock(self.0, arg)
|
||||
@ -141,7 +133,7 @@ impl Fd {
|
||||
/// Reads `dir`'s metadata. If none is found, returns an empty proto.
|
||||
pub(crate) fn read_meta(dir: &Fd) -> Result<schema::DirMeta, Error> {
|
||||
let mut meta = schema::DirMeta::default();
|
||||
let mut f = match dir.openat(cstr!("meta"), OFlag::O_RDONLY, Mode::empty()) {
|
||||
let mut f = match crate::fs::openat(dir.0, cstr!("meta"), OFlag::O_RDONLY, Mode::empty()) {
|
||||
Err(e) => {
|
||||
if e == nix::Error::Sys(nix::errno::Errno::ENOENT) {
|
||||
return Ok(meta);
|
||||
@ -165,21 +157,21 @@ pub(crate) fn read_meta(dir: &Fd) -> Result<schema::DirMeta, Error> {
|
||||
}
|
||||
|
||||
/// Write `dir`'s metadata, clobbering existing data.
|
||||
pub(crate) fn write_meta(dir: &Fd, meta: &schema::DirMeta) -> Result<(), Error> {
|
||||
pub(crate) fn write_meta(dirfd: RawFd, meta: &schema::DirMeta) -> Result<(), Error> {
|
||||
let mut data = meta.write_length_delimited_to_bytes().expect("proto3->vec is infallible");
|
||||
if data.len() > FIXED_DIR_META_LEN {
|
||||
bail!("Length-delimited DirMeta message requires {} bytes, over limit of {}",
|
||||
data.len(), FIXED_DIR_META_LEN);
|
||||
}
|
||||
data.resize(FIXED_DIR_META_LEN, 0); // pad to required length.
|
||||
let mut f = dir.openat(cstr!("meta"), OFlag::O_CREAT | OFlag::O_WRONLY,
|
||||
Mode::S_IRUSR | Mode::S_IWUSR)?;
|
||||
let mut f = crate::fs::openat(dirfd, cstr!("meta"), OFlag::O_CREAT | OFlag::O_WRONLY,
|
||||
Mode::S_IRUSR | Mode::S_IWUSR)?;
|
||||
let stat = f.metadata()?;
|
||||
if stat.len() == 0 {
|
||||
// Need to sync not only the data but also the file metadata and dirent.
|
||||
f.write_all(&data)?;
|
||||
f.sync_all()?;
|
||||
dir.sync()?;
|
||||
nix::unistd::fsync(dirfd)?;
|
||||
} else if stat.len() == FIXED_DIR_META_LEN as u64 {
|
||||
// Just syncing the data will suffice; existing metadata and dirent are fine.
|
||||
f.write_all(&data)?;
|
||||
@ -247,18 +239,24 @@ impl SampleFileDir {
|
||||
if old_meta.last_complete_open.is_some() {
|
||||
bail!("Can't create dir at path {}: is already in use:\n{:?}", path, old_meta);
|
||||
}
|
||||
if !SampleFileDir::is_empty(path)? {
|
||||
if !s.is_empty()? {
|
||||
bail!("Can't create dir at path {} with existing files", path);
|
||||
}
|
||||
s.write_meta(db_meta)?;
|
||||
Ok(s)
|
||||
}
|
||||
|
||||
pub(crate) fn opendir(&self) -> Result<nix::dir::Dir, nix::Error> {
|
||||
nix::dir::Dir::openat(self.fd.as_raw_fd(), ".", OFlag::O_DIRECTORY | OFlag::O_RDONLY,
|
||||
Mode::empty())
|
||||
}
|
||||
|
||||
/// Determines if the directory is empty, aside form metadata.
|
||||
pub(crate) fn is_empty(path: &str) -> Result<bool, Error> {
|
||||
for e in fs::read_dir(path)? {
|
||||
pub(crate) fn is_empty(&self) -> Result<bool, Error> {
|
||||
let mut dir = self.opendir()?;
|
||||
for e in dir.iter() {
|
||||
let e = e?;
|
||||
match e.file_name().as_bytes() {
|
||||
match e.file_name().to_bytes() {
|
||||
b"." | b".." => continue,
|
||||
b"meta" => continue, // existing metadata is fine.
|
||||
_ => return Ok(false),
|
||||
@ -278,17 +276,17 @@ impl SampleFileDir {
|
||||
/// Opens the given sample file for reading.
|
||||
pub fn open_file(&self, composite_id: CompositeId) -> Result<fs::File, nix::Error> {
|
||||
let p = CompositeIdPath::from(composite_id);
|
||||
self.fd.openat(&p, OFlag::O_RDONLY, Mode::empty())
|
||||
crate::fs::openat(self.fd.0, &p, OFlag::O_RDONLY, Mode::empty())
|
||||
}
|
||||
|
||||
pub fn create_file(&self, composite_id: CompositeId) -> Result<fs::File, nix::Error> {
|
||||
let p = CompositeIdPath::from(composite_id);
|
||||
self.fd.openat(&p, OFlag::O_WRONLY | OFlag::O_EXCL | OFlag::O_CREAT,
|
||||
Mode::S_IRUSR | Mode::S_IWUSR)
|
||||
crate::fs::openat(self.fd.0, &p, OFlag::O_WRONLY | OFlag::O_EXCL | OFlag::O_CREAT,
|
||||
Mode::S_IRUSR | Mode::S_IWUSR)
|
||||
}
|
||||
|
||||
pub(crate) fn write_meta(&self, meta: &schema::DirMeta) -> Result<(), Error> {
|
||||
write_meta(&self.fd, meta)
|
||||
write_meta(self.fd.0, meta)
|
||||
}
|
||||
|
||||
pub fn statfs(&self) -> Result<Statvfs, nix::Error> { self.fd.statfs() }
|
||||
|
40
db/fs.rs
Normal file
40
db/fs.rs
Normal file
@ -0,0 +1,40 @@
|
||||
// This file is part of Moonfire NVR, a security camera network video recorder.
|
||||
// Copyright (C) 2019 Scott Lamb <slamb@slamb.org>
|
||||
//
|
||||
// This program is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
//
|
||||
// In addition, as a special exception, the copyright holders give
|
||||
// permission to link the code of portions of this program with the
|
||||
// OpenSSL library under certain conditions as described in each
|
||||
// individual source file, and distribute linked combinations including
|
||||
// the two.
|
||||
//
|
||||
// You must obey the GNU General Public License in all respects for all
|
||||
// of the code used other than OpenSSL. If you modify file(s) with this
|
||||
// exception, you may extend this exception to your version of the
|
||||
// file(s), but you are not obligated to do so. If you do not wish to do
|
||||
// so, delete this exception statement from your version. If you delete
|
||||
// this exception statement from all source files in the program, then
|
||||
// also delete it here.
|
||||
//
|
||||
// This program is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU General Public License for more details.
|
||||
//
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
use std::os::unix::io::{FromRawFd, RawFd};
|
||||
use nix::NixPath;
|
||||
use nix::fcntl::OFlag;
|
||||
use nix::sys::stat::Mode;
|
||||
|
||||
pub fn openat<P: ?Sized + NixPath>(dirfd: RawFd, path: &P, oflag: OFlag, mode: Mode)
|
||||
-> Result<std::fs::File, nix::Error> {
|
||||
let fd = nix::fcntl::openat(dirfd, path, oflag, mode)?;
|
||||
Ok(unsafe { std::fs::File::from_raw_fd(fd) })
|
||||
}
|
@ -36,6 +36,7 @@ mod coding;
|
||||
mod compare;
|
||||
pub mod db;
|
||||
pub mod dir;
|
||||
mod fs;
|
||||
mod raw;
|
||||
pub mod recording;
|
||||
mod schema;
|
||||
|
@ -32,12 +32,12 @@
|
||||
|
||||
use crate::dir;
|
||||
use failure::{Error, bail, format_err};
|
||||
use nix::fcntl::FlockArg;
|
||||
use nix::fcntl::{FlockArg, OFlag};
|
||||
use nix::sys::stat::Mode;
|
||||
use protobuf::prelude::MessageField;
|
||||
use rusqlite::types::ToSql;
|
||||
use crate::schema::DirMeta;
|
||||
use std::fs;
|
||||
use std::os::unix::ffi::OsStrExt;
|
||||
use std::os::unix::io::AsRawFd;
|
||||
use uuid::Uuid;
|
||||
|
||||
pub fn run(args: &super::Args, tx: &rusqlite::Transaction) -> Result<(), Error> {
|
||||
@ -46,9 +46,10 @@ pub fn run(args: &super::Args, tx: &rusqlite::Transaction) -> Result<(), Error>
|
||||
.ok_or_else(|| format_err!("--sample-file-dir required when upgrading from \
|
||||
schema version 1 to 2."))?;
|
||||
|
||||
let d = dir::Fd::open(sample_file_path, false)?;
|
||||
d.lock(FlockArg::LockExclusiveNonblock)?;
|
||||
verify_dir_contents(sample_file_path, tx)?;
|
||||
let mut d = nix::dir::Dir::open(sample_file_path, OFlag::O_DIRECTORY | OFlag::O_RDONLY,
|
||||
Mode::empty())?;
|
||||
nix::fcntl::flock(d.as_raw_fd(), FlockArg::LockExclusiveNonblock)?;
|
||||
verify_dir_contents(sample_file_path, &mut d, tx)?;
|
||||
|
||||
// These create statements match the schema.sql when version 2 was the latest.
|
||||
tx.execute_batch(r#"
|
||||
@ -119,7 +120,7 @@ pub fn run(args: &super::Args, tx: &rusqlite::Transaction) -> Result<(), Error>
|
||||
open.id = open_id;
|
||||
open.uuid.extend_from_slice(&open_uuid_bytes);
|
||||
}
|
||||
dir::write_meta(&d, &meta)?;
|
||||
dir::write_meta(d.as_raw_fd(), &meta)?;
|
||||
|
||||
tx.execute(r#"
|
||||
insert into sample_file_dir (path, uuid, last_complete_open_id)
|
||||
@ -292,7 +293,8 @@ pub fn run(args: &super::Args, tx: &rusqlite::Transaction) -> Result<(), Error>
|
||||
/// * optional: reserved sample file uuids.
|
||||
/// * optional: meta and meta-tmp from half-completed update attempts.
|
||||
/// * forbidden: anything else.
|
||||
fn verify_dir_contents(sample_file_path: &str, tx: &rusqlite::Transaction) -> Result<(), Error> {
|
||||
fn verify_dir_contents(sample_file_path: &str, dir: &mut nix::dir::Dir,
|
||||
tx: &rusqlite::Transaction) -> Result<(), Error> {
|
||||
// Build a hash of the uuids found in the directory.
|
||||
let n: i64 = tx.query_row(r#"
|
||||
select
|
||||
@ -302,10 +304,10 @@ fn verify_dir_contents(sample_file_path: &str, tx: &rusqlite::Transaction) -> Re
|
||||
(select count(*) as c from reserved_sample_files) b;
|
||||
"#, &[] as &[&dyn ToSql], |r| r.get(0))?;
|
||||
let mut files = ::fnv::FnvHashSet::with_capacity_and_hasher(n as usize, Default::default());
|
||||
for e in fs::read_dir(sample_file_path)? {
|
||||
for e in dir.iter() {
|
||||
let e = e?;
|
||||
let f = e.file_name();
|
||||
match f.as_bytes() {
|
||||
match f.to_bytes() {
|
||||
b"." | b".." => continue,
|
||||
b"meta" | b"meta-tmp" => {
|
||||
// Ignore metadata files. These might from a half-finished update attempt.
|
||||
@ -315,8 +317,8 @@ fn verify_dir_contents(sample_file_path: &str, tx: &rusqlite::Transaction) -> Re
|
||||
_ => {},
|
||||
};
|
||||
let s = match f.to_str() {
|
||||
Some(s) => s,
|
||||
None => bail!("unexpected file {:?} in {:?}", f, sample_file_path),
|
||||
Ok(s) => s,
|
||||
Err(_) => bail!("unexpected file {:?} in {:?}", f, sample_file_path),
|
||||
};
|
||||
let uuid = match Uuid::parse_str(s) {
|
||||
Ok(u) => u,
|
||||
|
@ -82,7 +82,7 @@ pub fn run(_args: &super::Args, tx: &rusqlite::Transaction) -> Result<(), Error>
|
||||
dir.lock(FlockArg::LockExclusiveNonblock)?;
|
||||
let tmp_path = cstr!("meta.tmp");
|
||||
let path = cstr!("meta");
|
||||
let mut f = dir.openat(path, OFlag::O_RDONLY, Mode::empty())?;
|
||||
let mut f = crate::fs::openat(dir.as_raw_fd(), path, OFlag::O_RDONLY, Mode::empty())?;
|
||||
let mut data = Vec::new();
|
||||
f.read_to_end(&mut data)?;
|
||||
if data.len() == FIXED_DIR_META_LEN {
|
||||
@ -95,8 +95,9 @@ pub fn run(_args: &super::Args, tx: &rusqlite::Transaction) -> Result<(), Error>
|
||||
if !dir::SampleFileDir::consistent(&db_meta, &dir_meta) {
|
||||
bail!("Inconsistent db_meta={:?} dir_meta={:?}", &db_meta, &dir_meta);
|
||||
}
|
||||
let mut f = dir.openat(tmp_path, OFlag::O_CREAT | OFlag::O_TRUNC | OFlag::O_WRONLY,
|
||||
Mode::S_IRUSR | Mode::S_IWUSR)?;
|
||||
let mut f = crate::fs::openat(dir.as_raw_fd(), tmp_path,
|
||||
OFlag::O_CREAT | OFlag::O_TRUNC | OFlag::O_WRONLY,
|
||||
Mode::S_IRUSR | Mode::S_IWUSR)?;
|
||||
let mut data =
|
||||
dir_meta.write_length_delimited_to_bytes().expect("proto3->vec is infallible");
|
||||
if data.len() > FIXED_DIR_META_LEN {
|
||||
|
10
db/writer.rs
10
db/writer.rs
@ -45,7 +45,6 @@ use std::cmp::Ordering;
|
||||
use std::cmp;
|
||||
use std::io;
|
||||
use std::mem;
|
||||
use std::os::unix::ffi::OsStrExt;
|
||||
use std::sync::Arc;
|
||||
use std::sync::mpsc;
|
||||
use std::thread;
|
||||
@ -263,12 +262,13 @@ impl<F: FileWriter> SyncerChannel<F> {
|
||||
|
||||
/// Lists files which should be "abandoned" (deleted without ever recording in the database)
|
||||
/// on opening.
|
||||
fn list_files_to_abandon(path: &str, streams_to_next: FnvHashMap<i32, i32>)
|
||||
fn list_files_to_abandon(dir: &dir::SampleFileDir, streams_to_next: FnvHashMap<i32, i32>)
|
||||
-> Result<Vec<CompositeId>, Error> {
|
||||
let mut v = Vec::new();
|
||||
for e in ::std::fs::read_dir(path)? {
|
||||
let mut d = dir.opendir()?;
|
||||
for e in d.iter() {
|
||||
let e = e?;
|
||||
let id = match dir::parse_id(e.file_name().as_bytes()) {
|
||||
let id = match dir::parse_id(e.file_name().to_bytes()) {
|
||||
Ok(i) => i,
|
||||
Err(_) => continue,
|
||||
};
|
||||
@ -304,7 +304,7 @@ impl<C: Clocks + Clone> Syncer<C, Arc<dir::SampleFileDir>> {
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
let to_abandon = list_files_to_abandon(&d.path, streams_to_next)?;
|
||||
let to_abandon = list_files_to_abandon(&dir, streams_to_next)?;
|
||||
let mut undeletable = 0;
|
||||
for &id in &to_abandon {
|
||||
if let Err(e) = dir.unlink_file(id) {
|
||||
|
Loading…
x
Reference in New Issue
Block a user