add a TimerGuard around db locks & ops

I moved the clocks member from LockedDatabase to Database to make this happen,
so the new DatabaseGuard (replacing a direct MutexGuard<LockedDatabase>) can
access it before acquiring the lock.

I also made the clock type a type parameter of Database (and, consequently, of
several other things throughout the system). This allowed me to drop the Arc<>;
more importantly, it means the Clocks trait no longer needs to stay
object-safe. I plan to take advantage of that shortly.
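
A minimal sketch of why object safety matters here (signatures simplified; the `measure` method is hypothetical, not part of Moonfire's `Clocks` trait): a trait used behind `Arc<Clocks>` must stay object-safe, which rules out generic methods, whereas a type parameter `C: Clocks` carries no such restriction.

```rust
trait Clocks {
    fn realtime(&self) -> u64;

    // A generic method like this makes a trait non-object-safe, so it
    // couldn't be added while Database held an Arc<Clocks> trait object.
    // (Hypothetical method, for illustration only.)
    fn measure<F: FnOnce()>(&self, f: F) -> u64 {
        let start = self.realtime();
        f();
        self.realtime() - start
    }
}

// Before: Database stored Arc<Clocks>, a trait object.
// After: the clock is a type parameter; no Arc, no object-safety limit.
struct Database<C: Clocks> {
    clocks: C,
}
```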
Author: Scott Lamb
Date:   2018-03-23 13:31:23 -07:00
parent c0da1ef880
commit addeb9d2f6
10 changed files with 209 additions and 171 deletions

db/db.rs

@@ -52,7 +52,7 @@
//! A list of mutations is built up in-memory and occasionally flushed to reduce SSD write
//! cycles.
use base::clock::Clocks;
use base::clock::{self, Clocks};
use dir;
use failure::Error;
use fnv::{self, FnvHashMap, FnvHashSet};
@@ -565,7 +565,6 @@ fn init_recordings(conn: &mut rusqlite::Connection, stream_id: i32, camera: &Cam
}
pub struct LockedDatabase {
clocks: Arc<Clocks>,
conn: rusqlite::Connection,
uuid: Uuid,
@@ -812,16 +811,10 @@ impl LockedDatabase {
Ok(())
}
/// Tries to flush unwritten changes from the stream directories.
/// Helper for `DatabaseGuard::flush()` and `Database::drop()`.
///
/// * commits any recordings added with `add_recording` that have since been marked as
/// synced.
/// * moves old recordings to the garbage table as requested by `delete_oldest_recordings`.
/// * removes entries from the garbage table as requested by `mark_sample_files_deleted`.
///
/// On success, for each affected sample file directory with a flush watcher set, sends a
/// `Flush` event.
pub(crate) fn flush(&mut self, reason: &str) -> Result<(), Error> {
/// The public API is in `DatabaseGuard::flush()`; it supplies the `Clocks` to this function.
fn flush<C: Clocks>(&mut self, clocks: &C, reason: &str) -> Result<(), Error> {
let o = match self.open.as_ref() {
None => bail!("database is read-only"),
Some(o) => o,
@@ -873,8 +866,8 @@ impl LockedDatabase {
let mut stmt = tx.prepare_cached(
r"update open set duration_90k = ?, end_time_90k = ? where id = ?")?;
let rows = stmt.execute(&[
&(recording::Time::new(self.clocks.monotonic()) - self.open_monotonic).0,
&recording::Time::new(self.clocks.realtime()).0,
&(recording::Time::new(clocks.monotonic()) - self.open_monotonic).0,
&recording::Time::new(clocks.realtime()).0,
&o.id,
])?;
if rows != 1 {
@@ -1692,6 +1685,21 @@ impl LockedDatabase {
}
}
/// Initializes a database.
/// Note this doesn't set journal options, so that it can be used on in-memory databases for
/// test code.
pub fn init(conn: &mut rusqlite::Connection) -> Result<(), Error> {
let tx = conn.transaction()?;
tx.execute_batch(include_str!("schema.sql"))?;
{
let uuid = ::uuid::Uuid::new_v4();
let uuid_bytes = &uuid.as_bytes()[..];
tx.execute("insert into meta (uuid) values (?)", &[&uuid_bytes])?;
}
tx.commit()?;
Ok(())
}
/// Gets the schema version from the given database connection.
/// A fully initialized database will return `Ok(Some(version))` where `version` is an integer that
/// can be compared to `EXPECTED_VERSION`. An empty database will return `Ok(None)`. A partially
@@ -1709,28 +1717,37 @@ pub fn get_schema_version(conn: &rusqlite::Connection) -> Result<Option<i32>, Er
/// The recording database. Abstracts away SQLite queries. Also maintains in-memory state
/// (loaded on startup, and updated on successful commit) to avoid expensive scans over the
/// recording table on common queries.
pub struct Database(
pub struct Database<C: Clocks + Clone = clock::RealClocks> {
/// This is wrapped in an `Option` to allow the `Drop` implementation and `close` to coexist.
Option<Mutex<LockedDatabase>>
);
db: Option<Mutex<LockedDatabase>>,
impl Drop for Database {
/// This is kept separately from the `LockedDatabase` to allow the `lock()` operation itself to
/// access it. It doesn't need a `Mutex` anyway; it's `Sync`, and all operations work on
/// `&self`.
clocks: C,
}
impl<C: Clocks + Clone> Drop for Database<C> {
fn drop(&mut self) {
if ::std::thread::panicking() {
return; // don't flush while panicking.
}
if let Some(m) = self.0.take() {
if let Err(e) = m.into_inner().flush("drop") {
if let Some(m) = self.db.take() {
if let Err(e) = m.into_inner().flush(&self.clocks, "drop") {
error!("Final database flush failed: {}", e);
}
}
}
}
impl Database {
// Helpers for Database::lock(). Closures don't implement Fn.
fn acquisition() -> &'static str { "database lock acquisition" }
fn operation() -> &'static str { "database operation" }
impl<C: Clocks + Clone> Database<C> {
/// Creates the database from a caller-supplied SQLite connection.
pub fn new(clocks: Arc<Clocks>, conn: rusqlite::Connection,
read_write: bool) -> Result<Database, Error> {
pub fn new(clocks: C, conn: rusqlite::Connection,
read_write: bool) -> Result<Database<C>, Error> {
conn.execute("pragma foreign_keys = on", &[])?;
{
let ver = get_schema_version(&conn)?.ok_or_else(|| format_err!(
@@ -1767,20 +1784,22 @@ impl Database {
uuid,
})
} else { None };
let db = Database(Some(Mutex::new(LockedDatabase {
let db = Database {
db: Some(Mutex::new(LockedDatabase {
conn,
uuid,
open,
open_monotonic,
sample_file_dirs_by_id: BTreeMap::new(),
cameras_by_id: BTreeMap::new(),
cameras_by_uuid: BTreeMap::new(),
streams_by_id: BTreeMap::new(),
video_sample_entries_by_id: BTreeMap::new(),
video_index_cache: RefCell::new(LruCache::with_hasher(1024, Default::default())),
on_flush: Vec::new(),
})),
clocks,
conn,
uuid,
open,
open_monotonic,
sample_file_dirs_by_id: BTreeMap::new(),
cameras_by_id: BTreeMap::new(),
cameras_by_uuid: BTreeMap::new(),
streams_by_id: BTreeMap::new(),
video_sample_entries_by_id: BTreeMap::new(),
video_index_cache: RefCell::new(LruCache::with_hasher(1024, Default::default())),
on_flush: Vec::new(),
})));
};
{
let l = &mut *db.lock();
l.init_video_sample_entries()?;
@@ -1796,35 +1815,62 @@ impl Database {
Ok(db)
}
/// Initializes a database.
/// Note this doesn't set journal options, so that it can be used on in-memory databases for
/// test code.
pub fn init(conn: &mut rusqlite::Connection) -> Result<(), Error> {
let tx = conn.transaction()?;
tx.execute_batch(include_str!("schema.sql"))?;
{
let uuid = ::uuid::Uuid::new_v4();
let uuid_bytes = &uuid.as_bytes()[..];
tx.execute("insert into meta (uuid) values (?)", &[&uuid_bytes])?;
}
tx.commit()?;
Ok(())
}
#[inline(always)]
pub fn clocks(&self) -> C { self.clocks.clone() }
/// Locks the database; the returned reference is the only way to perform (read or write)
/// operations.
pub fn lock(&self) -> MutexGuard<LockedDatabase> {
self.0.as_ref().unwrap().lock()
pub fn lock(&self) -> DatabaseGuard<C> {
let timer = clock::TimerGuard::new(&self.clocks, acquisition);
let db = self.db.as_ref().unwrap().lock();
drop(timer);
let _timer = clock::TimerGuard::<C, &'static str, fn() -> &'static str>::new(
&self.clocks, operation);
DatabaseGuard {
clocks: &self.clocks,
db,
_timer,
}
}
/// For testing: closes the database (without flushing) and returns the connection.
/// This allows verification that a newly opened database is in an acceptable state.
#[cfg(test)]
fn close(mut self) -> rusqlite::Connection {
self.0.take().unwrap().into_inner().conn
self.db.take().unwrap().into_inner().conn
}
}
pub struct DatabaseGuard<'db, C: Clocks> {
clocks: &'db C,
db: MutexGuard<'db, LockedDatabase>,
_timer: clock::TimerGuard<'db, C, &'static str, fn() -> &'static str>,
}
impl<'db, C: Clocks + Clone> DatabaseGuard<'db, C> {
/// Tries to flush unwritten changes from the stream directories.
///
/// * commits any recordings added with `add_recording` that have since been marked as
/// synced.
/// * moves old recordings to the garbage table as requested by `delete_oldest_recordings`.
/// * removes entries from the garbage table as requested by `mark_sample_files_deleted`.
///
/// On success, for each affected sample file directory with a flush watcher set, sends a
/// `Flush` event.
pub(crate) fn flush(&mut self, reason: &str) -> Result<(), Error> {
self.db.flush(self.clocks, reason)
}
}
impl<'db, C: Clocks + Clone> ::std::ops::Deref for DatabaseGuard<'db, C> {
type Target = LockedDatabase;
fn deref(&self) -> &LockedDatabase { &*self.db }
}
impl<'db, C: Clocks + Clone> ::std::ops::DerefMut for DatabaseGuard<'db, C> {
fn deref_mut(&mut self) -> &mut LockedDatabase { &mut *self.db }
}
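
Because of the `Deref`/`DerefMut` impls above, call sites keep using the guard exactly as they used the old `MutexGuard<LockedDatabase>`; only `flush()` moves onto the guard itself so it can pass the clocks down. A rough usage sketch (from inside the `db` crate, since `flush` is `pub(crate)`; the `example` function is illustrative):

```rust
fn example(db: &Database<clock::RealClocks>) -> Result<(), Error> {
    let mut l = db.lock();             // DatabaseGuard: times acquisition, then the hold
    let _n = l.cameras_by_id().len();  // Deref forwards reads to LockedDatabase
    l.flush("example")?;               // guard method; supplies clocks to LockedDatabase::flush
    Ok(())
}
```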
#[cfg(test)]
mod tests {
extern crate tempdir;
@@ -1840,7 +1886,7 @@ mod tests {
fn setup_conn() -> Connection {
let mut conn = Connection::open_in_memory().unwrap();
Database::init(&mut conn).unwrap();
super::init(&mut conn).unwrap();
conn
}
@@ -1995,7 +2041,7 @@ mod tests {
#[test]
fn test_no_meta_or_version() {
testutil::init();
let e = Database::new(Arc::new(clock::RealClocks{}), Connection::open_in_memory().unwrap(),
let e = Database::new(clock::RealClocks {}, Connection::open_in_memory().unwrap(),
false).err().unwrap();
assert!(e.to_string().starts_with("no such table"), "{}", e);
}
@@ -2005,7 +2051,7 @@ mod tests {
testutil::init();
let c = setup_conn();
c.execute_batch("delete from version; insert into version values (2, 0, '');").unwrap();
let e = Database::new(Arc::new(clock::RealClocks{}), c, false).err().unwrap();
let e = Database::new(clock::RealClocks {}, c, false).err().unwrap();
assert!(e.to_string().starts_with(
"Database schema version 2 is too old (expected 3)"), "got: {:?}", e);
}
@@ -2015,7 +2061,7 @@ mod tests {
testutil::init();
let c = setup_conn();
c.execute_batch("delete from version; insert into version values (4, 0, '');").unwrap();
let e = Database::new(Arc::new(clock::RealClocks{}), c, false).err().unwrap();
let e = Database::new(clock::RealClocks {}, c, false).err().unwrap();
assert!(e.to_string().starts_with(
"Database schema version 4 is too new (expected 3)"), "got: {:?}", e);
}
@@ -2025,7 +2071,7 @@ mod tests {
fn test_fresh_db() {
testutil::init();
let conn = setup_conn();
let db = Database::new(Arc::new(clock::RealClocks{}), conn, true).unwrap();
let db = Database::new(clock::RealClocks {}, conn, true).unwrap();
let db = db.lock();
assert_eq!(0, db.cameras_by_id().values().count());
}
@@ -2035,7 +2081,7 @@ mod tests {
fn test_full_lifecycle() {
testutil::init();
let conn = setup_conn();
let db = Database::new(Arc::new(clock::RealClocks{}), conn, true).unwrap();
let db = Database::new(clock::RealClocks {}, conn, true).unwrap();
let tmpdir = tempdir::TempDir::new("moonfire-nvr-test").unwrap();
let path = tmpdir.path().to_str().unwrap().to_owned();
let sample_file_dir_id = { db.lock() }.add_sample_file_dir(path).unwrap();
@@ -2091,7 +2137,7 @@ mod tests {
// Closing and reopening the database should present the same contents.
let conn = db.close();
let db = Database::new(Arc::new(clock::RealClocks{}), conn, true).unwrap();
let db = Database::new(clock::RealClocks {}, conn, true).unwrap();
assert_eq!(db.lock().streams_by_id().get(&sub_stream_id).unwrap().flush_if_sec, 2);
assert_no_recordings(&db, camera_uuid);
@@ -2132,7 +2178,7 @@ mod tests {
// Queries on a fresh database should return the correct result (with caches populated from
// existing database contents rather than built on insert).
let conn = db.close();
let db = Database::new(Arc::new(clock::RealClocks{}), conn, true).unwrap();
let db = Database::new(clock::RealClocks {}, conn, true).unwrap();
assert_single_recording(&db, main_stream_id, &recording);
// Deleting a recording should succeed, update the min/max times, and mark it as garbage.
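
The new `lock()` above times the two phases separately: one `TimerGuard` measures how long lock acquisition takes and is dropped immediately; a second is then created and stored in the returned guard, so it measures how long the lock is held. `base::clock::TimerGuard` itself isn't shown in this diff; below is a plausible minimal version matching the type parameters used in `lock()` (an assumption, not the actual implementation).

```rust
// Hypothetical sketch; the real TimerGuard lives in base::clock.
pub struct TimerGuard<'a, C: Clocks + 'a, S: AsRef<str>, F: FnOnce() -> S> {
    clocks: &'a C,
    label: Option<F>,  // deferred, so the fast path never builds the label
    start: ::time::Timespec,
}

impl<'a, C: Clocks + 'a, S: AsRef<str>, F: FnOnce() -> S> TimerGuard<'a, C, S, F> {
    pub fn new(clocks: &'a C, label: F) -> Self {
        TimerGuard { clocks, label: Some(label), start: clocks.monotonic() }
    }
}

impl<'a, C: Clocks + 'a, S: AsRef<str>, F: FnOnce() -> S> Drop for TimerGuard<'a, C, S, F> {
    fn drop(&mut self) {
        let elapsed = self.clocks.monotonic() - self.start;
        // Assumed policy: log only when a phase took suspiciously long.
        if elapsed > ::time::Duration::seconds(1) {
            let label = self.label.take().unwrap()();
            warn!("{} took {:?}!", label.as_ref(), elapsed);
        }
    }
}
```

The `fn() -> &'static str` helpers `acquisition` and `operation` fit the `F: FnOnce() -> S` slot as plain function pointers, which is presumably why `lock()` names them rather than writing closures inline.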

db/recording.rs

@@ -498,6 +498,7 @@ impl Segment {
#[cfg(test)]
mod tests {
use base::clock::RealClocks;
use super::*;
use testutil::{self, TestDb};
@@ -643,7 +644,7 @@ mod tests {
let bytes = 3 * i;
encoder.add_sample(duration_90k, bytes, true, &mut r);
}
let db = TestDb::new();
let db = TestDb::new(RealClocks {});
let row = db.insert_recording_from_encoder(r);
// Time range [2, 2 + 4 + 6 + 8) means the 2nd, 3rd, 4th samples should be
// included.
@@ -662,7 +663,7 @@ mod tests {
let bytes = 3 * i;
encoder.add_sample(duration_90k, bytes, (i % 2) == 1, &mut r);
}
let db = TestDb::new();
let db = TestDb::new(RealClocks {});
let row = db.insert_recording_from_encoder(r);
// Time range [2 + 4 + 6, 2 + 4 + 6 + 8) means the 4th sample should be included.
// The 3rd also gets pulled in because it is a sync frame and the 4th is not.
@@ -678,7 +679,7 @@ mod tests {
encoder.add_sample(1, 1, true, &mut r);
encoder.add_sample(1, 2, true, &mut r);
encoder.add_sample(0, 3, true, &mut r);
let db = TestDb::new();
let db = TestDb::new(RealClocks {});
let row = db.insert_recording_from_encoder(r);
let segment = Segment::new(&db.db.lock(), &row, 1 .. 2).unwrap();
assert_eq!(&get_frames(&db.db, &segment, |it| it.bytes), &[2, 3]);
@@ -691,7 +692,7 @@ mod tests {
let mut r = db::RecordingToInsert::default();
let mut encoder = SampleIndexEncoder::new();
encoder.add_sample(1, 1, true, &mut r);
let db = TestDb::new();
let db = TestDb::new(RealClocks {});
let row = db.insert_recording_from_encoder(r);
let segment = Segment::new(&db.db.lock(), &row, 0 .. 0).unwrap();
assert_eq!(&get_frames(&db.db, &segment, |it| it.bytes), &[1]);
@@ -709,7 +710,7 @@ mod tests {
let bytes = 3 * i;
encoder.add_sample(duration_90k, bytes, (i % 2) == 1, &mut r);
}
let db = TestDb::new();
let db = TestDb::new(RealClocks {});
let row = db.insert_recording_from_encoder(r);
let segment = Segment::new(&db.db.lock(), &row, 0 .. 2+4+6+8+10).unwrap();
assert_eq!(&get_frames(&db.db, &segment, |it| it.duration_90k), &[2, 4, 6, 8, 10]);
@@ -723,7 +724,7 @@ mod tests {
encoder.add_sample(1, 1, true, &mut r);
encoder.add_sample(1, 2, true, &mut r);
encoder.add_sample(0, 3, true, &mut r);
let db = TestDb::new();
let db = TestDb::new(RealClocks {});
let row = db.insert_recording_from_encoder(r);
let segment = Segment::new(&db.db.lock(), &row, 0 .. 2).unwrap();
assert_eq!(&get_frames(&db.db, &segment, |it| it.bytes), &[1, 2, 3]);

db/testutil.rs

@@ -28,7 +28,7 @@
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
use base::clock;
use base::clock::Clocks;
use db;
use dir;
use fnv::FnvHashMap;
@@ -64,8 +64,8 @@ pub fn init() {
});
}
pub struct TestDb {
pub db: Arc<db::Database>,
pub struct TestDb<C: Clocks + Clone> {
pub db: Arc<db::Database<C>>,
pub dirs_by_stream_id: Arc<FnvHashMap<i32, Arc<dir::SampleFileDir>>>,
pub syncer_channel: writer::SyncerChannel<::std::fs::File>,
pub syncer_join: thread::JoinHandle<()>,
@@ -73,14 +73,13 @@ pub struct TestDb {
pub test_camera_uuid: Uuid,
}
impl TestDb {
impl<C: Clocks + Clone> TestDb<C> {
/// Creates a test database with one camera.
pub fn new() -> TestDb {
pub fn new(clocks: C) -> Self {
let tmpdir = TempDir::new("moonfire-nvr-test").unwrap();
let clocks = Arc::new(clock::RealClocks{});
let mut conn = rusqlite::Connection::open_in_memory().unwrap();
db::Database::init(&mut conn).unwrap();
db::init(&mut conn).unwrap();
let db = Arc::new(db::Database::new(clocks, conn, true).unwrap());
let (test_camera_uuid, sample_file_dir_id);
let path = tmpdir.path().to_str().unwrap().to_owned();
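
With `TestDb` generic over its clocks, each test picks its own time source: the recording tests earlier in this diff pass `RealClocks {}`, while the writer-test harness below constructs `SimulatedClocks` for deterministic timing.

```rust
// Real time, where timing is irrelevant to the assertion:
let db = TestDb::new(RealClocks {});

// Simulated time, where the test must control the clock:
let clocks = SimulatedClocks::new(::time::Timespec::new(0, 0));
let tdb = TestDb::new(clocks);
```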

db/writer.rs

@@ -97,11 +97,10 @@ impl<F> ::std::clone::Clone for SyncerChannel<F> {
}
/// State of the worker thread.
struct Syncer<C: Clocks, D: DirWriter> {
clocks: C,
struct Syncer<C: Clocks + Clone, D: DirWriter> {
dir_id: i32,
dir: D,
db: Arc<db::Database>,
db: Arc<db::Database<C>>,
}
/// Starts a syncer for the given sample file directory.
@@ -121,8 +120,9 @@ struct Syncer<C: Clocks, D: DirWriter> {
/// Note that dropping all `SyncerChannel` clones currently includes calling
/// `LockedDatabase::clear_on_flush`, as this function installs a hook to watch database flushes.
/// TODO: add a join wrapper which arranges for the on flush hook to be removed automatically.
pub fn start_syncer(db: Arc<db::Database>, dir_id: i32)
-> Result<(SyncerChannel<::std::fs::File>, thread::JoinHandle<()>), Error> {
pub fn start_syncer<C>(db: Arc<db::Database<C>>, dir_id: i32)
-> Result<(SyncerChannel<::std::fs::File>, thread::JoinHandle<()>), Error>
where C: Clocks + Clone {
let db2 = db.clone();
let (mut syncer, path) = Syncer::new(&db.lock(), db2, dir_id)?;
syncer.initial_rotation()?;
@@ -216,8 +216,30 @@ impl<F: FileWriter> SyncerChannel<F> {
}
}
impl Syncer<clock::RealClocks, Arc<dir::SampleFileDir>> {
fn new(l: &db::LockedDatabase, db: Arc<db::Database>, dir_id: i32)
/// Lists files which should be "abandoned" (deleted without ever recording in the database)
/// on opening.
fn list_files_to_abandon(path: &str, streams_to_next: FnvHashMap<i32, i32>)
-> Result<Vec<CompositeId>, Error> {
let mut v = Vec::new();
for e in ::std::fs::read_dir(path)? {
let e = e?;
let id = match dir::parse_id(e.file_name().as_bytes()) {
Ok(i) => i,
Err(_) => continue,
};
let next = match streams_to_next.get(&id.stream()) {
Some(n) => *n,
None => continue, // unknown stream.
};
if id.recording() >= next {
v.push(id);
}
}
Ok(v)
}
impl<C: Clocks + Clone> Syncer<C, Arc<dir::SampleFileDir>> {
fn new(l: &db::LockedDatabase, db: Arc<db::Database<C>>, dir_id: i32)
-> Result<(Self, String), Error> {
let d = l.sample_file_dirs_by_id()
.get(&dir_id)
@@ -237,7 +259,7 @@ impl Syncer<clock::RealClocks, Arc<dir::SampleFileDir>> {
}
})
.collect();
let to_abandon = Syncer::list_files_to_abandon(&d.path, streams_to_next)?;
let to_abandon = list_files_to_abandon(&d.path, streams_to_next)?;
let mut undeletable = 0;
for &id in &to_abandon {
if let Err(e) = dir.unlink_file(id) {
@@ -254,35 +276,12 @@ impl Syncer<clock::RealClocks, Arc<dir::SampleFileDir>> {
}
Ok((Syncer {
clocks: clock::RealClocks{},
dir_id,
dir,
db,
}, d.path.clone()))
}
/// Lists files which should be "abandoned" (deleted without ever recording in the database)
/// on opening.
fn list_files_to_abandon(path: &str, streams_to_next: FnvHashMap<i32, i32>)
-> Result<Vec<CompositeId>, Error> {
let mut v = Vec::new();
for e in ::std::fs::read_dir(path)? {
let e = e?;
let id = match dir::parse_id(e.file_name().as_bytes()) {
Ok(i) => i,
Err(_) => continue,
};
let next = match streams_to_next.get(&id.stream()) {
Some(n) => *n,
None => continue, // unknown stream.
};
if id.recording() >= next {
v.push(id);
}
}
Ok(v)
}
/// Rotates files for all streams and deletes stale files from previous runs.
/// Called from main thread.
fn initial_rotation(&mut self) -> Result<(), Error> {
@@ -331,7 +330,7 @@ impl Syncer<clock::RealClocks, Arc<dir::SampleFileDir>> {
}
}
impl<C: Clocks, D: DirWriter> Syncer<C, D> {
impl<C: Clocks + Clone, D: DirWriter> Syncer<C, D> {
fn run(&mut self, cmds: mpsc::Receiver<SyncerCommand<D::File>>) {
loop {
match cmds.recv() {
@@ -353,7 +352,7 @@ impl<C: Clocks, D: DirWriter> Syncer<C, D> {
if garbage.is_empty() {
return;
}
let c = &self.clocks;
let c = &self.db.clocks();
for &id in &garbage {
clock::retry_forever(c, &mut || {
if let Err(e) = self.dir.unlink_file(id) {
@@ -380,8 +379,8 @@ impl<C: Clocks, D: DirWriter> Syncer<C, D> {
let stream_id = id.stream();
// Free up a like number of bytes.
clock::retry_forever(&self.clocks, &mut || f.sync_all());
clock::retry_forever(&self.clocks, &mut || self.dir.sync());
clock::retry_forever(&self.db.clocks(), &mut || f.sync_all());
clock::retry_forever(&self.db.clocks(), &mut || self.dir.sync());
let mut db = self.db.lock();
db.mark_synced(id).unwrap();
delete_recordings(&mut db, stream_id, 0).unwrap();
@@ -412,10 +411,9 @@ impl<C: Clocks, D: DirWriter> Syncer<C, D> {
/// metadata to the database. `Writer` hands off each recording's state to the syncer when done. It
/// saves the recording to the database (if I/O errors do not prevent this), retries forever,
/// or panics (if further writing on this stream is impossible).
pub struct Writer<'a, C: Clocks, D: DirWriter> {
clocks: &'a C,
pub struct Writer<'a, C: Clocks + Clone, D: DirWriter> {
dir: &'a D,
db: &'a db::Database,
db: &'a db::Database<C>,
channel: &'a SyncerChannel<D::File>,
stream_id: i32,
video_sample_entry_id: i32,
@@ -523,11 +521,10 @@ struct PreviousWriter {
run_offset: i32,
}
impl<'a, C: Clocks, D: DirWriter> Writer<'a, C, D> {
pub fn new(clocks: &'a C, dir: &'a D, db: &'a db::Database, channel: &'a SyncerChannel<D::File>,
impl<'a, C: Clocks + Clone, D: DirWriter> Writer<'a, C, D> {
pub fn new(dir: &'a D, db: &'a db::Database<C>, channel: &'a SyncerChannel<D::File>,
stream_id: i32, video_sample_entry_id: i32) -> Self {
Writer {
clocks,
dir,
db,
channel,
@@ -553,7 +550,7 @@ impl<'a, C: Clocks, D: DirWriter> Writer<'a, C, D> {
flags: db::RecordingFlags::Growing as i32,
..Default::default()
})?;
let f = clock::retry_forever(self.clocks, &mut || self.dir.create_file(id));
let f = clock::retry_forever(&self.db.clocks(), &mut || self.dir.create_file(id));
self.state = WriterState::Open(InnerWriter {
f,
@@ -601,7 +598,7 @@ impl<'a, C: Clocks, D: DirWriter> Writer<'a, C, D> {
}
let mut remaining = pkt;
while !remaining.is_empty() {
let written = clock::retry_forever(self.clocks, &mut || w.f.write(remaining));
let written = clock::retry_forever(&self.db.clocks(), &mut || w.f.write(remaining));
remaining = &remaining[written..];
}
w.unflushed_sample = Some(UnflushedSample {
@@ -669,7 +666,7 @@ impl<F: FileWriter> InnerWriter<F> {
}
}
impl<'a, C: Clocks, D: DirWriter> Drop for Writer<'a, C, D> {
impl<'a, C: Clocks + Clone, D: DirWriter> Drop for Writer<'a, C, D> {
fn drop(&mut self) {
if ::std::thread::panicking() {
// This will probably panic again. Don't do it.
@@ -779,8 +776,8 @@ mod tests {
}
struct Harness {
clocks: SimulatedClocks,
db: Arc<db::Database>,
//clocks: SimulatedClocks,
db: Arc<db::Database<SimulatedClocks>>,
dir_id: i32,
_tmpdir: ::tempdir::TempDir,
dir: MockDir,
@@ -789,7 +786,8 @@ mod tests {
}
fn new_harness() -> Harness {
let tdb = testutil::TestDb::new();
let clocks = SimulatedClocks::new(::time::Timespec::new(0, 0));
let tdb = testutil::TestDb::new(clocks);
let dir_id = *tdb.db.lock().sample_file_dirs_by_id().keys().next().unwrap();
// This starts a real fs-backed syncer. Get rid of it.
@@ -798,10 +796,8 @@ mod tests {
tdb.syncer_join.join().unwrap();
// Start a mocker syncer.
let clocks = SimulatedClocks::new(::time::Timespec::new(0, 0));
let dir = MockDir::new();
let mut syncer = super::Syncer {
clocks: clocks.clone(),
dir_id: *tdb.db.lock().sample_file_dirs_by_id().keys().next().unwrap(),
dir: dir.clone(),
db: tdb.db.clone(),
@@ -818,7 +814,7 @@ mod tests {
.spawn(move || syncer.run(rcv)).unwrap();
Harness {
clocks,
//clocks,
dir_id,
dir,
db: tdb.db,
@@ -837,7 +833,7 @@ mod tests {
let video_sample_entry_id = h.db.lock().insert_video_sample_entry(
1920, 1080, [0u8; 100].to_vec(), "avc1.000000".to_owned()).unwrap();
{
let mut w = Writer::new(&h.clocks, &h.dir, &h.db, &h.channel, testutil::TEST_STREAM_ID,
let mut w = Writer::new(&h.dir, &h.db, &h.channel, testutil::TEST_STREAM_ID,
video_sample_entry_id);
h.dir.expect(MockDirAction::Create(CompositeId::new(1, 1), Box::new(|_id| Err(eio()))));
let f = MockFile::new();
@@ -895,7 +891,7 @@ mod tests {
let video_sample_entry_id = h.db.lock().insert_video_sample_entry(
1920, 1080, [0u8; 100].to_vec(), "avc1.000000".to_owned()).unwrap();
{
let mut w = Writer::new(&h.clocks, &h.dir, &h.db, &h.channel, testutil::TEST_STREAM_ID,
let mut w = Writer::new(&h.dir, &h.db, &h.channel, testutil::TEST_STREAM_ID,
video_sample_entry_id);
let f = MockFile::new();
h.dir.expect(MockDirAction::Create(CompositeId::new(1, 1),
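
Note the syncer and writer now reach their clocks through `db.clocks()` rather than a stored copy. `retry_forever` itself isn't shown in this diff; from the call sites (`let written = clock::retry_forever(&self.db.clocks(), &mut || w.f.write(remaining));` returns the success value), a plausible sketch follows — the `sleep` method on `Clocks` is an assumption:

```rust
// Hypothetical sketch inferred from the call sites; not the real base::clock code.
fn retry_forever<C, T, E, F>(clocks: &C, f: &mut F) -> T
where
    C: Clocks,
    E: ::std::fmt::Display,
    F: FnMut() -> Result<T, E>,
{
    let sleep = ::time::Duration::seconds(1);
    loop {
        match f() {
            Ok(t) => return t,
            Err(e) => {
                warn!("retrying after error: {}", e);
                clocks.sleep(sleep);  // assumed Clocks method; real signature may differ
            }
        }
    }
}
```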