use xsv-style subcommands like "moonfire-nvr run"

This makes it easier to understand which options are valid with each
command.

Additionally, there's more separation of implementations. The most
obvious consequence is that "moonfire-nvr ts ..." no longer uselessly
locks/opens a database.
This commit is contained in:
Scott Lamb 2017-01-16 12:50:47 -08:00
parent a6ec68027a
commit 3af9aeee96
9 changed files with 380 additions and 154 deletions

View File

@ -268,7 +268,7 @@ been done for you. If not, Create
After=network-online.target
[Service]
ExecStart=/usr/local/bin/moonfire-nvr \
ExecStart=/usr/local/bin/moonfire-nvr run \
--sample-file-dir=/var/lib/moonfire-nvr/sample \
--db-dir=/var/lib/moonfire-nvr/db \
--http-addr=0.0.0.0:8080

View File

@ -231,7 +231,7 @@ Description=${SERVICE_DESC}
After=network-online.target
[Service]
ExecStart=${SERVICE_BIN} \\
ExecStart=${SERVICE_BIN} run \\
--sample-file-dir=${SAMPLES_PATH} \\
--db-dir=${DB_DIR} \\
--http-addr=0.0.0.0:${NVR_PORT}

View File

@ -33,10 +33,33 @@
use db;
use error::Error;
use recording;
use rusqlite;
use std::fs;
use uuid::Uuid;
static USAGE: &'static str = r#"
Checks database integrity.
Usage:
moonfire-nvr check [options]
moonfire-nvr check --help
Options:
--db-dir=DIR Set the directory holding the SQLite3 index database.
This is typically on a flash device.
[default: /var/lib/moonfire-nvr/db]
--sample-file-dir=DIR Set the directory holding video data.
This is typically on a hard drive.
[default: /var/lib/moonfire-nvr/sample]
"#;
#[derive(Debug, RustcDecodable)]
struct Args {
flag_db_dir: String,
flag_sample_file_dir: String,
}
#[derive(Debug, Eq, PartialEq)]
struct RecordingSummary {
bytes: u64,
@ -73,9 +96,12 @@ struct File {
composite_id: Option<i64>,
}
pub fn run(conn: rusqlite::Connection, sample_file_dir: &str) -> Result<(), Error> {
pub fn run() -> Result<(), Error> {
let args: Args = super::parse_args(USAGE)?;
super::install_logger(false);
let (_db_dir, conn) = super::open_conn(&args.flag_db_dir, true)?;
let mut files = Vec::new();
for e in fs::read_dir(sample_file_dir)? {
for e in fs::read_dir(&args.flag_sample_file_dir)? {
let e = e?;
let uuid = match e.file_name().to_str().and_then(|f| Uuid::parse_str(f).ok()) {
Some(f) => f,

102
src/cmds/mod.rs Normal file
View File

@ -0,0 +1,102 @@
// This file is part of Moonfire NVR, a security camera digital video recorder.
// Copyright (C) 2016 Scott Lamb <slamb@slamb.org>
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// In addition, as a special exception, the copyright holders give
// permission to link the code of portions of this program with the
// OpenSSL library under certain conditions as described in each
// individual source file, and distribute linked combinations including
// the two.
//
// You must obey the GNU General Public License in all respects for all
// of the code used other than OpenSSL. If you modify file(s) with this
// exception, you may extend this exception to your version of the
// file(s), but you are not obligated to do so. If you do not wish to do
// so, delete this exception statement from your version. If you delete
// this exception statement from all source files in the program, then
// also delete it here.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
use dir;
use docopt;
use error::Error;
use libc;
use rusqlite;
use slog::{self, DrainExt};
use slog_envlogger;
use slog_stdlog;
use slog_term;
use std::path::Path;
mod check;
mod run;
mod ts;
mod upgrade;
#[derive(Debug, RustcDecodable)]
pub enum Command {
Run,
Upgrade,
Check,
Ts,
}
impl Command {
pub fn run(&self) -> Result<(), Error> {
match *self {
Command::Run => run::run(),
Command::Upgrade => upgrade::run(),
Command::Check => check::run(),
Command::Ts => ts::run(),
}
}
}
/// Initializes logging.
/// `async` should be true only for serving; otherwise logging can block useful work.
/// Sync logging should be preferred for other modes because async apparently is never flushed
/// before the program exits, and partial output from these tools is very confusing.
fn install_logger(async: bool) {
let drain = slog_term::StreamerBuilder::new();
let drain = slog_envlogger::new(if async { drain.async() } else { drain }.full().build());
slog_stdlog::set_logger(slog::Logger::root(drain.ignore_err(), None)).unwrap();
}
/// Locks and opens the database.
/// The returned `dir::Fd` holds the lock and should be kept open as long as the `Connection` is.
fn open_conn(db_dir: &str, read_only: bool) -> Result<(dir::Fd, rusqlite::Connection), Error> {
let dir = dir::Fd::open(db_dir)?;
dir.lock(if read_only { libc::LOCK_SH } else { libc::LOCK_EX } | libc::LOCK_NB)
.map_err(|e| Error{description: format!("db dir {:?} already in use; can't get {} lock",
db_dir,
if read_only { "shared" } else { "exclusive" }),
cause: Some(Box::new(e))})?;
let conn = rusqlite::Connection::open_with_flags(
Path::new(&db_dir).join("db"),
if read_only {
rusqlite::SQLITE_OPEN_READ_ONLY
} else {
rusqlite::SQLITE_OPEN_READ_WRITE
} |
// rusqlite::Connection is not Sync, so there's no reason to tell SQLite3 to use the
// serialized threading mode.
rusqlite::SQLITE_OPEN_NO_MUTEX)?;
Ok((dir, conn))
}
fn parse_args<T>(usage: &str) -> Result<T, Error> where T: ::rustc_serialize::Decodable {
Ok(docopt::Docopt::new(usage)
.and_then(|d| d.decode())
.unwrap_or_else(|e| e.exit()))
}

129
src/cmds/run.rs Normal file
View File

@ -0,0 +1,129 @@
// This file is part of Moonfire NVR, a security camera digital video recorder.
// Copyright (C) 2016 Scott Lamb <slamb@slamb.org>
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// In addition, as a special exception, the copyright holders give
// permission to link the code of portions of this program with the
// OpenSSL library under certain conditions as described in each
// individual source file, and distribute linked combinations including
// the two.
//
// You must obey the GNU General Public License in all respects for all
// of the code used other than OpenSSL. If you modify file(s) with this
// exception, you may extend this exception to your version of the
// file(s), but you are not obligated to do so. If you do not wish to do
// so, delete this exception statement from your version. If you delete
// this exception statement from all source files in the program, then
// also delete it here.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
use chan_signal;
use clock;
use db;
use dir;
use error::Error;
use hyper::server::Server;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;
use stream;
use streamer;
use web;
const USAGE: &'static str = r#"
Usage: moonfire-nvr run [options]
Options:
-h, --help Show this message.
--db-dir=DIR Set the directory holding the SQLite3 index database.
This is typically on a flash device.
[default: /var/lib/moonfire-nvr/db]
--sample-file-dir=DIR Set the directory holding video data.
This is typically on a hard drive.
[default: /var/lib/moonfire-nvr/sample]
--http-addr=ADDR Set the bind address for the unencrypted HTTP server.
[default: 0.0.0.0:8080]
--read-only Forces read-only mode / disables recording.
"#;
#[derive(Debug, RustcDecodable)]
struct Args {
flag_db_dir: String,
flag_sample_file_dir: String,
flag_http_addr: String,
flag_read_only: bool,
}
pub fn run() -> Result<(), Error> {
let args: Args = super::parse_args(USAGE)?;
// Watch for termination signals.
// This must be started before any threads are spawned (such as the async logger thread) so
// that signals will be blocked in all threads.
let signal = chan_signal::notify(&[chan_signal::Signal::INT, chan_signal::Signal::TERM]);
super::install_logger(true);
let (_db_dir, conn) = super::open_conn(&args.flag_db_dir, args.flag_read_only)?;
let db = Arc::new(db::Database::new(conn).unwrap());
let dir = dir::SampleFileDir::new(&args.flag_sample_file_dir, db.clone()).unwrap();
info!("Database is loaded.");
// Start a streamer for each camera.
let shutdown = Arc::new(AtomicBool::new(false));
let mut streamers = Vec::new();
let syncer = if !args.flag_read_only {
let (syncer_channel, syncer_join) = dir::start_syncer(dir.clone()).unwrap();
let l = db.lock();
let cameras = l.cameras_by_id().len();
let env = streamer::Environment{
db: &db,
dir: &dir,
clocks: &clock::REAL,
opener: &*stream::FFMPEG,
shutdown: &shutdown,
};
for (i, (id, camera)) in l.cameras_by_id().iter().enumerate() {
let rotate_offset_sec = streamer::ROTATE_INTERVAL_SEC * i as i64 / cameras as i64;
let mut streamer = streamer::Streamer::new(&env, syncer_channel.clone(), *id, camera,
rotate_offset_sec,
streamer::ROTATE_INTERVAL_SEC);
let name = format!("stream-{}", streamer.short_name());
streamers.push(thread::Builder::new().name(name).spawn(move|| {
streamer.run();
}).expect("can't create thread"));
}
Some((syncer_channel, syncer_join))
} else { None };
// Start the web interface.
let server = Server::http(args.flag_http_addr.as_str()).unwrap();
let h = web::Handler::new(db.clone(), dir.clone());
let _guard = server.handle(h);
info!("Ready to serve HTTP requests");
// Wait for a signal and shut down.
chan_select! {
signal.recv() -> signal => info!("Received signal {:?}; shutting down streamers.", signal),
}
shutdown.store(true, Ordering::SeqCst);
for streamer in streamers.drain(..) {
streamer.join().unwrap();
}
if let Some((syncer_channel, syncer_join)) = syncer {
info!("Shutting down syncer.");
drop(syncer_channel);
syncer_join.join().unwrap();
}
info!("Exiting.");
::std::process::exit(0);
}

52
src/cmds/ts.rs Normal file
View File

@ -0,0 +1,52 @@
// This file is part of Moonfire NVR, a security camera digital video recorder.
// Copyright (C) 2016 Scott Lamb <slamb@slamb.org>
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// In addition, as a special exception, the copyright holders give
// permission to link the code of portions of this program with the
// OpenSSL library under certain conditions as described in each
// individual source file, and distribute linked combinations including
// the two.
//
// You must obey the GNU General Public License in all respects for all
// of the code used other than OpenSSL. If you modify file(s) with this
// exception, you may extend this exception to your version of the
// file(s), but you are not obligated to do so. If you do not wish to do
// so, delete this exception statement from your version. If you delete
// this exception statement from all source files in the program, then
// also delete it here.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
use error::Error;
use recording;
const USAGE: &'static str = r#"
Usage: moonfire-nvr ts <ts>...
moonfire-nvr ts --help
"#;
#[derive(Debug, RustcDecodable)]
struct Args {
arg_ts: Vec<String>,
}
pub fn run() -> Result<(), Error> {
let arg: Args = super::parse_args(&USAGE)?;
super::install_logger(false);
for timestamp in &arg.arg_ts {
let t = recording::Time::parse(timestamp)?;
println!("{} == {}", t, t.0);
}
Ok(())
}

View File

@ -38,6 +38,29 @@ use rusqlite;
mod v0_to_v1;
const USAGE: &'static str = r#"
Upgrade to the latest database schema.
Usage: moonfire-nvr upgrade [options]
Options:
-h, --help Show this message.
--db-dir=DIR Set the directory holding the SQLite3 index database.
This is typically on a flash device.
[default: /var/lib/moonfire-nvr/db]
--sample-file-dir=DIR Set the directory holding video data.
This is typically on a hard drive.
[default: /var/lib/moonfire-nvr/sample]
--preset-journal=MODE Resets the SQLite journal_mode to the specified mode
prior to the upgrade. The default, delete, is
recommended. off is very dangerous but may be
desirable in some circumstances. See guide/schema.md
for more information. The journal mode will be reset
to wal after the upgrade.
[default: delete]
--no-vacuum Skips the normal post-upgrade vacuum operation.
"#;
const UPGRADE_NOTES: &'static str =
concat!("upgraded using moonfire-nvr ", env!("CARGO_PKG_VERSION"));
@ -45,6 +68,14 @@ const UPGRADERS: [fn(&rusqlite::Transaction) -> Result<(), Error>; 1] = [
v0_to_v1::run,
];
#[derive(Debug, RustcDecodable)]
struct Args {
flag_db_dir: String,
flag_sample_file_dir: String,
flag_preset_journal: String,
flag_no_vacuum: bool,
}
fn set_journal_mode(conn: &rusqlite::Connection, requested: &str) -> Result<(), Error> {
assert!(!requested.contains(';')); // quick check for accidental sql injection.
let actual = conn.query_row(&format!("pragma journal_mode = {}", requested), &[],
@ -53,8 +84,11 @@ fn set_journal_mode(conn: &rusqlite::Connection, requested: &str) -> Result<(),
Ok(())
}
pub fn run(mut conn: rusqlite::Connection, preset_journal: &str,
no_vacuum: bool) -> Result<(), Error> {
pub fn run() -> Result<(), Error> {
let args: Args = super::parse_args(USAGE)?;
super::install_logger(false);
let (_db_dir, mut conn) = super::open_conn(&args.flag_db_dir, false)?;
{
assert_eq!(UPGRADERS.len(), db::EXPECTED_VERSION as usize);
let old_ver =
@ -66,7 +100,7 @@ pub fn run(mut conn: rusqlite::Connection, preset_journal: &str,
return Err(Error::new(format!("Database is at negative version {}!", old_ver)));
}
info!("Upgrading database from version {} to version {}...", old_ver, db::EXPECTED_VERSION);
set_journal_mode(&conn, preset_journal).unwrap();
set_journal_mode(&conn, &args.flag_preset_journal).unwrap();
for ver in old_ver .. db::EXPECTED_VERSION {
info!("...from version {} to version {}", ver, ver + 1);
let tx = conn.transaction()?;
@ -82,7 +116,7 @@ pub fn run(mut conn: rusqlite::Connection, preset_journal: &str,
// WAL is the preferred journal mode for normal operation; it reduces the number of syncs
// without compromising safety.
set_journal_mode(&conn, "wal").unwrap();
if !no_vacuum {
if !args.flag_no_vacuum {
info!("...vacuuming database after upgrade.");
conn.execute_batch(r#"
pragma page_size = 16384;

View File

@ -61,16 +61,9 @@ extern crate time;
extern crate url;
extern crate uuid;
use hyper::server::Server;
use slog::DrainExt;
use std::path::Path;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;
mod check;
mod clock;
mod coding;
mod cmds;
mod db;
mod dir;
mod error;
@ -83,164 +76,54 @@ mod stream;
mod streamer;
mod strutil;
#[cfg(test)] mod testutil;
mod upgrade;
mod web;
/// Commandline usage string. This is in the particular format expected by the `docopt` crate.
/// Besides being printed on --help or argument parsing error, it's actually parsed to define the
/// allowed commandline arguments and their defaults.
const USAGE: &'static str = "
Usage: moonfire-nvr [options]
moonfire-nvr --upgrade [options]
moonfire-nvr --check [options]
moonfire-nvr --ts <ts>...
Usage: moonfire-nvr <command> [<args>...]
moonfire-nvr (--help | --version)
Options:
-h, --help Show this message.
--version Show the version of moonfire-nvr.
--db-dir=DIR Set the directory holding the SQLite3 index database.
This is typically on a flash device.
[default: /var/lib/moonfire-nvr/db]
--sample-file-dir=DIR Set the directory holding video data.
This is typically on a hard drive.
[default: /var/lib/moonfire-nvr/sample]
--http-addr=ADDR Set the bind address for the unencrypted HTTP server.
[default: 0.0.0.0:8080]
--read-only Forces read-only mode / disables recording.
--preset-journal=MODE With --upgrade, resets the SQLite journal_mode to
the specified mode prior to the upgrade. The default,
delete, is recommended. off is very dangerous but
may be desirable in some circumstances. See
guide/schema.md for more information. The journal
mode will be reset to wal after the upgrade.
[default: delete]
--no-vacuum With --upgrade, skips the normal post-upgrade vacuum
operation.
Commands:
run Run the daemon: record from cameras and handle HTTP requests
upgrade Upgrade the database to the latest schema
check Check database integrity
ts Translate between human-readable and numeric timestamps
";
/// Commandline arguments corresponding to `USAGE`; automatically filled by the `docopt` crate.
#[derive(RustcDecodable)]
#[derive(Debug, RustcDecodable)]
struct Args {
flag_db_dir: String,
flag_sample_file_dir: String,
flag_http_addr: String,
flag_read_only: bool,
flag_check: bool,
flag_upgrade: bool,
flag_ts: bool,
flag_no_vacuum: bool,
flag_preset_journal: String,
arg_ts: Vec<String>,
arg_command: Option<cmds::Command>,
}
fn version() -> String {
let major = option_env!("CARGO_PKG_VERSION_MAJOR");
let minor = option_env!("CARGO_PKG_VERSION_MAJOR");
let patch = option_env!("CARGO_PKG_VERSION_MAJOR");
match (major, minor, patch) {
(Some(major), Some(minor), Some(patch)) => format!("{}.{}.{}", major, minor, patch),
_ => "".to_owned(),
}
}
fn main() {
// Parse commandline arguments.
let version = "Moonfire NVR 0.1.0".to_owned();
// (Note this differs from cmds::parse_args in that it specifies options_first.)
let args: Args = docopt::Docopt::new(USAGE)
.and_then(|d| d.version(Some(version)).decode())
.and_then(|d| d.options_first(true)
.version(Some(version()))
.decode())
.unwrap_or_else(|e| e.exit());
// Watch for termination signals.
// This must be started before any threads are spawned (such as the async logger thread) so
// that signals will be blocked in all threads.
let signal = chan_signal::notify(&[chan_signal::Signal::INT, chan_signal::Signal::TERM]);
// Initialize logging.
// Use async logging for serving because otherwise it blocks useful work.
// Use sync logging for other modes because async apparently is never flushed before the
// program exits, and partial output from these tools is very confusing.
let drain = slog_term::StreamerBuilder::new();
let drain = slog_envlogger::new(if args.flag_upgrade || args.flag_check { drain }
else { drain.async() }.full().build());
slog_stdlog::set_logger(slog::Logger::root(drain.ignore_err(), None)).unwrap();
// Open the database and populate cached state.
let db_dir = dir::Fd::open(&args.flag_db_dir).unwrap();
db_dir.lock(if args.flag_read_only { libc::LOCK_SH } else { libc::LOCK_EX } | libc::LOCK_NB)
.unwrap();
let conn = rusqlite::Connection::open_with_flags(
Path::new(&args.flag_db_dir).join("db"),
if args.flag_read_only {
rusqlite::SQLITE_OPEN_READ_ONLY
} else {
rusqlite::SQLITE_OPEN_READ_WRITE
} |
// rusqlite::Connection is not Sync, so there's no reason to tell SQLite3 to use the
// serialized threading mode.
rusqlite::SQLITE_OPEN_NO_MUTEX).unwrap();
if args.flag_upgrade {
upgrade::run(conn, &args.flag_preset_journal, args.flag_no_vacuum).unwrap();
} else if args.flag_check {
check::run(conn, &args.flag_sample_file_dir).unwrap();
} else if args.flag_ts {
run_ts(args.arg_ts).unwrap();
} else {
run(args, conn, &signal);
if let Err(e) = args.arg_command.unwrap().run() {
use std::io::Write;
writeln!(&mut ::std::io::stderr(), "{}", e).unwrap();
::std::process::exit(1);
}
}
fn run_ts(timestamps: Vec<String>) -> Result<(), error::Error> {
for timestamp in &timestamps {
let t = recording::Time::parse(timestamp)?;
println!("{} == {}", t, t.0);
}
Ok(())
}
fn run(args: Args, conn: rusqlite::Connection, signal: &chan::Receiver<chan_signal::Signal>) {
let db = Arc::new(db::Database::new(conn).unwrap());
let dir = dir::SampleFileDir::new(&args.flag_sample_file_dir, db.clone()).unwrap();
info!("Database is loaded.");
// Start a streamer for each camera.
let shutdown = Arc::new(AtomicBool::new(false));
let mut streamers = Vec::new();
let syncer = if !args.flag_read_only {
let (syncer_channel, syncer_join) = dir::start_syncer(dir.clone()).unwrap();
let l = db.lock();
let cameras = l.cameras_by_id().len();
let env = streamer::Environment{
db: &db,
dir: &dir,
clocks: &clock::REAL,
opener: &*stream::FFMPEG,
shutdown: &shutdown,
};
for (i, (id, camera)) in l.cameras_by_id().iter().enumerate() {
let rotate_offset_sec = streamer::ROTATE_INTERVAL_SEC * i as i64 / cameras as i64;
let mut streamer = streamer::Streamer::new(&env, syncer_channel.clone(), *id, camera,
rotate_offset_sec,
streamer::ROTATE_INTERVAL_SEC);
let name = format!("stream-{}", streamer.short_name());
streamers.push(thread::Builder::new().name(name).spawn(move|| {
streamer.run();
}).expect("can't create thread"));
}
Some((syncer_channel, syncer_join))
} else { None };
// Start the web interface.
let server = Server::http(args.flag_http_addr.as_str()).unwrap();
let h = web::Handler::new(db.clone(), dir.clone());
let _guard = server.handle(h);
info!("Ready to serve HTTP requests");
// Wait for a signal and shut down.
chan_select! {
signal.recv() -> signal => info!("Received signal {:?}; shutting down streamers.", signal),
}
shutdown.store(true, Ordering::SeqCst);
for streamer in streamers.drain(..) {
streamer.join().unwrap();
}
if let Some((syncer_channel, syncer_join)) = syncer {
info!("Shutting down syncer.");
drop(syncer_channel);
syncer_join.join().unwrap();
}
info!("Exiting.");
// TODO: drain the logger.
std::process::exit(0);
}