diff --git a/README.md b/README.md index 5156a33..af6411b 100644 --- a/README.md +++ b/README.md @@ -268,7 +268,7 @@ been done for you. If not, Create After=network-online.target [Service] - ExecStart=/usr/local/bin/moonfire-nvr \ + ExecStart=/usr/local/bin/moonfire-nvr run \ --sample-file-dir=/var/lib/moonfire-nvr/sample \ --db-dir=/var/lib/moonfire-nvr/db \ --http-addr=0.0.0.0:8080 diff --git a/prep.sh b/prep.sh index a97f13e..18603d8 100755 --- a/prep.sh +++ b/prep.sh @@ -231,7 +231,7 @@ Description=${SERVICE_DESC} After=network-online.target [Service] -ExecStart=${SERVICE_BIN} \\ +ExecStart=${SERVICE_BIN} run \\ --sample-file-dir=${SAMPLES_PATH} \\ --db-dir=${DB_DIR} \\ --http-addr=0.0.0.0:${NVR_PORT} diff --git a/src/check.rs b/src/cmds/check.rs similarity index 88% rename from src/check.rs rename to src/cmds/check.rs index 34ec657..47f3e55 100644 --- a/src/check.rs +++ b/src/cmds/check.rs @@ -33,10 +33,33 @@ use db; use error::Error; use recording; -use rusqlite; use std::fs; use uuid::Uuid; +static USAGE: &'static str = r#" +Checks database integrity. + +Usage: + + moonfire-nvr check [options] + moonfire-nvr check --help + +Options: + + --db-dir=DIR Set the directory holding the SQLite3 index database. + This is typically on a flash device. + [default: /var/lib/moonfire-nvr/db] + --sample-file-dir=DIR Set the directory holding video data. + This is typically on a hard drive. + [default: /var/lib/moonfire-nvr/sample] +"#; + +#[derive(Debug, RustcDecodable)] +struct Args { + flag_db_dir: String, + flag_sample_file_dir: String, +} + #[derive(Debug, Eq, PartialEq)] struct RecordingSummary { bytes: u64, @@ -73,9 +96,12 @@ struct File { composite_id: Option, } -pub fn run(conn: rusqlite::Connection, sample_file_dir: &str) -> Result<(), Error> { +pub fn run() -> Result<(), Error> { + let args: Args = super::parse_args(USAGE)?; + super::install_logger(false); + let (_db_dir, conn) = super::open_conn(&args.flag_db_dir, true)?; let mut files = Vec::new(); - for e in fs::read_dir(sample_file_dir)? { + for e in fs::read_dir(&args.flag_sample_file_dir)? { let e = e?; let uuid = match e.file_name().to_str().and_then(|f| Uuid::parse_str(f).ok()) { Some(f) => f, diff --git a/src/cmds/mod.rs b/src/cmds/mod.rs new file mode 100644 index 0000000..73ece47 --- /dev/null +++ b/src/cmds/mod.rs @@ -0,0 +1,102 @@ +// This file is part of Moonfire NVR, a security camera digital video recorder. +// Copyright (C) 2016 Scott Lamb +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// In addition, as a special exception, the copyright holders give +// permission to link the code of portions of this program with the +// OpenSSL library under certain conditions as described in each +// individual source file, and distribute linked combinations including +// the two. +// +// You must obey the GNU General Public License in all respects for all +// of the code used other than OpenSSL. If you modify file(s) with this +// exception, you may extend this exception to your version of the +// file(s), but you are not obligated to do so. If you do not wish to do +// so, delete this exception statement from your version. If you delete +// this exception statement from all source files in the program, then +// also delete it here. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +use dir; +use docopt; +use error::Error; +use libc; +use rusqlite; +use slog::{self, DrainExt}; +use slog_envlogger; +use slog_stdlog; +use slog_term; +use std::path::Path; + +mod check; +mod run; +mod ts; +mod upgrade; + +#[derive(Debug, RustcDecodable)] +pub enum Command { + Run, + Upgrade, + Check, + Ts, +} + +impl Command { + pub fn run(&self) -> Result<(), Error> { + match *self { + Command::Run => run::run(), + Command::Upgrade => upgrade::run(), + Command::Check => check::run(), + Command::Ts => ts::run(), + } + } +} + +/// Initializes logging. +/// `async` should be true only for serving; otherwise logging can block useful work. +/// Sync logging should be preferred for other modes because async apparently is never flushed +/// before the program exits, and partial output from these tools is very confusing. +fn install_logger(async: bool) { + let drain = slog_term::StreamerBuilder::new(); + let drain = slog_envlogger::new(if async { drain.async() } else { drain }.full().build()); + slog_stdlog::set_logger(slog::Logger::root(drain.ignore_err(), None)).unwrap(); +} + +/// Locks and opens the database. +/// The returned `dir::Fd` holds the lock and should be kept open as long as the `Connection` is. +fn open_conn(db_dir: &str, read_only: bool) -> Result<(dir::Fd, rusqlite::Connection), Error> { + let dir = dir::Fd::open(db_dir)?; + dir.lock(if read_only { libc::LOCK_SH } else { libc::LOCK_EX } | libc::LOCK_NB) + .map_err(|e| Error{description: format!("db dir {:?} already in use; can't get {} lock", + db_dir, + if read_only { "shared" } else { "exclusive" }), + cause: Some(Box::new(e))})?; + let conn = rusqlite::Connection::open_with_flags( + Path::new(&db_dir).join("db"), + if read_only { + rusqlite::SQLITE_OPEN_READ_ONLY + } else { + rusqlite::SQLITE_OPEN_READ_WRITE + } | + // rusqlite::Connection is not Sync, so there's no reason to tell SQLite3 to use the + // serialized threading mode. + rusqlite::SQLITE_OPEN_NO_MUTEX)?; + Ok((dir, conn)) +} + +fn parse_args(usage: &str) -> Result where T: ::rustc_serialize::Decodable { + Ok(docopt::Docopt::new(usage) + .and_then(|d| d.decode()) + .unwrap_or_else(|e| e.exit())) +} diff --git a/src/cmds/run.rs b/src/cmds/run.rs new file mode 100644 index 0000000..0a8598f --- /dev/null +++ b/src/cmds/run.rs @@ -0,0 +1,129 @@ +// This file is part of Moonfire NVR, a security camera digital video recorder. +// Copyright (C) 2016 Scott Lamb +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// In addition, as a special exception, the copyright holders give +// permission to link the code of portions of this program with the +// OpenSSL library under certain conditions as described in each +// individual source file, and distribute linked combinations including +// the two. +// +// You must obey the GNU General Public License in all respects for all +// of the code used other than OpenSSL. If you modify file(s) with this +// exception, you may extend this exception to your version of the +// file(s), but you are not obligated to do so. If you do not wish to do +// so, delete this exception statement from your version. If you delete +// this exception statement from all source files in the program, then +// also delete it here. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +use chan_signal; +use clock; +use db; +use dir; +use error::Error; +use hyper::server::Server; +use std::sync::Arc; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::thread; +use stream; +use streamer; +use web; + +const USAGE: &'static str = r#" +Usage: moonfire-nvr run [options] + +Options: + -h, --help Show this message. + --db-dir=DIR Set the directory holding the SQLite3 index database. + This is typically on a flash device. + [default: /var/lib/moonfire-nvr/db] + --sample-file-dir=DIR Set the directory holding video data. + This is typically on a hard drive. + [default: /var/lib/moonfire-nvr/sample] + --http-addr=ADDR Set the bind address for the unencrypted HTTP server. + [default: 0.0.0.0:8080] + --read-only Forces read-only mode / disables recording. +"#; + +#[derive(Debug, RustcDecodable)] +struct Args { + flag_db_dir: String, + flag_sample_file_dir: String, + flag_http_addr: String, + flag_read_only: bool, +} + +pub fn run() -> Result<(), Error> { + let args: Args = super::parse_args(USAGE)?; + + // Watch for termination signals. + // This must be started before any threads are spawned (such as the async logger thread) so + // that signals will be blocked in all threads. + let signal = chan_signal::notify(&[chan_signal::Signal::INT, chan_signal::Signal::TERM]); + super::install_logger(true); + let (_db_dir, conn) = super::open_conn(&args.flag_db_dir, args.flag_read_only)?; + let db = Arc::new(db::Database::new(conn).unwrap()); + let dir = dir::SampleFileDir::new(&args.flag_sample_file_dir, db.clone()).unwrap(); + info!("Database is loaded."); + + // Start a streamer for each camera. + let shutdown = Arc::new(AtomicBool::new(false)); + let mut streamers = Vec::new(); + let syncer = if !args.flag_read_only { + let (syncer_channel, syncer_join) = dir::start_syncer(dir.clone()).unwrap(); + let l = db.lock(); + let cameras = l.cameras_by_id().len(); + let env = streamer::Environment{ + db: &db, + dir: &dir, + clocks: &clock::REAL, + opener: &*stream::FFMPEG, + shutdown: &shutdown, + }; + for (i, (id, camera)) in l.cameras_by_id().iter().enumerate() { + let rotate_offset_sec = streamer::ROTATE_INTERVAL_SEC * i as i64 / cameras as i64; + let mut streamer = streamer::Streamer::new(&env, syncer_channel.clone(), *id, camera, + rotate_offset_sec, + streamer::ROTATE_INTERVAL_SEC); + let name = format!("stream-{}", streamer.short_name()); + streamers.push(thread::Builder::new().name(name).spawn(move|| { + streamer.run(); + }).expect("can't create thread")); + } + Some((syncer_channel, syncer_join)) + } else { None }; + + // Start the web interface. + let server = Server::http(args.flag_http_addr.as_str()).unwrap(); + let h = web::Handler::new(db.clone(), dir.clone()); + let _guard = server.handle(h); + info!("Ready to serve HTTP requests"); + + // Wait for a signal and shut down. + chan_select! { + signal.recv() -> signal => info!("Received signal {:?}; shutting down streamers.", signal), + } + shutdown.store(true, Ordering::SeqCst); + for streamer in streamers.drain(..) { + streamer.join().unwrap(); + } + if let Some((syncer_channel, syncer_join)) = syncer { + info!("Shutting down syncer."); + drop(syncer_channel); + syncer_join.join().unwrap(); + } + info!("Exiting."); + ::std::process::exit(0); +} diff --git a/src/cmds/ts.rs b/src/cmds/ts.rs new file mode 100644 index 0000000..383b5ed --- /dev/null +++ b/src/cmds/ts.rs @@ -0,0 +1,52 @@ +// This file is part of Moonfire NVR, a security camera digital video recorder. +// Copyright (C) 2016 Scott Lamb +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// In addition, as a special exception, the copyright holders give +// permission to link the code of portions of this program with the +// OpenSSL library under certain conditions as described in each +// individual source file, and distribute linked combinations including +// the two. +// +// You must obey the GNU General Public License in all respects for all +// of the code used other than OpenSSL. If you modify file(s) with this +// exception, you may extend this exception to your version of the +// file(s), but you are not obligated to do so. If you do not wish to do +// so, delete this exception statement from your version. If you delete +// this exception statement from all source files in the program, then +// also delete it here. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +use error::Error; +use recording; + +const USAGE: &'static str = r#" +Usage: moonfire-nvr ts ... + moonfire-nvr ts --help +"#; + +#[derive(Debug, RustcDecodable)] +struct Args { + arg_ts: Vec, +} + +pub fn run() -> Result<(), Error> { + let arg: Args = super::parse_args(&USAGE)?; + super::install_logger(false); + for timestamp in &arg.arg_ts { + let t = recording::Time::parse(timestamp)?; + println!("{} == {}", t, t.0); + } + Ok(()) +} diff --git a/src/upgrade/mod.rs b/src/cmds/upgrade/mod.rs similarity index 70% rename from src/upgrade/mod.rs rename to src/cmds/upgrade/mod.rs index 6345ad4..501172a 100644 --- a/src/upgrade/mod.rs +++ b/src/cmds/upgrade/mod.rs @@ -38,6 +38,29 @@ use rusqlite; mod v0_to_v1; +const USAGE: &'static str = r#" +Upgrade to the latest database schema. + +Usage: moonfire-nvr upgrade [options] + +Options: + -h, --help Show this message. + --db-dir=DIR Set the directory holding the SQLite3 index database. + This is typically on a flash device. + [default: /var/lib/moonfire-nvr/db] + --sample-file-dir=DIR Set the directory holding video data. + This is typically on a hard drive. + [default: /var/lib/moonfire-nvr/sample] + --preset-journal=MODE Resets the SQLite journal_mode to the specified mode + prior to the upgrade. The default, delete, is + recommended. off is very dangerous but may be + desirable in some circumstances. See guide/schema.md + for more information. The journal mode will be reset + to wal after the upgrade. + [default: delete] + --no-vacuum Skips the normal post-upgrade vacuum operation. +"#; + const UPGRADE_NOTES: &'static str = concat!("upgraded using moonfire-nvr ", env!("CARGO_PKG_VERSION")); @@ -45,6 +68,14 @@ const UPGRADERS: [fn(&rusqlite::Transaction) -> Result<(), Error>; 1] = [ v0_to_v1::run, ]; +#[derive(Debug, RustcDecodable)] +struct Args { + flag_db_dir: String, + flag_sample_file_dir: String, + flag_preset_journal: String, + flag_no_vacuum: bool, +} + fn set_journal_mode(conn: &rusqlite::Connection, requested: &str) -> Result<(), Error> { assert!(!requested.contains(';')); // quick check for accidental sql injection. let actual = conn.query_row(&format!("pragma journal_mode = {}", requested), &[], @@ -53,8 +84,11 @@ fn set_journal_mode(conn: &rusqlite::Connection, requested: &str) -> Result<(), Ok(()) } -pub fn run(mut conn: rusqlite::Connection, preset_journal: &str, - no_vacuum: bool) -> Result<(), Error> { +pub fn run() -> Result<(), Error> { + let args: Args = super::parse_args(USAGE)?; + super::install_logger(false); + let (_db_dir, mut conn) = super::open_conn(&args.flag_db_dir, false)?; + { assert_eq!(UPGRADERS.len(), db::EXPECTED_VERSION as usize); let old_ver = @@ -66,7 +100,7 @@ pub fn run(mut conn: rusqlite::Connection, preset_journal: &str, return Err(Error::new(format!("Database is at negative version {}!", old_ver))); } info!("Upgrading database from version {} to version {}...", old_ver, db::EXPECTED_VERSION); - set_journal_mode(&conn, preset_journal).unwrap(); + set_journal_mode(&conn, &args.flag_preset_journal).unwrap(); for ver in old_ver .. db::EXPECTED_VERSION { info!("...from version {} to version {}", ver, ver + 1); let tx = conn.transaction()?; @@ -82,7 +116,7 @@ pub fn run(mut conn: rusqlite::Connection, preset_journal: &str, // WAL is the preferred journal mode for normal operation; it reduces the number of syncs // without compromising safety. set_journal_mode(&conn, "wal").unwrap(); - if !no_vacuum { + if !args.flag_no_vacuum { info!("...vacuuming database after upgrade."); conn.execute_batch(r#" pragma page_size = 16384; diff --git a/src/upgrade/v0_to_v1.rs b/src/cmds/upgrade/v0_to_v1.rs similarity index 100% rename from src/upgrade/v0_to_v1.rs rename to src/cmds/upgrade/v0_to_v1.rs diff --git a/src/main.rs b/src/main.rs index af5f12a..e417d8a 100644 --- a/src/main.rs +++ b/src/main.rs @@ -61,16 +61,9 @@ extern crate time; extern crate url; extern crate uuid; -use hyper::server::Server; -use slog::DrainExt; -use std::path::Path; -use std::sync::Arc; -use std::sync::atomic::{AtomicBool, Ordering}; -use std::thread; - -mod check; mod clock; mod coding; +mod cmds; mod db; mod dir; mod error; @@ -83,164 +76,54 @@ mod stream; mod streamer; mod strutil; #[cfg(test)] mod testutil; -mod upgrade; mod web; /// Commandline usage string. This is in the particular format expected by the `docopt` crate. /// Besides being printed on --help or argument parsing error, it's actually parsed to define the /// allowed commandline arguments and their defaults. const USAGE: &'static str = " -Usage: moonfire-nvr [options] - moonfire-nvr --upgrade [options] - moonfire-nvr --check [options] - moonfire-nvr --ts ... +Usage: moonfire-nvr [...] moonfire-nvr (--help | --version) Options: -h, --help Show this message. --version Show the version of moonfire-nvr. - --db-dir=DIR Set the directory holding the SQLite3 index database. - This is typically on a flash device. - [default: /var/lib/moonfire-nvr/db] - --sample-file-dir=DIR Set the directory holding video data. - This is typically on a hard drive. - [default: /var/lib/moonfire-nvr/sample] - --http-addr=ADDR Set the bind address for the unencrypted HTTP server. - [default: 0.0.0.0:8080] - --read-only Forces read-only mode / disables recording. - --preset-journal=MODE With --upgrade, resets the SQLite journal_mode to - the specified mode prior to the upgrade. The default, - delete, is recommended. off is very dangerous but - may be desirable in some circumstances. See - guide/schema.md for more information. The journal - mode will be reset to wal after the upgrade. - [default: delete] - --no-vacuum With --upgrade, skips the normal post-upgrade vacuum - operation. + +Commands: + run Run the daemon: record from cameras and handle HTTP requests + upgrade Upgrade the database to the latest schema + check Check database integrity + ts Translate between human-readable and numeric timestamps "; /// Commandline arguments corresponding to `USAGE`; automatically filled by the `docopt` crate. -#[derive(RustcDecodable)] +#[derive(Debug, RustcDecodable)] struct Args { - flag_db_dir: String, - flag_sample_file_dir: String, - flag_http_addr: String, - flag_read_only: bool, - flag_check: bool, - flag_upgrade: bool, - flag_ts: bool, - flag_no_vacuum: bool, - flag_preset_journal: String, - arg_ts: Vec, + arg_command: Option, +} + +fn version() -> String { + let major = option_env!("CARGO_PKG_VERSION_MAJOR"); + let minor = option_env!("CARGO_PKG_VERSION_MAJOR"); + let patch = option_env!("CARGO_PKG_VERSION_MAJOR"); + match (major, minor, patch) { + (Some(major), Some(minor), Some(patch)) => format!("{}.{}.{}", major, minor, patch), + _ => "".to_owned(), + } } fn main() { // Parse commandline arguments. - let version = "Moonfire NVR 0.1.0".to_owned(); + // (Note this differs from cmds::parse_args in that it specifies options_first.) let args: Args = docopt::Docopt::new(USAGE) - .and_then(|d| d.version(Some(version)).decode()) + .and_then(|d| d.options_first(true) + .version(Some(version())) + .decode()) .unwrap_or_else(|e| e.exit()); - // Watch for termination signals. - // This must be started before any threads are spawned (such as the async logger thread) so - // that signals will be blocked in all threads. - let signal = chan_signal::notify(&[chan_signal::Signal::INT, chan_signal::Signal::TERM]); - - // Initialize logging. - // Use async logging for serving because otherwise it blocks useful work. - // Use sync logging for other modes because async apparently is never flushed before the - // program exits, and partial output from these tools is very confusing. - let drain = slog_term::StreamerBuilder::new(); - let drain = slog_envlogger::new(if args.flag_upgrade || args.flag_check { drain } - else { drain.async() }.full().build()); - slog_stdlog::set_logger(slog::Logger::root(drain.ignore_err(), None)).unwrap(); - - // Open the database and populate cached state. - let db_dir = dir::Fd::open(&args.flag_db_dir).unwrap(); - db_dir.lock(if args.flag_read_only { libc::LOCK_SH } else { libc::LOCK_EX } | libc::LOCK_NB) - .unwrap(); - let conn = rusqlite::Connection::open_with_flags( - Path::new(&args.flag_db_dir).join("db"), - if args.flag_read_only { - rusqlite::SQLITE_OPEN_READ_ONLY - } else { - rusqlite::SQLITE_OPEN_READ_WRITE - } | - // rusqlite::Connection is not Sync, so there's no reason to tell SQLite3 to use the - // serialized threading mode. - rusqlite::SQLITE_OPEN_NO_MUTEX).unwrap(); - - if args.flag_upgrade { - upgrade::run(conn, &args.flag_preset_journal, args.flag_no_vacuum).unwrap(); - } else if args.flag_check { - check::run(conn, &args.flag_sample_file_dir).unwrap(); - } else if args.flag_ts { - run_ts(args.arg_ts).unwrap(); - } else { - run(args, conn, &signal); + if let Err(e) = args.arg_command.unwrap().run() { + use std::io::Write; + writeln!(&mut ::std::io::stderr(), "{}", e).unwrap(); + ::std::process::exit(1); } } - -fn run_ts(timestamps: Vec) -> Result<(), error::Error> { - for timestamp in ×tamps { - let t = recording::Time::parse(timestamp)?; - println!("{} == {}", t, t.0); - } - Ok(()) -} - -fn run(args: Args, conn: rusqlite::Connection, signal: &chan::Receiver) { - let db = Arc::new(db::Database::new(conn).unwrap()); - let dir = dir::SampleFileDir::new(&args.flag_sample_file_dir, db.clone()).unwrap(); - info!("Database is loaded."); - - // Start a streamer for each camera. - let shutdown = Arc::new(AtomicBool::new(false)); - let mut streamers = Vec::new(); - let syncer = if !args.flag_read_only { - let (syncer_channel, syncer_join) = dir::start_syncer(dir.clone()).unwrap(); - let l = db.lock(); - let cameras = l.cameras_by_id().len(); - let env = streamer::Environment{ - db: &db, - dir: &dir, - clocks: &clock::REAL, - opener: &*stream::FFMPEG, - shutdown: &shutdown, - }; - for (i, (id, camera)) in l.cameras_by_id().iter().enumerate() { - let rotate_offset_sec = streamer::ROTATE_INTERVAL_SEC * i as i64 / cameras as i64; - let mut streamer = streamer::Streamer::new(&env, syncer_channel.clone(), *id, camera, - rotate_offset_sec, - streamer::ROTATE_INTERVAL_SEC); - let name = format!("stream-{}", streamer.short_name()); - streamers.push(thread::Builder::new().name(name).spawn(move|| { - streamer.run(); - }).expect("can't create thread")); - } - Some((syncer_channel, syncer_join)) - } else { None }; - - // Start the web interface. - let server = Server::http(args.flag_http_addr.as_str()).unwrap(); - let h = web::Handler::new(db.clone(), dir.clone()); - let _guard = server.handle(h); - info!("Ready to serve HTTP requests"); - - // Wait for a signal and shut down. - chan_select! { - signal.recv() -> signal => info!("Received signal {:?}; shutting down streamers.", signal), - } - shutdown.store(true, Ordering::SeqCst); - for streamer in streamers.drain(..) { - streamer.join().unwrap(); - } - if let Some((syncer_channel, syncer_join)) = syncer { - info!("Shutting down syncer."); - drop(syncer_channel); - syncer_join.join().unwrap(); - } - info!("Exiting."); - // TODO: drain the logger. - std::process::exit(0); -}