save timestamps along with opens

Scott Lamb 2018-03-09 17:41:53 -08:00
parent f81d699c8c
commit 4c8daa6d24
9 changed files with 105 additions and 54 deletions

View File

@@ -39,7 +39,7 @@ use std::thread;
 use time::{Duration, Timespec};
 /// Abstract interface to the system clocks. This is for testability.
-pub trait Clocks : Clone + Sync + 'static {
+pub trait Clocks : Send + Sync + 'static {
     /// Gets the current time from `CLOCK_REALTIME`.
     fn realtime(&self) -> Timespec;
@@ -48,17 +48,17 @@ pub trait Clocks : Clone + Sync + 'static {
     /// Causes the current thread to sleep for the specified time.
     fn sleep(&self, how_long: Duration);
+}
-    fn retry_forever<T, E: Into<Error>>(&self, f: &mut FnMut() -> Result<T, E>) -> T {
+pub fn retry_forever<T, E: Into<Error>>(clocks: &Clocks, f: &mut FnMut() -> Result<T, E>) -> T {
     loop {
         let e = match f() {
             Ok(t) => return t,
             Err(e) => e.into(),
         };
         let sleep_time = Duration::seconds(1);
         warn!("sleeping for {:?} after error: {:?}", sleep_time, e);
-        self.sleep(sleep_time);
+        clocks.sleep(sleep_time);
     }
 }
-}
@@ -89,13 +89,13 @@ impl Clocks for RealClocks {
 /// Logs a warning if the TimerGuard lives "too long", using the label created by a supplied
 /// function.
-pub struct TimerGuard<'a, C: Clocks, S: AsRef<str>, F: FnOnce() -> S + 'a> {
+pub struct TimerGuard<'a, C: Clocks + ?Sized, S: AsRef<str>, F: FnOnce() -> S + 'a> {
     clocks: &'a C,
     label_f: Option<F>,
     start: Timespec,
 }
-impl<'a, C: Clocks, S: AsRef<str>, F: FnOnce() -> S + 'a> TimerGuard<'a, C, S, F> {
+impl<'a, C: Clocks + ?Sized, S: AsRef<str>, F: FnOnce() -> S + 'a> TimerGuard<'a, C, S, F> {
     pub fn new(clocks: &'a C, label_f: F) -> Self {
         TimerGuard {
             clocks,
@@ -105,7 +105,8 @@ impl<'a, C: Clocks, S: AsRef<str>, F: FnOnce() -> S + 'a> TimerGuard<'a, C, S, F
         }
     }
-impl<'a, C: Clocks, S: AsRef<str>, F: FnOnce() -> S + 'a> Drop for TimerGuard<'a, C, S, F> {
+impl<'a, C, S, F> Drop for TimerGuard<'a, C, S, F>
+where C: Clocks + ?Sized, S: AsRef<str>, F: FnOnce() -> S + 'a {
     fn drop(&mut self) {
         let elapsed = self.clocks.monotonic() - self.start;
         if elapsed.num_seconds() >= 1 {
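
Note: retry_forever is now a free function over a &Clocks trait object, and TimerGuard accepts C: ?Sized, so callers no longer need a concrete, Clone clock type. A minimal caller sketch (not part of this commit; the helper name, file path, and label are illustrative only):

use base::clock::{self, Clocks, TimerGuard};
use std::fs::File;

// Hypothetical helper: keeps retrying an I/O operation (sleeping one second
// between attempts, per retry_forever above) and logs a warning via TimerGuard
// if the whole scope runs for a second or more.
fn open_config_file(clocks: &Clocks) -> File {
    let _t = TimerGuard::new(clocks, || "opening config file");  // C = Clocks (unsized)
    clock::retry_forever(clocks, &mut || File::open("/tmp/example.conf"))
}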

View File

@@ -52,6 +52,7 @@
 //! A list of mutations is built up in-memory and occasionally flushed to reduce SSD write
 //! cycles.
+use base::clock::Clocks;
 use dir;
 use failure::Error;
 use fnv::{self, FnvHashMap, FnvHashSet};
@@ -564,11 +565,17 @@ fn init_recordings(conn: &mut rusqlite::Connection, stream_id: i32, camera: &Cam
 }
 pub struct LockedDatabase {
+    clocks: Arc<Clocks>,
     conn: rusqlite::Connection,
     uuid: Uuid,
     /// If the database is open in read-write mode, the information about the current Open row.
     open: Option<Open>,
+    /// The monotonic time when the database was opened (whether in read-write mode or read-only
+    /// mode).
+    open_monotonic: recording::Time,
     sample_file_dirs_by_id: BTreeMap<i32, SampleFileDir>,
     cameras_by_id: BTreeMap<i32, Camera>,
     streams_by_id: BTreeMap<i32, Stream>,
@@ -862,6 +869,18 @@ impl LockedDatabase {
         for (&stream_id, mut r) in &mut new_ranges {
             *r = raw::get_range(&tx, stream_id)?;
         }
+        {
+            let mut stmt = tx.prepare_cached(
+                r"update open set duration_90k = ?, end_time_90k = ? where id = ?")?;
+            let rows = stmt.execute(&[
+                &(recording::Time::new(self.clocks.monotonic()) - self.open_monotonic).0,
+                &recording::Time::new(self.clocks.realtime()).0,
+                &o.id,
+            ])?;
+            if rows != 1 {
+                bail!("unable to find current open {}", o.id);
+            }
+        }
         tx.commit()?;
         // Process delete_garbage.
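
The flush above stamps the current open row with a monotonic duration and a wall-clock end time. A hypothetical read-back sketch (not in this commit), assuming a rusqlite connection `conn`, an `open_id` of interest, non-null time columns, and an enclosing function returning Result; it shows how the two values could be compared to spot steps in the system clock. All values are in 90 kHz units:

// Fetch the times recorded for one open.
let (start, end, duration): (i64, i64, i64) = conn.query_row(
    "select start_time_90k, end_time_90k, duration_90k from open where id = ?",
    &[&open_id],
    |row| (row.get(0), row.get(1), row.get(2)))?;

// With no time steps, end - start should equal duration (see the schema comments below).
let clock_step_90k = (end - start) - duration;
info!("open {}: system clock stepped by {} (90 kHz units)", open_id, clock_step_90k);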
@@ -1710,7 +1729,8 @@ impl Drop for Database {
 impl Database {
     /// Creates the database from a caller-supplied SQLite connection.
-    pub fn new(conn: rusqlite::Connection, read_write: bool) -> Result<Database, Error> {
+    pub fn new(clocks: Arc<Clocks>, conn: rusqlite::Connection,
+               read_write: bool) -> Result<Database, Error> {
         conn.execute("pragma foreign_keys = on", &[])?;
         {
             let ver = get_schema_version(&conn)?.ok_or_else(|| format_err!(
@@ -1735,20 +1755,24 @@ impl Database {
         // Note: the meta check comes after the version check to improve the error message when
         // trying to open a version 0 or version 1 database (which lacked the meta table).
         let uuid = raw::get_db_uuid(&conn)?;
+        let open_monotonic = recording::Time::new(clocks.monotonic());
         let open = if read_write {
-            let mut stmt = conn.prepare(" insert into open (uuid) values (?)")?;
+            let real = recording::Time::new(clocks.realtime());
+            let mut stmt = conn.prepare(" insert into open (uuid, start_time_90k) values (?, ?)")?;
             let uuid = Uuid::new_v4();
             let uuid_bytes = &uuid.as_bytes()[..];
-            stmt.execute(&[&uuid_bytes])?;
+            stmt.execute(&[&uuid_bytes, &real.0])?;
             Some(Open {
                 id: conn.last_insert_rowid() as u32,
                 uuid,
             })
         } else { None };
         let db = Database(Some(Mutex::new(LockedDatabase {
-            conn: conn,
+            clocks,
+            conn,
             uuid,
             open,
+            open_monotonic,
             sample_file_dirs_by_id: BTreeMap::new(),
             cameras_by_id: BTreeMap::new(),
             cameras_by_uuid: BTreeMap::new(),
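
Because the constructor now takes the clocks explicitly, a test can hand it SimulatedClocks instead of RealClocks. A hypothetical test-style construction (sketch only; the names are the ones used elsewhere in this diff, and the start time is arbitrary):

// Build an in-memory database driven by simulated clocks.
let clocks = Arc::new(clock::SimulatedClocks::new(time::Timespec::new(0, 0)));
let mut conn = rusqlite::Connection::open_in_memory().unwrap();
db::Database::init(&mut conn).unwrap();  // create the schema first
let db = db::Database::new(clocks, conn, true).unwrap();  // Arc<SimulatedClocks> coerces to Arc<Clocks>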
@@ -1789,7 +1813,9 @@ impl Database {
     /// Locks the database; the returned reference is the only way to perform (read or write)
     /// operations.
-    pub fn lock(&self) -> MutexGuard<LockedDatabase> { self.0.as_ref().unwrap().lock() }
+    pub fn lock(&self) -> MutexGuard<LockedDatabase> {
+        self.0.as_ref().unwrap().lock()
+    }
     /// For testing: closes the database (without flushing) and returns the connection.
     /// This allows verification that a newly opened database is in an acceptable state.
@@ -1803,6 +1829,7 @@ impl Database {
 mod tests {
     extern crate tempdir;
+    use base::clock;
     use recording::{self, TIME_UNITS_PER_SEC};
     use rusqlite::Connection;
     use std::collections::BTreeMap;
@@ -1968,7 +1995,8 @@ mod tests {
     #[test]
     fn test_no_meta_or_version() {
         testutil::init();
-        let e = Database::new(Connection::open_in_memory().unwrap(), false).err().unwrap();
+        let e = Database::new(Arc::new(clock::RealClocks{}), Connection::open_in_memory().unwrap(),
+                              false).err().unwrap();
         assert!(e.to_string().starts_with("no such table"), "{}", e);
     }
@@ -1977,7 +2005,7 @@ mod tests {
         testutil::init();
         let c = setup_conn();
         c.execute_batch("delete from version; insert into version values (2, 0, '');").unwrap();
-        let e = Database::new(c, false).err().unwrap();
+        let e = Database::new(Arc::new(clock::RealClocks{}), c, false).err().unwrap();
         assert!(e.to_string().starts_with(
             "Database schema version 2 is too old (expected 3)"), "got: {:?}", e);
     }
@@ -1987,7 +2015,7 @@ mod tests {
         testutil::init();
         let c = setup_conn();
         c.execute_batch("delete from version; insert into version values (4, 0, '');").unwrap();
-        let e = Database::new(c, false).err().unwrap();
+        let e = Database::new(Arc::new(clock::RealClocks{}), c, false).err().unwrap();
         assert!(e.to_string().starts_with(
             "Database schema version 4 is too new (expected 3)"), "got: {:?}", e);
     }
@@ -1997,7 +2025,7 @@ mod tests {
     fn test_fresh_db() {
         testutil::init();
         let conn = setup_conn();
-        let db = Database::new(conn, true).unwrap();
+        let db = Database::new(Arc::new(clock::RealClocks{}), conn, true).unwrap();
         let db = db.lock();
         assert_eq!(0, db.cameras_by_id().values().count());
     }
@@ -2007,7 +2035,7 @@ mod tests {
     fn test_full_lifecycle() {
         testutil::init();
         let conn = setup_conn();
-        let db = Database::new(conn, true).unwrap();
+        let db = Database::new(Arc::new(clock::RealClocks{}), conn, true).unwrap();
         let tmpdir = tempdir::TempDir::new("moonfire-nvr-test").unwrap();
         let path = tmpdir.path().to_str().unwrap().to_owned();
         let sample_file_dir_id = { db.lock() }.add_sample_file_dir(path).unwrap();
@@ -2063,7 +2091,7 @@ mod tests {
         // Closing and reopening the database should present the same contents.
         let conn = db.close();
-        let db = Database::new(conn, true).unwrap();
+        let db = Database::new(Arc::new(clock::RealClocks{}), conn, true).unwrap();
         assert_eq!(db.lock().streams_by_id().get(&sub_stream_id).unwrap().flush_if_sec, 2);
         assert_no_recordings(&db, camera_uuid);
@@ -2104,7 +2132,7 @@ mod tests {
         // Queries on a fresh database should return the correct result (with caches populated from
         // existing database contents rather than built on insert).
         let conn = db.close();
-        let db = Database::new(conn, true).unwrap();
+        let db = Database::new(Arc::new(clock::RealClocks{}), conn, true).unwrap();
         assert_single_recording(&db, main_stream_id, &recording);
         // Deleting a recording should succeed, update the min/max times, and mark it as garbage.

View File

@@ -52,11 +52,25 @@ create table version (
 -- Tracks every time the database has been opened in read/write mode.
 -- This is used to ensure directories are in sync with the database (see
--- schema.proto:DirMeta). It may be used in the API for etags and such in the
--- future.
+-- schema.proto:DirMeta), to disambiguate uncommitted recordings, and
+-- potentially to understand time problems.
 create table open (
   id integer primary key,
-  uuid blob unique not null check (length(uuid) = 16)
+  uuid blob unique not null check (length(uuid) = 16),
+  -- Information about when / how long the database was open. These may be all
+  -- null, for example in the open that represents all information written
+  -- prior to database version 3.
+  -- System time when the database was opened.
+  start_time_90k integer,
+  -- System time when the database was closed or (on crash) last flushed.
+  end_time_90k integer,
+  -- How long the database was open. This is end_time_90k - start_time_90k if
+  -- there were no time steps during this time.
+  duration_90k integer
 );
 create table sample_file_dir (

View File

@@ -28,6 +28,7 @@
 // You should have received a copy of the GNU General Public License
 // along with this program. If not, see <http://www.gnu.org/licenses/>.
+use base::clock;
 use db;
 use dir;
 use fnv::FnvHashMap;
@@ -77,9 +78,10 @@ impl TestDb {
     pub fn new() -> TestDb {
         let tmpdir = TempDir::new("moonfire-nvr-test").unwrap();
+        let clocks = Arc::new(clock::RealClocks{});
         let mut conn = rusqlite::Connection::open_in_memory().unwrap();
         db::Database::init(&mut conn).unwrap();
-        let db = Arc::new(db::Database::new(conn, true).unwrap());
+        let db = Arc::new(db::Database::new(clocks, conn, true).unwrap());
         let (test_camera_uuid, sample_file_dir_id);
         let path = tmpdir.path().to_str().unwrap().to_owned();
         let dir;

View File

@@ -56,7 +56,10 @@ pub fn run(args: &super::Args, tx: &rusqlite::Transaction) -> Result<(), Error>
         );
         create table open (
           id integer primary key,
-          uuid blob unique not null check (length(uuid) = 16)
+          uuid blob unique not null check (length(uuid) = 16),
+          start_time_90k integer,
+          end_time_90k integer,
+          duration_90k integer
         );
         create table sample_file_dir (
           id integer primary key,

View File

@@ -355,7 +355,7 @@ impl<C: Clocks, D: DirWriter> Syncer<C, D> {
         }
         let c = &self.clocks;
         for &id in &garbage {
-            c.retry_forever(&mut || {
+            clock::retry_forever(c, &mut || {
                 if let Err(e) = self.dir.unlink_file(id) {
                     if e.kind() == io::ErrorKind::NotFound {
                         warn!("dir: recording {} already deleted!", id);
@@ -366,8 +366,8 @@ impl<C: Clocks, D: DirWriter> Syncer<C, D> {
                 Ok(())
             });
         }
-        c.retry_forever(&mut || self.dir.sync());
-        c.retry_forever(&mut || self.db.lock().delete_garbage(self.dir_id, &mut garbage));
+        clock::retry_forever(c, &mut || self.dir.sync());
+        clock::retry_forever(c, &mut || self.db.lock().delete_garbage(self.dir_id, &mut garbage));
     }
     /// Saves the given recording and causes rotation to happen. Called from worker thread.
@@ -380,8 +380,8 @@ impl<C: Clocks, D: DirWriter> Syncer<C, D> {
         let stream_id = id.stream();
         // Free up a like number of bytes.
-        self.clocks.retry_forever(&mut || f.sync_all());
-        self.clocks.retry_forever(&mut || self.dir.sync());
+        clock::retry_forever(&self.clocks, &mut || f.sync_all());
+        clock::retry_forever(&self.clocks, &mut || self.dir.sync());
         let mut db = self.db.lock();
         db.mark_synced(id).unwrap();
         delete_recordings(&mut db, stream_id, 0).unwrap();
@@ -553,7 +553,7 @@ impl<'a, C: Clocks, D: DirWriter> Writer<'a, C, D> {
             flags: db::RecordingFlags::Growing as i32,
             ..Default::default()
         })?;
-        let f = self.clocks.retry_forever(&mut || self.dir.create_file(id));
+        let f = clock::retry_forever(self.clocks, &mut || self.dir.create_file(id));
         self.state = WriterState::Open(InnerWriter {
             f,
@@ -601,7 +601,7 @@ impl<'a, C: Clocks, D: DirWriter> Writer<'a, C, D> {
         }
         let mut remaining = pkt;
         while !remaining.is_empty() {
-            let written = self.clocks.retry_forever(&mut || w.f.write(remaining));
+            let written = clock::retry_forever(self.clocks, &mut || w.f.write(remaining));
             remaining = &remaining[written..];
         }
         w.unflushed_sample = Some(UnflushedSample {

View File

@@ -37,6 +37,7 @@ extern crate cursive;
 use self::cursive::Cursive;
 use self::cursive::views;
+use clock;
 use db;
 use failure::Error;
 use regex::Regex;
@@ -124,7 +125,8 @@ struct Args {
 pub fn run() -> Result<(), Error> {
     let args: Args = super::parse_args(USAGE)?;
     let (_db_dir, conn) = super::open_conn(&args.flag_db_dir, super::OpenMode::ReadWrite)?;
-    let db = Arc::new(db::Database::new(conn, true)?);
+    let clocks = Arc::new(clock::RealClocks{});
+    let db = Arc::new(db::Database::new(clocks, conn, true)?);
     let mut siv = Cursive::new();
     //siv.add_global_callback('q', |s| s.quit());

View File

@@ -96,10 +96,11 @@ struct Syncer {
 pub fn run() -> Result<(), Error> {
     let args: Args = super::parse_args(USAGE)?;
+    let clocks = Arc::new(clock::RealClocks{});
     let (_db_dir, conn) = super::open_conn(
         &args.flag_db_dir,
         if args.flag_read_only { super::OpenMode::ReadOnly } else { super::OpenMode::ReadWrite })?;
-    let db = Arc::new(db::Database::new(conn, !args.flag_read_only).unwrap());
+    let db = Arc::new(db::Database::new(clocks.clone(), conn, !args.flag_read_only).unwrap());
     info!("Database is loaded.");
     {
@@ -122,7 +123,7 @@ pub fn run() -> Result<(), Error> {
         let streams = l.streams_by_id().len();
         let env = streamer::Environment {
             db: &db,
-            clocks: &clock::RealClocks{},
+            clocks: clocks.clone(),
             opener: &*stream::FFMPEG,
             shutdown: &shutdown_streamers,
         };

View File

@@ -42,7 +42,7 @@ pub static ROTATE_INTERVAL_SEC: i64 = 60;
 /// Common state that can be used by multiple `Streamer` instances.
 pub struct Environment<'a, 'b, C, S> where C: 'a + Clocks, S: 'a + stream::Stream {
-    pub clocks: &'a C,
+    pub clocks: Arc<C>,
     pub opener: &'a stream::Opener<S>,
     pub db: &'b Arc<Database>,
     pub shutdown: &'b Arc<AtomicBool>,
@@ -57,7 +57,7 @@ pub struct Streamer<'a, C, S> where C: Clocks, S: 'a + stream::Stream {
     db: Arc<Database>,
     dir: Arc<dir::SampleFileDir>,
     syncer_channel: writer::SyncerChannel<::std::fs::File>,
-    clocks: &'a C,
+    clocks: Arc<C>,
     opener: &'a stream::Opener<S>,
     stream_id: i32,
     short_name: String,
@@ -77,7 +77,7 @@ impl<'a, C, S> Streamer<'a, C, S> where C: 'a + Clocks, S: 'a + stream::Stream {
             db: env.db.clone(),
             dir,
             syncer_channel: syncer_channel,
-            clocks: env.clocks,
+            clocks: env.clocks.clone(),
             opener: env.opener,
             stream_id: stream_id,
             short_name: format!("{}-{}", c.short_name, s.type_.as_str()),
@@ -103,14 +103,14 @@ impl<'a, C, S> Streamer<'a, C, S> where C: 'a + Clocks, S: 'a + stream::Stream {
         info!("{}: Opening input: {}", self.short_name, self.redacted_url);
         let mut stream = {
-            let _t = TimerGuard::new(self.clocks, || format!("opening {}", self.redacted_url));
+            let _t = TimerGuard::new(&*self.clocks, || format!("opening {}", self.redacted_url));
             self.opener.open(stream::Source::Rtsp(&self.url))?
         };
         let realtime_offset = self.clocks.realtime() - self.clocks.monotonic();
         // TODO: verify width/height.
         let extra_data = stream.get_extra_data()?;
         let video_sample_entry_id = {
-            let _t = TimerGuard::new(self.clocks, || "inserting video sample entry");
+            let _t = TimerGuard::new(&*self.clocks, || "inserting video sample entry");
             self.db.lock().insert_video_sample_entry(extra_data.width, extra_data.height,
                                                      extra_data.sample_entry,
                                                      extra_data.rfc6381_codec)?
@@ -121,11 +121,11 @@ impl<'a, C, S> Streamer<'a, C, S> where C: 'a + Clocks, S: 'a + stream::Stream {
         // Seconds since epoch at which to next rotate.
         let mut rotate: Option<i64> = None;
         let mut transformed = Vec::new();
-        let mut w = writer::Writer::new(self.clocks, &self.dir, &self.db, &self.syncer_channel,
+        let mut w = writer::Writer::new(&*self.clocks, &self.dir, &self.db, &self.syncer_channel,
                                         self.stream_id, video_sample_entry_id);
         while !self.shutdown.load(Ordering::SeqCst) {
             let pkt = {
-                let _t = TimerGuard::new(self.clocks, || "getting next packet");
+                let _t = TimerGuard::new(&*self.clocks, || "getting next packet");
                 stream.get_next()?
             };
             let pts = pkt.pts().ok_or_else(|| format_err!("packet with no pts"))?;
@@ -140,7 +140,7 @@ impl<'a, C, S> Streamer<'a, C, S> where C: 'a + Clocks, S: 'a + stream::Stream {
             rotate = if let Some(r) = rotate {
                 if frame_realtime.sec > r && pkt.is_key() {
                     trace!("{}: write on normal rotation", self.short_name);
-                    let _t = TimerGuard::new(self.clocks, || "closing writer");
+                    let _t = TimerGuard::new(&*self.clocks, || "closing writer");
                     w.close(Some(pts));
                     None
                 } else {
@@ -159,7 +159,7 @@ impl<'a, C, S> Streamer<'a, C, S> where C: 'a + Clocks, S: 'a + stream::Stream {
                     // usual. This ensures there's plenty of frame times to use when calculating
                     // the start time.
                     let r = r + if w.previously_opened()? { 0 } else { self.rotate_interval_sec };
-                    let _t = TimerGuard::new(self.clocks, || "creating writer");
+                    let _t = TimerGuard::new(&*self.clocks, || "creating writer");
                     r
                 },
             };
@@ -173,13 +173,13 @@ impl<'a, C, S> Streamer<'a, C, S> where C: 'a + Clocks, S: 'a + stream::Stream {
             } else {
                 orig_data
             };
-            let _t = TimerGuard::new(self.clocks,
+            let _t = TimerGuard::new(&*self.clocks,
                                      || format!("writing {} bytes", transformed_data.len()));
             w.write(transformed_data, local_time, pts, pkt.is_key())?;
             rotate = Some(r);
         }
         if rotate.is_some() {
-            let _t = TimerGuard::new(self.clocks, || "closing writer");
+            let _t = TimerGuard::new(&*self.clocks, || "closing writer");
             w.close(None);
         }
         Ok(())
@@ -322,7 +322,7 @@ mod tests {
     fn basic() {
         testutil::init();
         // 2015-04-25 00:00:00 UTC
-        let clocks = clock::SimulatedClocks::new(time::Timespec::new(1429920000, 0));
+        let clocks = Arc::new(clock::SimulatedClocks::new(time::Timespec::new(1429920000, 0)));
         clocks.sleep(time::Duration::seconds(86400)); // to 2015-04-26 00:00:00 UTC
         let stream = stream::FFMPEG.open(stream::Source::File("src/testdata/clip.mp4")).unwrap();
@@ -337,7 +337,7 @@ mod tests {
         };
         let db = testutil::TestDb::new();
         let env = super::Environment{
-            clocks: &clocks,
+            clocks: Arc::clone(&clocks),
            opener: &opener,
             db: &db.db,
             shutdown: &opener.shutdown,