diff --git a/base/clock.rs b/base/clock.rs index 73a5f89..8c63edf 100644 --- a/base/clock.rs +++ b/base/clock.rs @@ -62,7 +62,7 @@ pub fn retry_forever>(clocks: &Clocks, f: &mut FnMut() -> Resu } } -#[derive(Clone)] +#[derive(Copy, Clone)] pub struct RealClocks {} impl RealClocks { diff --git a/db/db.rs b/db/db.rs index 3ae1663..093138c 100644 --- a/db/db.rs +++ b/db/db.rs @@ -52,7 +52,7 @@ //! A list of mutations is built up in-memory and occasionally flushed to reduce SSD write //! cycles. -use base::clock::Clocks; +use base::clock::{self, Clocks}; use dir; use failure::Error; use fnv::{self, FnvHashMap, FnvHashSet}; @@ -565,7 +565,6 @@ fn init_recordings(conn: &mut rusqlite::Connection, stream_id: i32, camera: &Cam } pub struct LockedDatabase { - clocks: Arc, conn: rusqlite::Connection, uuid: Uuid, @@ -812,16 +811,10 @@ impl LockedDatabase { Ok(()) } - /// Tries to flush unwritten changes from the stream directories. + /// Helper for `DatabaseGuard::flush()` and `Database::drop()`. /// - /// * commits any recordings added with `add_recording` that have since been marked as - /// synced. - /// * moves old recordings to the garbage table as requested by `delete_oldest_recordings`. - /// * removes entries from the garbage table as requested by `mark_sample_files_deleted`. - /// - /// On success, for each affected sample file directory with a flush watcher set, sends a - /// `Flush` event. - pub(crate) fn flush(&mut self, reason: &str) -> Result<(), Error> { + /// The public API is in `DatabaseGuard::flush()`; it supplies the `Clocks` to this function. + fn flush(&mut self, clocks: &C, reason: &str) -> Result<(), Error> { let o = match self.open.as_ref() { None => bail!("database is read-only"), Some(o) => o, @@ -873,8 +866,8 @@ impl LockedDatabase { let mut stmt = tx.prepare_cached( r"update open set duration_90k = ?, end_time_90k = ? where id = ?")?; let rows = stmt.execute(&[ - &(recording::Time::new(self.clocks.monotonic()) - self.open_monotonic).0, - &recording::Time::new(self.clocks.realtime()).0, + &(recording::Time::new(clocks.monotonic()) - self.open_monotonic).0, + &recording::Time::new(clocks.realtime()).0, &o.id, ])?; if rows != 1 { @@ -1692,6 +1685,21 @@ impl LockedDatabase { } } +/// Initializes a database. +/// Note this doesn't set journal options, so that it can be used on in-memory databases for +/// test code. +pub fn init(conn: &mut rusqlite::Connection) -> Result<(), Error> { + let tx = conn.transaction()?; + tx.execute_batch(include_str!("schema.sql"))?; + { + let uuid = ::uuid::Uuid::new_v4(); + let uuid_bytes = &uuid.as_bytes()[..]; + tx.execute("insert into meta (uuid) values (?)", &[&uuid_bytes])?; + } + tx.commit()?; + Ok(()) +} + /// Gets the schema version from the given database connection. /// A fully initialized database will return `Ok(Some(version))` where `version` is an integer that /// can be compared to `EXPECTED_VERSION`. An empty database will return `Ok(None)`. A partially @@ -1709,28 +1717,37 @@ pub fn get_schema_version(conn: &rusqlite::Connection) -> Result, Er /// The recording database. Abstracts away SQLite queries. Also maintains in-memory state /// (loaded on startup, and updated on successful commit) to avoid expensive scans over the /// recording table on common queries. -pub struct Database( +pub struct Database { /// This is wrapped in an `Option` to allow the `Drop` implementation and `close` to coexist. - Option> -); + db: Option>, -impl Drop for Database { + /// This is kept separately from the `LockedDatabase` to allow the `lock()` operation itself to + /// access it. It doesn't need a `Mutex` anyway; it's `Sync`, and all operations work on + /// `&self`. + clocks: C, +} + +impl Drop for Database { fn drop(&mut self) { if ::std::thread::panicking() { return; // don't flush while panicking. } - if let Some(m) = self.0.take() { - if let Err(e) = m.into_inner().flush("drop") { + if let Some(m) = self.db.take() { + if let Err(e) = m.into_inner().flush(&self.clocks, "drop") { error!("Final database flush failed: {}", e); } } } } -impl Database { +// Helpers for Database::lock(). Closures don't implement Fn. +fn acquisition() -> &'static str { "database lock acquisition" } +fn operation() -> &'static str { "database operation" } + +impl Database { /// Creates the database from a caller-supplied SQLite connection. - pub fn new(clocks: Arc, conn: rusqlite::Connection, - read_write: bool) -> Result { + pub fn new(clocks: C, conn: rusqlite::Connection, + read_write: bool) -> Result, Error> { conn.execute("pragma foreign_keys = on", &[])?; { let ver = get_schema_version(&conn)?.ok_or_else(|| format_err!( @@ -1767,20 +1784,22 @@ impl Database { uuid, }) } else { None }; - let db = Database(Some(Mutex::new(LockedDatabase { + let db = Database { + db: Some(Mutex::new(LockedDatabase { + conn, + uuid, + open, + open_monotonic, + sample_file_dirs_by_id: BTreeMap::new(), + cameras_by_id: BTreeMap::new(), + cameras_by_uuid: BTreeMap::new(), + streams_by_id: BTreeMap::new(), + video_sample_entries_by_id: BTreeMap::new(), + video_index_cache: RefCell::new(LruCache::with_hasher(1024, Default::default())), + on_flush: Vec::new(), + })), clocks, - conn, - uuid, - open, - open_monotonic, - sample_file_dirs_by_id: BTreeMap::new(), - cameras_by_id: BTreeMap::new(), - cameras_by_uuid: BTreeMap::new(), - streams_by_id: BTreeMap::new(), - video_sample_entries_by_id: BTreeMap::new(), - video_index_cache: RefCell::new(LruCache::with_hasher(1024, Default::default())), - on_flush: Vec::new(), - }))); + }; { let l = &mut *db.lock(); l.init_video_sample_entries()?; @@ -1796,35 +1815,62 @@ impl Database { Ok(db) } - /// Initializes a database. - /// Note this doesn't set journal options, so that it can be used on in-memory databases for - /// test code. - pub fn init(conn: &mut rusqlite::Connection) -> Result<(), Error> { - let tx = conn.transaction()?; - tx.execute_batch(include_str!("schema.sql"))?; - { - let uuid = ::uuid::Uuid::new_v4(); - let uuid_bytes = &uuid.as_bytes()[..]; - tx.execute("insert into meta (uuid) values (?)", &[&uuid_bytes])?; - } - tx.commit()?; - Ok(()) - } + #[inline(always)] + pub fn clocks(&self) -> C { self.clocks.clone() } /// Locks the database; the returned reference is the only way to perform (read or write) /// operations. - pub fn lock(&self) -> MutexGuard { - self.0.as_ref().unwrap().lock() + pub fn lock(&self) -> DatabaseGuard { + let timer = clock::TimerGuard::new(&self.clocks, acquisition); + let db = self.db.as_ref().unwrap().lock(); + drop(timer); + let _timer = clock::TimerGuard:: &'static str>::new( + &self.clocks, operation); + DatabaseGuard { + clocks: &self.clocks, + db, + _timer, + } } /// For testing: closes the database (without flushing) and returns the connection. /// This allows verification that a newly opened database is in an acceptable state. #[cfg(test)] fn close(mut self) -> rusqlite::Connection { - self.0.take().unwrap().into_inner().conn + self.db.take().unwrap().into_inner().conn } } +pub struct DatabaseGuard<'db, C: Clocks> { + clocks: &'db C, + db: MutexGuard<'db, LockedDatabase>, + _timer: clock::TimerGuard<'db, C, &'static str, fn() -> &'static str>, +} + +impl<'db, C: Clocks + Clone> DatabaseGuard<'db, C> { + /// Tries to flush unwritten changes from the stream directories. + /// + /// * commits any recordings added with `add_recording` that have since been marked as + /// synced. + /// * moves old recordings to the garbage table as requested by `delete_oldest_recordings`. + /// * removes entries from the garbage table as requested by `mark_sample_files_deleted`. + /// + /// On success, for each affected sample file directory with a flush watcher set, sends a + /// `Flush` event. + pub(crate) fn flush(&mut self, reason: &str) -> Result<(), Error> { + self.db.flush(self.clocks, reason) + } +} + +impl<'db, C: Clocks + Clone> ::std::ops::Deref for DatabaseGuard<'db, C> { + type Target = LockedDatabase; + fn deref(&self) -> &LockedDatabase { &*self.db } +} + +impl<'db, C: Clocks + Clone> ::std::ops::DerefMut for DatabaseGuard<'db, C> { + fn deref_mut(&mut self) -> &mut LockedDatabase { &mut *self.db } +} + #[cfg(test)] mod tests { extern crate tempdir; @@ -1840,7 +1886,7 @@ mod tests { fn setup_conn() -> Connection { let mut conn = Connection::open_in_memory().unwrap(); - Database::init(&mut conn).unwrap(); + super::init(&mut conn).unwrap(); conn } @@ -1995,7 +2041,7 @@ mod tests { #[test] fn test_no_meta_or_version() { testutil::init(); - let e = Database::new(Arc::new(clock::RealClocks{}), Connection::open_in_memory().unwrap(), + let e = Database::new(clock::RealClocks {}, Connection::open_in_memory().unwrap(), false).err().unwrap(); assert!(e.to_string().starts_with("no such table"), "{}", e); } @@ -2005,7 +2051,7 @@ mod tests { testutil::init(); let c = setup_conn(); c.execute_batch("delete from version; insert into version values (2, 0, '');").unwrap(); - let e = Database::new(Arc::new(clock::RealClocks{}), c, false).err().unwrap(); + let e = Database::new(clock::RealClocks {}, c, false).err().unwrap(); assert!(e.to_string().starts_with( "Database schema version 2 is too old (expected 3)"), "got: {:?}", e); } @@ -2015,7 +2061,7 @@ mod tests { testutil::init(); let c = setup_conn(); c.execute_batch("delete from version; insert into version values (4, 0, '');").unwrap(); - let e = Database::new(Arc::new(clock::RealClocks{}), c, false).err().unwrap(); + let e = Database::new(clock::RealClocks {}, c, false).err().unwrap(); assert!(e.to_string().starts_with( "Database schema version 4 is too new (expected 3)"), "got: {:?}", e); } @@ -2025,7 +2071,7 @@ mod tests { fn test_fresh_db() { testutil::init(); let conn = setup_conn(); - let db = Database::new(Arc::new(clock::RealClocks{}), conn, true).unwrap(); + let db = Database::new(clock::RealClocks {}, conn, true).unwrap(); let db = db.lock(); assert_eq!(0, db.cameras_by_id().values().count()); } @@ -2035,7 +2081,7 @@ mod tests { fn test_full_lifecycle() { testutil::init(); let conn = setup_conn(); - let db = Database::new(Arc::new(clock::RealClocks{}), conn, true).unwrap(); + let db = Database::new(clock::RealClocks {}, conn, true).unwrap(); let tmpdir = tempdir::TempDir::new("moonfire-nvr-test").unwrap(); let path = tmpdir.path().to_str().unwrap().to_owned(); let sample_file_dir_id = { db.lock() }.add_sample_file_dir(path).unwrap(); @@ -2091,7 +2137,7 @@ mod tests { // Closing and reopening the database should present the same contents. let conn = db.close(); - let db = Database::new(Arc::new(clock::RealClocks{}), conn, true).unwrap(); + let db = Database::new(clock::RealClocks {}, conn, true).unwrap(); assert_eq!(db.lock().streams_by_id().get(&sub_stream_id).unwrap().flush_if_sec, 2); assert_no_recordings(&db, camera_uuid); @@ -2132,7 +2178,7 @@ mod tests { // Queries on a fresh database should return the correct result (with caches populated from // existing database contents rather than built on insert). let conn = db.close(); - let db = Database::new(Arc::new(clock::RealClocks{}), conn, true).unwrap(); + let db = Database::new(clock::RealClocks {}, conn, true).unwrap(); assert_single_recording(&db, main_stream_id, &recording); // Deleting a recording should succeed, update the min/max times, and mark it as garbage. diff --git a/db/recording.rs b/db/recording.rs index f11f1ec..ed16272 100644 --- a/db/recording.rs +++ b/db/recording.rs @@ -498,6 +498,7 @@ impl Segment { #[cfg(test)] mod tests { + use base::clock::RealClocks; use super::*; use testutil::{self, TestDb}; @@ -643,7 +644,7 @@ mod tests { let bytes = 3 * i; encoder.add_sample(duration_90k, bytes, true, &mut r); } - let db = TestDb::new(); + let db = TestDb::new(RealClocks {}); let row = db.insert_recording_from_encoder(r); // Time range [2, 2 + 4 + 6 + 8) means the 2nd, 3rd, 4th samples should be // included. @@ -662,7 +663,7 @@ mod tests { let bytes = 3 * i; encoder.add_sample(duration_90k, bytes, (i % 2) == 1, &mut r); } - let db = TestDb::new(); + let db = TestDb::new(RealClocks {}); let row = db.insert_recording_from_encoder(r); // Time range [2 + 4 + 6, 2 + 4 + 6 + 8) means the 4th sample should be included. // The 3rd also gets pulled in because it is a sync frame and the 4th is not. @@ -678,7 +679,7 @@ mod tests { encoder.add_sample(1, 1, true, &mut r); encoder.add_sample(1, 2, true, &mut r); encoder.add_sample(0, 3, true, &mut r); - let db = TestDb::new(); + let db = TestDb::new(RealClocks {}); let row = db.insert_recording_from_encoder(r); let segment = Segment::new(&db.db.lock(), &row, 1 .. 2).unwrap(); assert_eq!(&get_frames(&db.db, &segment, |it| it.bytes), &[2, 3]); @@ -691,7 +692,7 @@ mod tests { let mut r = db::RecordingToInsert::default(); let mut encoder = SampleIndexEncoder::new(); encoder.add_sample(1, 1, true, &mut r); - let db = TestDb::new(); + let db = TestDb::new(RealClocks {}); let row = db.insert_recording_from_encoder(r); let segment = Segment::new(&db.db.lock(), &row, 0 .. 0).unwrap(); assert_eq!(&get_frames(&db.db, &segment, |it| it.bytes), &[1]); @@ -709,7 +710,7 @@ mod tests { let bytes = 3 * i; encoder.add_sample(duration_90k, bytes, (i % 2) == 1, &mut r); } - let db = TestDb::new(); + let db = TestDb::new(RealClocks {}); let row = db.insert_recording_from_encoder(r); let segment = Segment::new(&db.db.lock(), &row, 0 .. 2+4+6+8+10).unwrap(); assert_eq!(&get_frames(&db.db, &segment, |it| it.duration_90k), &[2, 4, 6, 8, 10]); @@ -723,7 +724,7 @@ mod tests { encoder.add_sample(1, 1, true, &mut r); encoder.add_sample(1, 2, true, &mut r); encoder.add_sample(0, 3, true, &mut r); - let db = TestDb::new(); + let db = TestDb::new(RealClocks {}); let row = db.insert_recording_from_encoder(r); let segment = Segment::new(&db.db.lock(), &row, 0 .. 2).unwrap(); assert_eq!(&get_frames(&db.db, &segment, |it| it.bytes), &[1, 2, 3]); diff --git a/db/testutil.rs b/db/testutil.rs index 3ce83e6..d7cfa99 100644 --- a/db/testutil.rs +++ b/db/testutil.rs @@ -28,7 +28,7 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . -use base::clock; +use base::clock::Clocks; use db; use dir; use fnv::FnvHashMap; @@ -64,8 +64,8 @@ pub fn init() { }); } -pub struct TestDb { - pub db: Arc, +pub struct TestDb { + pub db: Arc>, pub dirs_by_stream_id: Arc>>, pub syncer_channel: writer::SyncerChannel<::std::fs::File>, pub syncer_join: thread::JoinHandle<()>, @@ -73,14 +73,13 @@ pub struct TestDb { pub test_camera_uuid: Uuid, } -impl TestDb { +impl TestDb { /// Creates a test database with one camera. - pub fn new() -> TestDb { + pub fn new(clocks: C) -> Self { let tmpdir = TempDir::new("moonfire-nvr-test").unwrap(); - let clocks = Arc::new(clock::RealClocks{}); let mut conn = rusqlite::Connection::open_in_memory().unwrap(); - db::Database::init(&mut conn).unwrap(); + db::init(&mut conn).unwrap(); let db = Arc::new(db::Database::new(clocks, conn, true).unwrap()); let (test_camera_uuid, sample_file_dir_id); let path = tmpdir.path().to_str().unwrap().to_owned(); diff --git a/db/writer.rs b/db/writer.rs index 921ce81..55c7488 100644 --- a/db/writer.rs +++ b/db/writer.rs @@ -97,11 +97,10 @@ impl ::std::clone::Clone for SyncerChannel { } /// State of the worker thread. -struct Syncer { - clocks: C, +struct Syncer { dir_id: i32, dir: D, - db: Arc, + db: Arc>, } /// Starts a syncer for the given sample file directory. @@ -121,8 +120,9 @@ struct Syncer { /// Note that dropping all `SyncerChannel` clones currently includes calling /// `LockedDatabase::clear_on_flush`, as this function installs a hook to watch database flushes. /// TODO: add a join wrapper which arranges for the on flush hook to be removed automatically. -pub fn start_syncer(db: Arc, dir_id: i32) - -> Result<(SyncerChannel<::std::fs::File>, thread::JoinHandle<()>), Error> { +pub fn start_syncer(db: Arc>, dir_id: i32) + -> Result<(SyncerChannel<::std::fs::File>, thread::JoinHandle<()>), Error> +where C: Clocks + Clone { let db2 = db.clone(); let (mut syncer, path) = Syncer::new(&db.lock(), db2, dir_id)?; syncer.initial_rotation()?; @@ -216,8 +216,30 @@ impl SyncerChannel { } } -impl Syncer> { - fn new(l: &db::LockedDatabase, db: Arc, dir_id: i32) +/// Lists files which should be "abandoned" (deleted without ever recording in the database) +/// on opening. +fn list_files_to_abandon(path: &str, streams_to_next: FnvHashMap) + -> Result, Error> { + let mut v = Vec::new(); + for e in ::std::fs::read_dir(path)? { + let e = e?; + let id = match dir::parse_id(e.file_name().as_bytes()) { + Ok(i) => i, + Err(_) => continue, + }; + let next = match streams_to_next.get(&id.stream()) { + Some(n) => *n, + None => continue, // unknown stream. + }; + if id.recording() >= next { + v.push(id); + } + } + Ok(v) +} + +impl Syncer> { + fn new(l: &db::LockedDatabase, db: Arc>, dir_id: i32) -> Result<(Self, String), Error> { let d = l.sample_file_dirs_by_id() .get(&dir_id) @@ -237,7 +259,7 @@ impl Syncer> { } }) .collect(); - let to_abandon = Syncer::list_files_to_abandon(&d.path, streams_to_next)?; + let to_abandon = list_files_to_abandon(&d.path, streams_to_next)?; let mut undeletable = 0; for &id in &to_abandon { if let Err(e) = dir.unlink_file(id) { @@ -254,35 +276,12 @@ impl Syncer> { } Ok((Syncer { - clocks: clock::RealClocks{}, dir_id, dir, db, }, d.path.clone())) } - /// Lists files which should be "abandoned" (deleted without ever recording in the database) - /// on opening. - fn list_files_to_abandon(path: &str, streams_to_next: FnvHashMap) - -> Result, Error> { - let mut v = Vec::new(); - for e in ::std::fs::read_dir(path)? { - let e = e?; - let id = match dir::parse_id(e.file_name().as_bytes()) { - Ok(i) => i, - Err(_) => continue, - }; - let next = match streams_to_next.get(&id.stream()) { - Some(n) => *n, - None => continue, // unknown stream. - }; - if id.recording() >= next { - v.push(id); - } - } - Ok(v) - } - /// Rotates files for all streams and deletes stale files from previous runs. /// Called from main thread. fn initial_rotation(&mut self) -> Result<(), Error> { @@ -331,7 +330,7 @@ impl Syncer> { } } -impl Syncer { +impl Syncer { fn run(&mut self, cmds: mpsc::Receiver>) { loop { match cmds.recv() { @@ -353,7 +352,7 @@ impl Syncer { if garbage.is_empty() { return; } - let c = &self.clocks; + let c = &self.db.clocks(); for &id in &garbage { clock::retry_forever(c, &mut || { if let Err(e) = self.dir.unlink_file(id) { @@ -380,8 +379,8 @@ impl Syncer { let stream_id = id.stream(); // Free up a like number of bytes. - clock::retry_forever(&self.clocks, &mut || f.sync_all()); - clock::retry_forever(&self.clocks, &mut || self.dir.sync()); + clock::retry_forever(&self.db.clocks(), &mut || f.sync_all()); + clock::retry_forever(&self.db.clocks(), &mut || self.dir.sync()); let mut db = self.db.lock(); db.mark_synced(id).unwrap(); delete_recordings(&mut db, stream_id, 0).unwrap(); @@ -412,10 +411,9 @@ impl Syncer { /// metadata to the database. `Writer` hands off each recording's state to the syncer when done. It /// saves the recording to the database (if I/O errors do not prevent this), retries forever, /// or panics (if further writing on this stream is impossible). -pub struct Writer<'a, C: Clocks, D: DirWriter> { - clocks: &'a C, +pub struct Writer<'a, C: Clocks + Clone, D: DirWriter> { dir: &'a D, - db: &'a db::Database, + db: &'a db::Database, channel: &'a SyncerChannel, stream_id: i32, video_sample_entry_id: i32, @@ -523,11 +521,10 @@ struct PreviousWriter { run_offset: i32, } -impl<'a, C: Clocks, D: DirWriter> Writer<'a, C, D> { - pub fn new(clocks: &'a C, dir: &'a D, db: &'a db::Database, channel: &'a SyncerChannel, +impl<'a, C: Clocks + Clone, D: DirWriter> Writer<'a, C, D> { + pub fn new(dir: &'a D, db: &'a db::Database, channel: &'a SyncerChannel, stream_id: i32, video_sample_entry_id: i32) -> Self { Writer { - clocks, dir, db, channel, @@ -553,7 +550,7 @@ impl<'a, C: Clocks, D: DirWriter> Writer<'a, C, D> { flags: db::RecordingFlags::Growing as i32, ..Default::default() })?; - let f = clock::retry_forever(self.clocks, &mut || self.dir.create_file(id)); + let f = clock::retry_forever(&self.db.clocks(), &mut || self.dir.create_file(id)); self.state = WriterState::Open(InnerWriter { f, @@ -601,7 +598,7 @@ impl<'a, C: Clocks, D: DirWriter> Writer<'a, C, D> { } let mut remaining = pkt; while !remaining.is_empty() { - let written = clock::retry_forever(self.clocks, &mut || w.f.write(remaining)); + let written = clock::retry_forever(&self.db.clocks(), &mut || w.f.write(remaining)); remaining = &remaining[written..]; } w.unflushed_sample = Some(UnflushedSample { @@ -669,7 +666,7 @@ impl InnerWriter { } } -impl<'a, C: Clocks, D: DirWriter> Drop for Writer<'a, C, D> { +impl<'a, C: Clocks + Clone, D: DirWriter> Drop for Writer<'a, C, D> { fn drop(&mut self) { if ::std::thread::panicking() { // This will probably panic again. Don't do it. @@ -779,8 +776,8 @@ mod tests { } struct Harness { - clocks: SimulatedClocks, - db: Arc, + //clocks: SimulatedClocks, + db: Arc>, dir_id: i32, _tmpdir: ::tempdir::TempDir, dir: MockDir, @@ -789,7 +786,8 @@ mod tests { } fn new_harness() -> Harness { - let tdb = testutil::TestDb::new(); + let clocks = SimulatedClocks::new(::time::Timespec::new(0, 0)); + let tdb = testutil::TestDb::new(clocks); let dir_id = *tdb.db.lock().sample_file_dirs_by_id().keys().next().unwrap(); // This starts a real fs-backed syncer. Get rid of it. @@ -798,10 +796,8 @@ mod tests { tdb.syncer_join.join().unwrap(); // Start a mocker syncer. - let clocks = SimulatedClocks::new(::time::Timespec::new(0, 0)); let dir = MockDir::new(); let mut syncer = super::Syncer { - clocks: clocks.clone(), dir_id: *tdb.db.lock().sample_file_dirs_by_id().keys().next().unwrap(), dir: dir.clone(), db: tdb.db.clone(), @@ -818,7 +814,7 @@ mod tests { .spawn(move || syncer.run(rcv)).unwrap(); Harness { - clocks, + //clocks, dir_id, dir, db: tdb.db, @@ -837,7 +833,7 @@ mod tests { let video_sample_entry_id = h.db.lock().insert_video_sample_entry( 1920, 1080, [0u8; 100].to_vec(), "avc1.000000".to_owned()).unwrap(); { - let mut w = Writer::new(&h.clocks, &h.dir, &h.db, &h.channel, testutil::TEST_STREAM_ID, + let mut w = Writer::new(&h.dir, &h.db, &h.channel, testutil::TEST_STREAM_ID, video_sample_entry_id); h.dir.expect(MockDirAction::Create(CompositeId::new(1, 1), Box::new(|_id| Err(eio())))); let f = MockFile::new(); @@ -895,7 +891,7 @@ mod tests { let video_sample_entry_id = h.db.lock().insert_video_sample_entry( 1920, 1080, [0u8; 100].to_vec(), "avc1.000000".to_owned()).unwrap(); { - let mut w = Writer::new(&h.clocks, &h.dir, &h.db, &h.channel, testutil::TEST_STREAM_ID, + let mut w = Writer::new(&h.dir, &h.db, &h.channel, testutil::TEST_STREAM_ID, video_sample_entry_id); let f = MockFile::new(); h.dir.expect(MockDirAction::Create(CompositeId::new(1, 1), diff --git a/src/cmds/config/mod.rs b/src/cmds/config/mod.rs index bdae46f..90169ac 100644 --- a/src/cmds/config/mod.rs +++ b/src/cmds/config/mod.rs @@ -125,7 +125,7 @@ struct Args { pub fn run() -> Result<(), Error> { let args: Args = super::parse_args(USAGE)?; let (_db_dir, conn) = super::open_conn(&args.flag_db_dir, super::OpenMode::ReadWrite)?; - let clocks = Arc::new(clock::RealClocks{}); + let clocks = clock::RealClocks {}; let db = Arc::new(db::Database::new(clocks, conn, true)?); let mut siv = Cursive::new(); diff --git a/src/cmds/init.rs b/src/cmds/init.rs index f2b8a58..b0f91f1 100644 --- a/src/cmds/init.rs +++ b/src/cmds/init.rs @@ -66,7 +66,7 @@ pub fn run() -> Result<(), Error> { pragma journal_mode = wal; pragma page_size = 16384; "#)?; - db::Database::init(&mut conn)?; + db::init(&mut conn)?; info!("Database initialized."); Ok(()) } diff --git a/src/cmds/run.rs b/src/cmds/run.rs index 651ba11..3b5f96f 100644 --- a/src/cmds/run.rs +++ b/src/cmds/run.rs @@ -96,7 +96,7 @@ struct Syncer { pub fn run() -> Result<(), Error> { let args: Args = super::parse_args(USAGE)?; - let clocks = Arc::new(clock::RealClocks{}); + let clocks = clock::RealClocks {}; let (_db_dir, conn) = super::open_conn( &args.flag_db_dir, if args.flag_read_only { super::OpenMode::ReadOnly } else { super::OpenMode::ReadWrite })?; @@ -123,7 +123,6 @@ pub fn run() -> Result<(), Error> { let streams = l.streams_by_id().len(); let env = streamer::Environment { db: &db, - clocks: clocks.clone(), opener: &*stream::FFMPEG, shutdown: &shutdown_streamers, }; diff --git a/src/mp4.rs b/src/mp4.rs index 71845f9..85cc8ef 100644 --- a/src/mp4.rs +++ b/src/mp4.rs @@ -1518,6 +1518,7 @@ impl http_serve::Entity for File { #[cfg(test)] mod tests { use byteorder::{BigEndian, ByteOrder}; + use clock::RealClocks; use db::recording::{self, TIME_UNITS_PER_SEC}; use db::testutil::{self, TestDb, TEST_STREAM_ID}; use db::writer; @@ -1745,7 +1746,7 @@ mod tests { } } - fn copy_mp4_to_db(db: &TestDb) { + fn copy_mp4_to_db(db: &TestDb) { let mut input = stream::FFMPEG.open(stream::Source::File("src/testdata/clip.mp4")).unwrap(); @@ -1756,8 +1757,7 @@ mod tests { extra_data.width, extra_data.height, extra_data.sample_entry, extra_data.rfc6381_codec).unwrap(); let dir = db.dirs_by_stream_id.get(&TEST_STREAM_ID).unwrap(); - let mut output = writer::Writer::new(&::base::clock::RealClocks{}, dir, &db.db, - &db.syncer_channel, TEST_STREAM_ID, + let mut output = writer::Writer::new(dir, &db.db, &db.syncer_channel, TEST_STREAM_ID, video_sample_entry_id); // end_pts is the pts of the end of the most recent frame (start + duration). @@ -1785,7 +1785,7 @@ mod tests { db.syncer_channel.flush(); } - pub fn create_mp4_from_db(tdb: &TestDb, + pub fn create_mp4_from_db(tdb: &TestDb, skip_90k: i32, shorten_90k: i32, include_subtitles: bool) -> File { let mut builder = FileBuilder::new(Type::Normal); builder.include_timestamp_subtitle_track(include_subtitles); @@ -1858,7 +1858,7 @@ mod tests { /// Makes a `.mp4` file which is only good for exercising the `Slice` logic for producing /// sample tables that match the supplied encoder. - fn make_mp4_from_encoders(type_: Type, db: &TestDb, + fn make_mp4_from_encoders(type_: Type, db: &TestDb, mut recordings: Vec, desired_range_90k: Range) -> File { let mut builder = FileBuilder::new(type_); @@ -1879,7 +1879,7 @@ mod tests { #[test] fn test_all_sync_frames() { testutil::init(); - let db = TestDb::new(); + let db = TestDb::new(RealClocks {}); let mut r = db::RecordingToInsert::default(); let mut encoder = recording::SampleIndexEncoder::new(); for i in 1..6 { @@ -1933,7 +1933,7 @@ mod tests { #[test] fn test_half_sync_frames() { testutil::init(); - let db = TestDb::new(); + let db = TestDb::new(RealClocks {}); let mut r = db::RecordingToInsert::default(); let mut encoder = recording::SampleIndexEncoder::new(); for i in 1..6 { @@ -1996,7 +1996,7 @@ mod tests { #[test] fn test_multi_segment() { testutil::init(); - let db = TestDb::new(); + let db = TestDb::new(RealClocks {}); let mut encoders = Vec::new(); let mut r = db::RecordingToInsert::default(); let mut encoder = recording::SampleIndexEncoder::new(); @@ -2033,7 +2033,7 @@ mod tests { #[test] fn test_zero_duration_recording() { testutil::init(); - let db = TestDb::new(); + let db = TestDb::new(RealClocks {}); let mut encoders = Vec::new(); let mut r = db::RecordingToInsert::default(); let mut encoder = recording::SampleIndexEncoder::new(); @@ -2059,7 +2059,7 @@ mod tests { #[test] fn test_media_segment() { testutil::init(); - let db = TestDb::new(); + let db = TestDb::new(RealClocks {}); let mut r = db::RecordingToInsert::default(); let mut encoder = recording::SampleIndexEncoder::new(); for i in 1..6 { @@ -2102,7 +2102,7 @@ mod tests { #[test] fn test_round_trip() { testutil::init(); - let db = TestDb::new(); + let db = TestDb::new(RealClocks {}); copy_mp4_to_db(&db); let mp4 = create_mp4_from_db(&db, 0, 0, false); let new_filename = write_mp4(&mp4, db.tmpdir.path()); @@ -2123,7 +2123,7 @@ mod tests { #[test] fn test_round_trip_with_subtitles() { testutil::init(); - let db = TestDb::new(); + let db = TestDb::new(RealClocks {}); copy_mp4_to_db(&db); let mp4 = create_mp4_from_db(&db, 0, 0, true); let new_filename = write_mp4(&mp4, db.tmpdir.path()); @@ -2144,7 +2144,7 @@ mod tests { #[test] fn test_round_trip_with_edit_list() { testutil::init(); - let db = TestDb::new(); + let db = TestDb::new(RealClocks {}); copy_mp4_to_db(&db); let mp4 = create_mp4_from_db(&db, 1, 0, false); let new_filename = write_mp4(&mp4, db.tmpdir.path()); @@ -2165,7 +2165,7 @@ mod tests { #[test] fn test_round_trip_with_shorten() { testutil::init(); - let db = TestDb::new(); + let db = TestDb::new(RealClocks {}); copy_mp4_to_db(&db); let mp4 = create_mp4_from_db(&db, 0, 1, false); let new_filename = write_mp4(&mp4, db.tmpdir.path()); @@ -2214,7 +2214,7 @@ mod bench { impl BenchServer { fn new() -> BenchServer { - let db = TestDb::new(); + let db = TestDb::new(RealClocks {}); testutil::add_dummy_recordings_to_db(&db.db, 60); let mp4 = create_mp4_from_db(&db, 0, 0, false); let p = mp4.0.initial_sample_byte_pos; @@ -2256,7 +2256,7 @@ mod bench { #[bench] fn build_index(b: &mut Bencher) { testutil::init(); - let db = TestDb::new(); + let db = TestDb::new(RealClocks {}); testutil::add_dummy_recordings_to_db(&db.db, 1); let db = db.db.lock(); @@ -2307,7 +2307,7 @@ mod bench { #[bench] fn mp4_construction(b: &mut Bencher) { testutil::init(); - let db = TestDb::new(); + let db = TestDb::new(RealClocks {}); testutil::add_dummy_recordings_to_db(&db.db, 60); b.iter(|| { create_mp4_from_db(&db, 0, 0, false); diff --git a/src/streamer.rs b/src/streamer.rs index a532e8b..9a51abf 100644 --- a/src/streamer.rs +++ b/src/streamer.rs @@ -41,23 +41,21 @@ use time; pub static ROTATE_INTERVAL_SEC: i64 = 60; /// Common state that can be used by multiple `Streamer` instances. -pub struct Environment<'a, 'b, C, S> where C: 'a + Clocks, S: 'a + stream::Stream { - pub clocks: Arc, +pub struct Environment<'a, 'b, C, S> where C: Clocks + Clone, S: 'a + stream::Stream { pub opener: &'a stream::Opener, - pub db: &'b Arc, + pub db: &'b Arc>, pub shutdown: &'b Arc, } -pub struct Streamer<'a, C, S> where C: Clocks, S: 'a + stream::Stream { +pub struct Streamer<'a, C, S> where C: Clocks + Clone, S: 'a + stream::Stream { shutdown: Arc, // State below is only used by the thread in Run. rotate_offset_sec: i64, rotate_interval_sec: i64, - db: Arc, + db: Arc>, dir: Arc, syncer_channel: writer::SyncerChannel<::std::fs::File>, - clocks: Arc, opener: &'a stream::Opener, stream_id: i32, short_name: String, @@ -65,7 +63,7 @@ pub struct Streamer<'a, C, S> where C: Clocks, S: 'a + stream::Stream { redacted_url: String, } -impl<'a, C, S> Streamer<'a, C, S> where C: 'a + Clocks, S: 'a + stream::Stream { +impl<'a, C, S> Streamer<'a, C, S> where C: 'a + Clocks + Clone, S: 'a + stream::Stream { pub fn new<'b>(env: &Environment<'a, 'b, C, S>, dir: Arc, syncer_channel: writer::SyncerChannel<::std::fs::File>, stream_id: i32, c: &Camera, s: &Stream, rotate_offset_sec: i64, @@ -77,7 +75,6 @@ impl<'a, C, S> Streamer<'a, C, S> where C: 'a + Clocks, S: 'a + stream::Stream { db: env.db.clone(), dir, syncer_channel: syncer_channel, - clocks: env.clocks.clone(), opener: env.opener, stream_id: stream_id, short_name: format!("{}-{}", c.short_name, s.type_.as_str()), @@ -93,7 +90,7 @@ impl<'a, C, S> Streamer<'a, C, S> where C: 'a + Clocks, S: 'a + stream::Stream { if let Err(e) = self.run_once() { let sleep_time = time::Duration::seconds(1); warn!("{}: sleeping for {:?} after error: {:?}", self.short_name, sleep_time, e); - self.clocks.sleep(sleep_time); + self.db.clocks().sleep(sleep_time); } } info!("{}: shutting down", self.short_name); @@ -101,16 +98,17 @@ impl<'a, C, S> Streamer<'a, C, S> where C: 'a + Clocks, S: 'a + stream::Stream { fn run_once(&mut self) -> Result<(), Error> { info!("{}: Opening input: {}", self.short_name, self.redacted_url); + let clocks = self.db.clocks(); let mut stream = { - let _t = TimerGuard::new(&*self.clocks, || format!("opening {}", self.redacted_url)); + let _t = TimerGuard::new(&clocks, || format!("opening {}", self.redacted_url)); self.opener.open(stream::Source::Rtsp(&self.url))? }; - let realtime_offset = self.clocks.realtime() - self.clocks.monotonic(); + let realtime_offset = self.db.clocks().realtime() - clocks.monotonic(); // TODO: verify width/height. let extra_data = stream.get_extra_data()?; let video_sample_entry_id = { - let _t = TimerGuard::new(&*self.clocks, || "inserting video sample entry"); + let _t = TimerGuard::new(&clocks, || "inserting video sample entry"); self.db.lock().insert_video_sample_entry(extra_data.width, extra_data.height, extra_data.sample_entry, extra_data.rfc6381_codec)? @@ -121,11 +119,11 @@ impl<'a, C, S> Streamer<'a, C, S> where C: 'a + Clocks, S: 'a + stream::Stream { // Seconds since epoch at which to next rotate. let mut rotate: Option = None; let mut transformed = Vec::new(); - let mut w = writer::Writer::new(&*self.clocks, &self.dir, &self.db, &self.syncer_channel, - self.stream_id, video_sample_entry_id); + let mut w = writer::Writer::new(&self.dir, &self.db, &self.syncer_channel, self.stream_id, + video_sample_entry_id); while !self.shutdown.load(Ordering::SeqCst) { let pkt = { - let _t = TimerGuard::new(&*self.clocks, || "getting next packet"); + let _t = TimerGuard::new(&clocks, || "getting next packet"); stream.get_next()? }; let pts = pkt.pts().ok_or_else(|| format_err!("packet with no pts"))?; @@ -135,12 +133,12 @@ impl<'a, C, S> Streamer<'a, C, S> where C: 'a + Clocks, S: 'a + stream::Stream { debug!("{}: have first key frame", self.short_name); seen_key_frame = true; } - let frame_realtime = self.clocks.monotonic() + realtime_offset; + let frame_realtime = clocks.monotonic() + realtime_offset; let local_time = recording::Time::new(frame_realtime); rotate = if let Some(r) = rotate { if frame_realtime.sec > r && pkt.is_key() { trace!("{}: write on normal rotation", self.short_name); - let _t = TimerGuard::new(&*self.clocks, || "closing writer"); + let _t = TimerGuard::new(&clocks, || "closing writer"); w.close(Some(pts)); None } else { @@ -159,7 +157,7 @@ impl<'a, C, S> Streamer<'a, C, S> where C: 'a + Clocks, S: 'a + stream::Stream { // usual. This ensures there's plenty of frame times to use when calculating // the start time. let r = r + if w.previously_opened()? { 0 } else { self.rotate_interval_sec }; - let _t = TimerGuard::new(&*self.clocks, || "creating writer"); + let _t = TimerGuard::new(&clocks, || "creating writer"); r }, }; @@ -173,13 +171,13 @@ impl<'a, C, S> Streamer<'a, C, S> where C: 'a + Clocks, S: 'a + stream::Stream { } else { orig_data }; - let _t = TimerGuard::new(&*self.clocks, + let _t = TimerGuard::new(&clocks, || format!("writing {} bytes", transformed_data.len())); w.write(transformed_data, local_time, pts, pkt.is_key())?; rotate = Some(r); } if rotate.is_some() { - let _t = TimerGuard::new(&*self.clocks, || "closing writer"); + let _t = TimerGuard::new(&clocks, || "closing writer"); w.close(None); } Ok(()) @@ -322,7 +320,7 @@ mod tests { fn basic() { testutil::init(); // 2015-04-25 00:00:00 UTC - let clocks = Arc::new(clock::SimulatedClocks::new(time::Timespec::new(1429920000, 0))); + let clocks = clock::SimulatedClocks::new(time::Timespec::new(1429920000, 0)); clocks.sleep(time::Duration::seconds(86400)); // to 2015-04-26 00:00:00 UTC let stream = stream::FFMPEG.open(stream::Source::File("src/testdata/clip.mp4")).unwrap(); @@ -335,9 +333,8 @@ mod tests { streams: Mutex::new(vec![stream]), shutdown: Arc::new(AtomicBool::new(false)), }; - let db = testutil::TestDb::new(); - let env = super::Environment{ - clocks: Arc::clone(&clocks), + let db = testutil::TestDb::new(clocks.clone()); + let env = super::Environment { opener: &opener, db: &db.db, shutdown: &opener.shutdown,