diff --git a/base/clock.rs b/base/clock.rs index 287efb9..73a5f89 100644 --- a/base/clock.rs +++ b/base/clock.rs @@ -39,7 +39,7 @@ use std::thread; use time::{Duration, Timespec}; /// Abstract interface to the system clocks. This is for testability. -pub trait Clocks : Clone + Sync + 'static { +pub trait Clocks : Send + Sync + 'static { /// Gets the current time from `CLOCK_REALTIME`. fn realtime(&self) -> Timespec; @@ -48,17 +48,17 @@ pub trait Clocks : Clone + Sync + 'static { /// Causes the current thread to sleep for the specified time. fn sleep(&self, how_long: Duration); +} - fn retry_forever>(&self, f: &mut FnMut() -> Result) -> T { - loop { - let e = match f() { - Ok(t) => return t, - Err(e) => e.into(), - }; - let sleep_time = Duration::seconds(1); - warn!("sleeping for {:?} after error: {:?}", sleep_time, e); - self.sleep(sleep_time); - } +pub fn retry_forever>(clocks: &Clocks, f: &mut FnMut() -> Result) -> T { + loop { + let e = match f() { + Ok(t) => return t, + Err(e) => e.into(), + }; + let sleep_time = Duration::seconds(1); + warn!("sleeping for {:?} after error: {:?}", sleep_time, e); + clocks.sleep(sleep_time); } } @@ -89,13 +89,13 @@ impl Clocks for RealClocks { /// Logs a warning if the TimerGuard lives "too long", using the label created by a supplied /// function. -pub struct TimerGuard<'a, C: Clocks, S: AsRef, F: FnOnce() -> S + 'a> { +pub struct TimerGuard<'a, C: Clocks + ?Sized, S: AsRef, F: FnOnce() -> S + 'a> { clocks: &'a C, label_f: Option, start: Timespec, } -impl<'a, C: Clocks, S: AsRef, F: FnOnce() -> S + 'a> TimerGuard<'a, C, S, F> { +impl<'a, C: Clocks + ?Sized, S: AsRef, F: FnOnce() -> S + 'a> TimerGuard<'a, C, S, F> { pub fn new(clocks: &'a C, label_f: F) -> Self { TimerGuard { clocks, @@ -105,7 +105,8 @@ impl<'a, C: Clocks, S: AsRef, F: FnOnce() -> S + 'a> TimerGuard<'a, C, S, F } } -impl<'a, C: Clocks, S: AsRef, F: FnOnce() -> S + 'a> Drop for TimerGuard<'a, C, S, F> { +impl<'a, C, S, F> Drop for TimerGuard<'a, C, S, F> +where C: Clocks + ?Sized, S: AsRef, F: FnOnce() -> S + 'a { fn drop(&mut self) { let elapsed = self.clocks.monotonic() - self.start; if elapsed.num_seconds() >= 1 { diff --git a/db/db.rs b/db/db.rs index 687c189..3ae1663 100644 --- a/db/db.rs +++ b/db/db.rs @@ -52,6 +52,7 @@ //! A list of mutations is built up in-memory and occasionally flushed to reduce SSD write //! cycles. +use base::clock::Clocks; use dir; use failure::Error; use fnv::{self, FnvHashMap, FnvHashSet}; @@ -564,11 +565,17 @@ fn init_recordings(conn: &mut rusqlite::Connection, stream_id: i32, camera: &Cam } pub struct LockedDatabase { + clocks: Arc, conn: rusqlite::Connection, uuid: Uuid, /// If the database is open in read-write mode, the information about the current Open row. open: Option, + + /// The monotonic time when the database was opened (whether in read-write mode or read-only + /// mode). + open_monotonic: recording::Time, + sample_file_dirs_by_id: BTreeMap, cameras_by_id: BTreeMap, streams_by_id: BTreeMap, @@ -862,6 +869,18 @@ impl LockedDatabase { for (&stream_id, mut r) in &mut new_ranges { *r = raw::get_range(&tx, stream_id)?; } + { + let mut stmt = tx.prepare_cached( + r"update open set duration_90k = ?, end_time_90k = ? where id = ?")?; + let rows = stmt.execute(&[ + &(recording::Time::new(self.clocks.monotonic()) - self.open_monotonic).0, + &recording::Time::new(self.clocks.realtime()).0, + &o.id, + ])?; + if rows != 1 { + bail!("unable to find current open {}", o.id); + } + } tx.commit()?; // Process delete_garbage. @@ -1710,7 +1729,8 @@ impl Drop for Database { impl Database { /// Creates the database from a caller-supplied SQLite connection. - pub fn new(conn: rusqlite::Connection, read_write: bool) -> Result { + pub fn new(clocks: Arc, conn: rusqlite::Connection, + read_write: bool) -> Result { conn.execute("pragma foreign_keys = on", &[])?; { let ver = get_schema_version(&conn)?.ok_or_else(|| format_err!( @@ -1735,20 +1755,24 @@ impl Database { // Note: the meta check comes after the version check to improve the error message when // trying to open a version 0 or version 1 database (which lacked the meta table). let uuid = raw::get_db_uuid(&conn)?; + let open_monotonic = recording::Time::new(clocks.monotonic()); let open = if read_write { - let mut stmt = conn.prepare(" insert into open (uuid) values (?)")?; + let real = recording::Time::new(clocks.realtime()); + let mut stmt = conn.prepare(" insert into open (uuid, start_time_90k) values (?, ?)")?; let uuid = Uuid::new_v4(); let uuid_bytes = &uuid.as_bytes()[..]; - stmt.execute(&[&uuid_bytes])?; + stmt.execute(&[&uuid_bytes, &real.0])?; Some(Open { id: conn.last_insert_rowid() as u32, uuid, }) } else { None }; let db = Database(Some(Mutex::new(LockedDatabase { - conn: conn, + clocks, + conn, uuid, open, + open_monotonic, sample_file_dirs_by_id: BTreeMap::new(), cameras_by_id: BTreeMap::new(), cameras_by_uuid: BTreeMap::new(), @@ -1789,7 +1813,9 @@ impl Database { /// Locks the database; the returned reference is the only way to perform (read or write) /// operations. - pub fn lock(&self) -> MutexGuard { self.0.as_ref().unwrap().lock() } + pub fn lock(&self) -> MutexGuard { + self.0.as_ref().unwrap().lock() + } /// For testing: closes the database (without flushing) and returns the connection. /// This allows verification that a newly opened database is in an acceptable state. @@ -1803,6 +1829,7 @@ impl Database { mod tests { extern crate tempdir; + use base::clock; use recording::{self, TIME_UNITS_PER_SEC}; use rusqlite::Connection; use std::collections::BTreeMap; @@ -1968,7 +1995,8 @@ mod tests { #[test] fn test_no_meta_or_version() { testutil::init(); - let e = Database::new(Connection::open_in_memory().unwrap(), false).err().unwrap(); + let e = Database::new(Arc::new(clock::RealClocks{}), Connection::open_in_memory().unwrap(), + false).err().unwrap(); assert!(e.to_string().starts_with("no such table"), "{}", e); } @@ -1977,7 +2005,7 @@ mod tests { testutil::init(); let c = setup_conn(); c.execute_batch("delete from version; insert into version values (2, 0, '');").unwrap(); - let e = Database::new(c, false).err().unwrap(); + let e = Database::new(Arc::new(clock::RealClocks{}), c, false).err().unwrap(); assert!(e.to_string().starts_with( "Database schema version 2 is too old (expected 3)"), "got: {:?}", e); } @@ -1987,7 +2015,7 @@ mod tests { testutil::init(); let c = setup_conn(); c.execute_batch("delete from version; insert into version values (4, 0, '');").unwrap(); - let e = Database::new(c, false).err().unwrap(); + let e = Database::new(Arc::new(clock::RealClocks{}), c, false).err().unwrap(); assert!(e.to_string().starts_with( "Database schema version 4 is too new (expected 3)"), "got: {:?}", e); } @@ -1997,7 +2025,7 @@ mod tests { fn test_fresh_db() { testutil::init(); let conn = setup_conn(); - let db = Database::new(conn, true).unwrap(); + let db = Database::new(Arc::new(clock::RealClocks{}), conn, true).unwrap(); let db = db.lock(); assert_eq!(0, db.cameras_by_id().values().count()); } @@ -2007,7 +2035,7 @@ mod tests { fn test_full_lifecycle() { testutil::init(); let conn = setup_conn(); - let db = Database::new(conn, true).unwrap(); + let db = Database::new(Arc::new(clock::RealClocks{}), conn, true).unwrap(); let tmpdir = tempdir::TempDir::new("moonfire-nvr-test").unwrap(); let path = tmpdir.path().to_str().unwrap().to_owned(); let sample_file_dir_id = { db.lock() }.add_sample_file_dir(path).unwrap(); @@ -2063,7 +2091,7 @@ mod tests { // Closing and reopening the database should present the same contents. let conn = db.close(); - let db = Database::new(conn, true).unwrap(); + let db = Database::new(Arc::new(clock::RealClocks{}), conn, true).unwrap(); assert_eq!(db.lock().streams_by_id().get(&sub_stream_id).unwrap().flush_if_sec, 2); assert_no_recordings(&db, camera_uuid); @@ -2104,7 +2132,7 @@ mod tests { // Queries on a fresh database should return the correct result (with caches populated from // existing database contents rather than built on insert). let conn = db.close(); - let db = Database::new(conn, true).unwrap(); + let db = Database::new(Arc::new(clock::RealClocks{}), conn, true).unwrap(); assert_single_recording(&db, main_stream_id, &recording); // Deleting a recording should succeed, update the min/max times, and mark it as garbage. diff --git a/db/schema.sql b/db/schema.sql index a2c0dbf..82e5e85 100644 --- a/db/schema.sql +++ b/db/schema.sql @@ -52,11 +52,25 @@ create table version ( -- Tracks every time the database has been opened in read/write mode. -- This is used to ensure directories are in sync with the database (see --- schema.proto:DirMeta). It may be used in the API for etags and such in the --- future. +-- schema.proto:DirMeta), to disambiguate uncommitted recordings, and +-- potentially to understand time problems. create table open ( id integer primary key, - uuid blob unique not null check (length(uuid) = 16) + uuid blob unique not null check (length(uuid) = 16), + + -- Information about when / how long the database was open. These may be all + -- null, for example in the open that represents all information written + -- prior to database version 3. + + -- System time when the database was opened. + start_time_90k integer, + + -- System time when the database was closed or (on crash) last flushed. + end_time_90k integer, + + -- How long the database was open. This is end_time_90k - start_time_90k if + -- there were no time steps during this time. + duration_90k integer ); create table sample_file_dir ( diff --git a/db/testutil.rs b/db/testutil.rs index cde4d6f..3ce83e6 100644 --- a/db/testutil.rs +++ b/db/testutil.rs @@ -28,6 +28,7 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . +use base::clock; use db; use dir; use fnv::FnvHashMap; @@ -77,9 +78,10 @@ impl TestDb { pub fn new() -> TestDb { let tmpdir = TempDir::new("moonfire-nvr-test").unwrap(); + let clocks = Arc::new(clock::RealClocks{}); let mut conn = rusqlite::Connection::open_in_memory().unwrap(); db::Database::init(&mut conn).unwrap(); - let db = Arc::new(db::Database::new(conn, true).unwrap()); + let db = Arc::new(db::Database::new(clocks, conn, true).unwrap()); let (test_camera_uuid, sample_file_dir_id); let path = tmpdir.path().to_str().unwrap().to_owned(); let dir; diff --git a/db/upgrade/v1_to_v2.rs b/db/upgrade/v1_to_v2.rs index bf6ae46..1c3a04b 100644 --- a/db/upgrade/v1_to_v2.rs +++ b/db/upgrade/v1_to_v2.rs @@ -56,7 +56,10 @@ pub fn run(args: &super::Args, tx: &rusqlite::Transaction) -> Result<(), Error> ); create table open ( id integer primary key, - uuid blob unique not null check (length(uuid) = 16) + uuid blob unique not null check (length(uuid) = 16), + start_time_90k integer, + end_time_90k integer, + duration_90k integer ); create table sample_file_dir ( id integer primary key, diff --git a/db/writer.rs b/db/writer.rs index af3e63b..921ce81 100644 --- a/db/writer.rs +++ b/db/writer.rs @@ -355,7 +355,7 @@ impl Syncer { } let c = &self.clocks; for &id in &garbage { - c.retry_forever(&mut || { + clock::retry_forever(c, &mut || { if let Err(e) = self.dir.unlink_file(id) { if e.kind() == io::ErrorKind::NotFound { warn!("dir: recording {} already deleted!", id); @@ -366,8 +366,8 @@ impl Syncer { Ok(()) }); } - c.retry_forever(&mut || self.dir.sync()); - c.retry_forever(&mut || self.db.lock().delete_garbage(self.dir_id, &mut garbage)); + clock::retry_forever(c, &mut || self.dir.sync()); + clock::retry_forever(c, &mut || self.db.lock().delete_garbage(self.dir_id, &mut garbage)); } /// Saves the given recording and causes rotation to happen. Called from worker thread. @@ -380,8 +380,8 @@ impl Syncer { let stream_id = id.stream(); // Free up a like number of bytes. - self.clocks.retry_forever(&mut || f.sync_all()); - self.clocks.retry_forever(&mut || self.dir.sync()); + clock::retry_forever(&self.clocks, &mut || f.sync_all()); + clock::retry_forever(&self.clocks, &mut || self.dir.sync()); let mut db = self.db.lock(); db.mark_synced(id).unwrap(); delete_recordings(&mut db, stream_id, 0).unwrap(); @@ -553,7 +553,7 @@ impl<'a, C: Clocks, D: DirWriter> Writer<'a, C, D> { flags: db::RecordingFlags::Growing as i32, ..Default::default() })?; - let f = self.clocks.retry_forever(&mut || self.dir.create_file(id)); + let f = clock::retry_forever(self.clocks, &mut || self.dir.create_file(id)); self.state = WriterState::Open(InnerWriter { f, @@ -601,7 +601,7 @@ impl<'a, C: Clocks, D: DirWriter> Writer<'a, C, D> { } let mut remaining = pkt; while !remaining.is_empty() { - let written = self.clocks.retry_forever(&mut || w.f.write(remaining)); + let written = clock::retry_forever(self.clocks, &mut || w.f.write(remaining)); remaining = &remaining[written..]; } w.unflushed_sample = Some(UnflushedSample { diff --git a/src/cmds/config/mod.rs b/src/cmds/config/mod.rs index ef4c82e..bdae46f 100644 --- a/src/cmds/config/mod.rs +++ b/src/cmds/config/mod.rs @@ -37,6 +37,7 @@ extern crate cursive; use self::cursive::Cursive; use self::cursive::views; +use clock; use db; use failure::Error; use regex::Regex; @@ -124,7 +125,8 @@ struct Args { pub fn run() -> Result<(), Error> { let args: Args = super::parse_args(USAGE)?; let (_db_dir, conn) = super::open_conn(&args.flag_db_dir, super::OpenMode::ReadWrite)?; - let db = Arc::new(db::Database::new(conn, true)?); + let clocks = Arc::new(clock::RealClocks{}); + let db = Arc::new(db::Database::new(clocks, conn, true)?); let mut siv = Cursive::new(); //siv.add_global_callback('q', |s| s.quit()); diff --git a/src/cmds/run.rs b/src/cmds/run.rs index 033e742..651ba11 100644 --- a/src/cmds/run.rs +++ b/src/cmds/run.rs @@ -96,10 +96,11 @@ struct Syncer { pub fn run() -> Result<(), Error> { let args: Args = super::parse_args(USAGE)?; + let clocks = Arc::new(clock::RealClocks{}); let (_db_dir, conn) = super::open_conn( &args.flag_db_dir, if args.flag_read_only { super::OpenMode::ReadOnly } else { super::OpenMode::ReadWrite })?; - let db = Arc::new(db::Database::new(conn, !args.flag_read_only).unwrap()); + let db = Arc::new(db::Database::new(clocks.clone(), conn, !args.flag_read_only).unwrap()); info!("Database is loaded."); { @@ -122,7 +123,7 @@ pub fn run() -> Result<(), Error> { let streams = l.streams_by_id().len(); let env = streamer::Environment { db: &db, - clocks: &clock::RealClocks{}, + clocks: clocks.clone(), opener: &*stream::FFMPEG, shutdown: &shutdown_streamers, }; diff --git a/src/streamer.rs b/src/streamer.rs index 012b85e..a532e8b 100644 --- a/src/streamer.rs +++ b/src/streamer.rs @@ -42,7 +42,7 @@ pub static ROTATE_INTERVAL_SEC: i64 = 60; /// Common state that can be used by multiple `Streamer` instances. pub struct Environment<'a, 'b, C, S> where C: 'a + Clocks, S: 'a + stream::Stream { - pub clocks: &'a C, + pub clocks: Arc, pub opener: &'a stream::Opener, pub db: &'b Arc, pub shutdown: &'b Arc, @@ -57,7 +57,7 @@ pub struct Streamer<'a, C, S> where C: Clocks, S: 'a + stream::Stream { db: Arc, dir: Arc, syncer_channel: writer::SyncerChannel<::std::fs::File>, - clocks: &'a C, + clocks: Arc, opener: &'a stream::Opener, stream_id: i32, short_name: String, @@ -77,7 +77,7 @@ impl<'a, C, S> Streamer<'a, C, S> where C: 'a + Clocks, S: 'a + stream::Stream { db: env.db.clone(), dir, syncer_channel: syncer_channel, - clocks: env.clocks, + clocks: env.clocks.clone(), opener: env.opener, stream_id: stream_id, short_name: format!("{}-{}", c.short_name, s.type_.as_str()), @@ -103,14 +103,14 @@ impl<'a, C, S> Streamer<'a, C, S> where C: 'a + Clocks, S: 'a + stream::Stream { info!("{}: Opening input: {}", self.short_name, self.redacted_url); let mut stream = { - let _t = TimerGuard::new(self.clocks, || format!("opening {}", self.redacted_url)); + let _t = TimerGuard::new(&*self.clocks, || format!("opening {}", self.redacted_url)); self.opener.open(stream::Source::Rtsp(&self.url))? }; let realtime_offset = self.clocks.realtime() - self.clocks.monotonic(); // TODO: verify width/height. let extra_data = stream.get_extra_data()?; let video_sample_entry_id = { - let _t = TimerGuard::new(self.clocks, || "inserting video sample entry"); + let _t = TimerGuard::new(&*self.clocks, || "inserting video sample entry"); self.db.lock().insert_video_sample_entry(extra_data.width, extra_data.height, extra_data.sample_entry, extra_data.rfc6381_codec)? @@ -121,11 +121,11 @@ impl<'a, C, S> Streamer<'a, C, S> where C: 'a + Clocks, S: 'a + stream::Stream { // Seconds since epoch at which to next rotate. let mut rotate: Option = None; let mut transformed = Vec::new(); - let mut w = writer::Writer::new(self.clocks, &self.dir, &self.db, &self.syncer_channel, + let mut w = writer::Writer::new(&*self.clocks, &self.dir, &self.db, &self.syncer_channel, self.stream_id, video_sample_entry_id); while !self.shutdown.load(Ordering::SeqCst) { let pkt = { - let _t = TimerGuard::new(self.clocks, || "getting next packet"); + let _t = TimerGuard::new(&*self.clocks, || "getting next packet"); stream.get_next()? }; let pts = pkt.pts().ok_or_else(|| format_err!("packet with no pts"))?; @@ -140,7 +140,7 @@ impl<'a, C, S> Streamer<'a, C, S> where C: 'a + Clocks, S: 'a + stream::Stream { rotate = if let Some(r) = rotate { if frame_realtime.sec > r && pkt.is_key() { trace!("{}: write on normal rotation", self.short_name); - let _t = TimerGuard::new(self.clocks, || "closing writer"); + let _t = TimerGuard::new(&*self.clocks, || "closing writer"); w.close(Some(pts)); None } else { @@ -159,7 +159,7 @@ impl<'a, C, S> Streamer<'a, C, S> where C: 'a + Clocks, S: 'a + stream::Stream { // usual. This ensures there's plenty of frame times to use when calculating // the start time. let r = r + if w.previously_opened()? { 0 } else { self.rotate_interval_sec }; - let _t = TimerGuard::new(self.clocks, || "creating writer"); + let _t = TimerGuard::new(&*self.clocks, || "creating writer"); r }, }; @@ -173,13 +173,13 @@ impl<'a, C, S> Streamer<'a, C, S> where C: 'a + Clocks, S: 'a + stream::Stream { } else { orig_data }; - let _t = TimerGuard::new(self.clocks, + let _t = TimerGuard::new(&*self.clocks, || format!("writing {} bytes", transformed_data.len())); w.write(transformed_data, local_time, pts, pkt.is_key())?; rotate = Some(r); } if rotate.is_some() { - let _t = TimerGuard::new(self.clocks, || "closing writer"); + let _t = TimerGuard::new(&*self.clocks, || "closing writer"); w.close(None); } Ok(()) @@ -322,7 +322,7 @@ mod tests { fn basic() { testutil::init(); // 2015-04-25 00:00:00 UTC - let clocks = clock::SimulatedClocks::new(time::Timespec::new(1429920000, 0)); + let clocks = Arc::new(clock::SimulatedClocks::new(time::Timespec::new(1429920000, 0))); clocks.sleep(time::Duration::seconds(86400)); // to 2015-04-26 00:00:00 UTC let stream = stream::FFMPEG.open(stream::Source::File("src/testdata/clip.mp4")).unwrap(); @@ -337,7 +337,7 @@ mod tests { }; let db = testutil::TestDb::new(); let env = super::Environment{ - clocks: &clocks, + clocks: Arc::clone(&clocks), opener: &opener, db: &db.db, shutdown: &opener.shutdown,