diff --git a/Cargo.lock b/Cargo.lock
index 6c44322..53248dd 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -572,6 +572,14 @@ dependencies = [
  "either 1.5.0 (registry+https://github.com/rust-lang/crates.io-index)",
 ]
 
+[[package]]
+name = "itertools"
+version = "0.8.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+dependencies = [
+ "either 1.5.0 (registry+https://github.com/rust-lang/crates.io-index)",
+]
+
 [[package]]
 name = "itoa"
 version = "0.4.3"
@@ -822,6 +830,7 @@ dependencies = [
  "blake2-rfc 0.2.18 (registry+https://github.com/rust-lang/crates.io-index)",
  "failure 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)",
  "fnv 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)",
+ "itertools 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)",
  "lazy_static 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
  "libc 0.2.45 (registry+https://github.com/rust-lang/crates.io-index)",
  "libpasta 0.1.0-rc2 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -834,6 +843,7 @@ dependencies = [
  "protobuf 2.2.1 (registry+https://github.com/rust-lang/crates.io-index)",
  "regex 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
  "rusqlite 0.16.0 (registry+https://github.com/rust-lang/crates.io-index)",
+ "smallvec 0.6.7 (registry+https://github.com/rust-lang/crates.io-index)",
  "tempdir 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)",
  "time 0.1.41 (registry+https://github.com/rust-lang/crates.io-index)",
  "uuid 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -2119,6 +2129,7 @@ dependencies = [
 "checksum indexmap 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)" = "7e81a7c05f79578dbc15793d8b619db9ba32b4577003ef3af1a91c416798c58d"
 "checksum iovec 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "dbe6e417e7d0975db6512b90796e8ce223145ac4e33c377e4a42882a0e88bb08"
 "checksum itertools 0.7.11 (registry+https://github.com/rust-lang/crates.io-index)" = "0d47946d458e94a1b7bcabbf6521ea7c037062c81f534615abcad76e84d4970d"
+"checksum itertools 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)" = "5b8467d9c1cebe26feb08c640139247fac215782d35371ade9a2136ed6085358"
 "checksum itoa 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)" = "1306f3464951f30e30d12373d31c79fbd52d236e5e896fd92f96ec7babbbe60b"
 "checksum kernel32-sys 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "7507624b29483431c0ba2d82aece8ca6cdba9382bff4ddd0f7490560c056098d"
 "checksum lazy_static 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)" = "76f033c7ad61445c5b347c7382dd1237847eb1bce590fe50365dcb33d546be73"
diff --git a/Cargo.toml b/Cargo.toml
index 0e6ec85..de3007b 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -8,7 +8,7 @@ edition = "2018"
 
 # The nightly feature is used within moonfire-nvr itself to gate the
 # benchmarks. Also pass it along to crates that can benefit from it.
-nightly = ["db/nightly", "parking_lot/nightly"]
+nightly = ["db/nightly", "parking_lot/nightly", "smallvec/union"]
 
 # The bundled feature includes bundled (aka statically linked) versions of
 # native libraries where possible.
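
Note on the new dependencies: `smallvec` gives the flush path a vector with inline (stack) capacity, and the `smallvec/union` feature, which the patch places under the crate's own `nightly` feature because it needs a nightly compiler in the 0.6 series, shrinks the `SmallVec` type by overlapping its inline and heap representations. The following standalone sketch is illustrative only and not part of the patch; the element type and counts are arbitrary.

    // Illustrative only: SmallVec keeps up to N elements inline (on the stack)
    // and only allocates on the heap once that capacity is exceeded.
    use smallvec::SmallVec;

    fn main() {
        let mut v = SmallVec::<[u64; 8]>::new();
        for i in 0..8 {
            v.push(i);
        }
        assert!(!v.spilled()); // still within the inline buffer
        v.push(8);
        assert!(v.spilled()); // the ninth element forced a heap allocation
    }
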
diff --git a/db/Cargo.toml b/db/Cargo.toml
index 67a0df6..610ab22 100644
--- a/db/Cargo.toml
+++ b/db/Cargo.toml
@@ -28,6 +28,8 @@ parking_lot = { version = "0.7", features = [] }
 protobuf = "2.0"
 regex = "1.0"
 rusqlite = "0.16"
+smallvec = "0.6"
 tempdir = "0.3"
 time = "0.1"
 uuid = { version = "0.7", features = ["std", "v4"] }
+itertools = "0.8.0"
diff --git a/db/db.rs b/db/db.rs
index 3536e5f..bf90864 100644
--- a/db/db.rs
+++ b/db/db.rs
@@ -60,11 +60,13 @@ use crate::recording::{self, TIME_UNITS_PER_SEC};
 use crate::schema;
 use failure::{Error, bail, format_err};
 use fnv::{FnvHashMap, FnvHashSet};
+use itertools::Itertools;
 use log::{error, info, trace};
 use lru_cache::LruCache;
 use openssl::hash;
 use parking_lot::{Mutex,MutexGuard};
 use rusqlite::types::ToSql;
+use smallvec::SmallVec;
 use std::collections::{BTreeMap, VecDeque};
 use std::cell::RefCell;
 use std::cmp;
@@ -912,23 +914,23 @@ impl LockedDatabase {
         tx.commit()?;
 
         // Process delete_garbage.
-        let mut gced = 0;
+        let mut gced = SmallVec::<[CompositeId; 8]>::new();
         for dir in self.sample_file_dirs_by_id.values_mut() {
-            gced += dir.garbage_unlinked.len();
-            dir.garbage_unlinked.clear();
+            gced.extend(dir.garbage_unlinked.drain(..));
         }
 
-        let mut added = 0;
-        let mut deleted = 0;
+        let mut added = SmallVec::<[CompositeId; 8]>::new();
+        let mut deleted = SmallVec::<[CompositeId; 8]>::new();
         for (stream_id, new_range) in new_ranges.drain() {
             let s = self.streams_by_id.get_mut(&stream_id).unwrap();
             let d = self.sample_file_dirs_by_id.get_mut(&s.sample_file_dir_id.unwrap()).unwrap();
 
             // Process delete_oldest_recordings.
-            deleted += s.to_delete.len();
             s.sample_file_bytes -= s.bytes_to_delete;
             s.bytes_to_delete = 0;
+            deleted.reserve(s.to_delete.len());
             for row in s.to_delete.drain(..) {
+                deleted.push(row.id);
                 d.garbage_needs_unlink.insert(row.id);
                 let d = recording::Duration(row.duration as i64);
                 s.duration -= d;
@@ -936,11 +938,12 @@ impl LockedDatabase {
             }
 
             // Process add_recordings.
-            s.next_recording_id += s.synced_recordings as i32;
-            added += s.synced_recordings;
             s.bytes_to_add = 0;
+            added.reserve(s.synced_recordings);
             for _ in 0..s.synced_recordings {
                 let u = s.uncommitted.pop_front().unwrap();
+                added.push(CompositeId::new(stream_id, s.next_recording_id));
+                s.next_recording_id += 1;
                 let l = u.lock();
                 let end = l.start + recording::Duration(l.duration_90k as i64);
                 s.add_recording(l.start .. end, l.sample_file_bytes);
@@ -951,8 +954,9 @@ impl LockedDatabase {
             s.range = new_range;
         }
         self.auth.post_flush();
-        info!("Flush (why: {}): added {} recordings, deleted {}, marked {} files GCed.",
-              reason, added, deleted, gced);
+        info!("Flush (why: {}): added {} recordings ({}), deleted {} ({}), marked {} ({}) GCed.",
+              reason, added.len(), added.iter().join(", "), deleted.len(),
+              deleted.iter().join(", "), gced.len(), gced.iter().join(", "));
         for cb in &self.on_flush {
             cb();
         }
diff --git a/db/writer.rs b/db/writer.rs
index c23accc..428e195 100644
--- a/db/writer.rs
+++ b/db/writer.rs
@@ -41,6 +41,7 @@ use fnv::FnvHashMap;
 use parking_lot::Mutex;
 use log::{debug, info, trace, warn};
 use openssl::hash;
+use std::cmp::Ordering;
 use std::cmp;
 use std::io;
 use std::mem;
@@ -104,14 +105,47 @@ struct Syncer<C: Clocks + Clone, D: DirWriter> {
     dir_id: i32,
     dir: D,
     db: Arc<db::Database<C>>,
-
-    /// Information about the next scheduled flush:
-    /// * monotonic time
-    /// * reason (for logging)
-    /// * senders to drop when this time is reached (for testing; see SyncerChannel::flush).
-    next_flush: Option<(Timespec, String, Vec<mpsc::SyncSender<()>>)>,
+    planned_flushes: std::collections::BinaryHeap<PlannedFlush>,
 }
 
+struct PlannedFlush {
+    /// Monotonic time at which this flush should happen.
+    when: Timespec,
+
+    /// Recording which prompts this flush. If this recording is already flushed at the planned
+    /// time, it can be skipped.
+    recording: CompositeId,
+
+    /// A human-readable reason for the flush, for logs.
+    reason: String,
+
+    /// Senders to drop when this time is reached. This is for test instrumentation; see
+    /// `SyncerChannel::flush`.
+    senders: Vec<mpsc::SyncSender<()>>,
+}
+
+// PlannedFlush is meant for placement in a max-heap which should return the soonest flush. This
+// PlannedFlush is greater than other if its when is _less_ than the other's.
+impl Ord for PlannedFlush {
+    fn cmp(&self, other: &Self) -> Ordering {
+        other.when.cmp(&self.when)
+    }
+}
+
+impl PartialOrd for PlannedFlush {
+    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+        Some(self.cmp(other))
+    }
+}
+
+impl PartialEq for PlannedFlush {
+    fn eq(&self, other: &Self) -> bool {
+        self.when == other.when
+    }
+}
+
+impl Eq for PlannedFlush {}
+
 /// Starts a syncer for the given sample file directory.
 ///
 /// The lock must not be held on `db` when this is called.
@@ -218,7 +252,7 @@ impl<F: FileWriter> SyncerChannel<F> {
     }
 
     /// For testing: flushes the syncer, waiting for all currently-queued commands to complete,
-    /// including a scheduled database flush if any. Note this doesn't wait for any
+    /// including the next scheduled database flush (if any). Note this doesn't wait for any
     /// post-database flush garbage collection.
     pub fn flush(&self) {
         let (snd, rcv) = mpsc::sync_channel(0);
@@ -290,7 +324,7 @@ impl<C: Clocks + Clone> Syncer<C, Arc<dir::SampleFileDir>> {
             dir_id,
             dir,
             db,
-            next_flush: None,
+            planned_flushes: std::collections::BinaryHeap::new(),
         },
         d.path.clone()))
     }
@@ -345,18 +379,14 @@
 impl<C: Clocks + Clone, D: DirWriter> Syncer<C, D> {
     fn run(&mut self, cmds: mpsc::Receiver<SyncerCommand<D::File>>) {
         loop {
-            // Wait for a command, the next_flush timeout (if specified), or channel disconnect.
-            let next_flush = self.next_flush.take();
+            // Wait for a command, the next flush timeout (if specified), or channel disconnect.
+            let next_flush = self.planned_flushes.peek().map(|f| f.when);
             let cmd = match next_flush {
                 None => match cmds.recv() {
                     Err(_) => return,  // all cmd senders are gone.
                     Ok(cmd) => cmd,
                 },
-                Some((t, r, flush_senders)) => {
-                    // Note: `flush_senders` will be dropped on exit from this block if left
-                    // unmoved, which has the desired behavior of closing the channels and
-                    // notifying the receivers the flush occurred.
-
+                Some(t) => {
                     let now = self.db.clocks().monotonic();
 
                     // Calculate the timeout to use, mapping negative durations to 0.
@@ -364,13 +394,10 @@
                     match cmds.recv_timeout(timeout) {
                         Err(mpsc::RecvTimeoutError::Disconnected) => return,  // cmd senders gone.
                         Err(mpsc::RecvTimeoutError::Timeout) => {
-                            self.flush(&r);
+                            self.flush();
                             continue
                         },
-                        Ok(cmd) => {
-                            self.next_flush = Some((t, r, flush_senders));
-                            cmd
-                        },
+                        Ok(cmd) => cmd,
                     }
                 },
             };
@@ -382,8 +409,8 @@
                 SyncerCommand::Flush(flush) => {
                     // The sender is waiting for the supplied writer to be dropped. If there's no
                     // timeout, do so immediately; otherwise wait for that timeout then drop it.
-                    if let Some((_, _, ref mut flushes)) = self.next_flush {
-                        flushes.push(flush);
+                    if let Some(mut f) = self.planned_flushes.peek_mut() {
+                        f.senders.push(flush);
                     }
                 },
             };
@@ -438,27 +465,65 @@
         // Schedule a flush.
         let how_soon = Duration::seconds(s.flush_if_sec) - duration.to_tm_duration();
         let now = self.db.clocks().monotonic();
-        let t = now + how_soon;
-        if let Some((nft, ref r, _)) = self.next_flush {
-            if nft <= t {
-                trace!("{}-{}: not scheduling flush in {}; there's already one in {}: {}",
-                       c.short_name, s.type_.as_str(), how_soon, nft - now, &r);
-                return;
-            }
-        }
-        let reason = format!("{} sec after start of {} {}-{} recording",
-                             s.flush_if_sec, duration, c.short_name, s.type_.as_str());
+        let when = now + how_soon;
+        let reason = format!("{} sec after start of {} {}-{} recording {}",
+                             s.flush_if_sec, duration, c.short_name, s.type_.as_str(), id);
         trace!("scheduling flush in {} because {}", how_soon, &reason);
-        self.next_flush = Some((t, reason, Vec::new()));
+        self.planned_flushes.push(PlannedFlush {
+            when,
+            reason,
+            recording: id,
+            senders: Vec::new(),
+        });
     }
 
-    fn flush(&mut self, reason: &str) {
-        if let Err(e) = self.db.lock().flush(reason) {
-            let d = Duration::minutes(1);
-            warn!("flush failure on save for reason {}; will retry after {}: {:?}", reason, d, e);
-            let t = self.db.clocks().monotonic() + Duration::minutes(1);
-            self.next_flush = Some((t, "retry after flush failure".to_owned(), Vec::new()));
+    fn flush(&mut self) {
+        let mut l = self.db.lock();
+
+        // Look through the planned flushes and see if any are still relevant. It's possible
+        // they're not because something else (e.g., a syncer for a different sample file dir)
+        // has flushed the database in the meantime.
+        use std::collections::binary_heap::PeekMut;
+        while let Some(f) = self.planned_flushes.peek_mut() {
+            let s = match l.streams_by_id().get(&f.recording.stream()) {
+                Some(s) => s,
+                None => {
+                    // Removing streams while running hasn't been implemented yet, so this should
+                    // be impossible.
+                    warn!("bug: no stream for {} which was scheduled to be flushed", f.recording);
+                    PeekMut::pop(f);
+                    continue;
+                }
+            };
+
+            if s.next_recording_id <= f.recording.recording() {  // not yet committed.
+                break;
+            }
+
+            trace!("planned flush ({}) no longer needed", &f.reason);
+            PeekMut::pop(f);
         }
+
+        // If there's anything left to do now, try to flush.
+        let f = match self.planned_flushes.peek() {
+            None => return,
+            Some(f) => f,
+        };
+        let now = self.db.clocks().monotonic();
+        if f.when > now {
+            return;
+        }
+        if let Err(e) = l.flush(&f.reason) {
+            let d = Duration::minutes(1);
+            warn!("flush failure on save for reason {}; will retry after {}: {:?}",
+                  f.reason, d, e);
+            self.planned_flushes.peek_mut().expect("planned_flushes is non-empty").when =
+                self.db.clocks().monotonic() + Duration::minutes(1);
+            return;
+        }
+
+        // A successful flush should take care of everything planned.
+        self.planned_flushes.clear();
     }
 }
 
@@ -861,7 +926,7 @@ mod tests {
             dir_id: *tdb.db.lock().sample_file_dirs_by_id().keys().next().unwrap(),
             dir: dir.clone(),
             db: tdb.db.clone(),
-            next_flush: None,
+            planned_flushes: std::collections::BinaryHeap::new(),
         };
         let (snd, rcv) = mpsc::channel();
         tdb.db.lock().on_flush(Box::new({
@@ -875,7 +940,6 @@
             .spawn(move || syncer.run(rcv)).unwrap();
 
         Harness {
-            //clocks,
             dir_id,
             dir,
             db: tdb.db,
@@ -941,8 +1005,6 @@
             gc_done_snd.send(()).unwrap();
             Ok(())
         })));
 
-        //h.dir.expect(MockDirAction::Unlink(CompositeId::new(1, 1), Box::new(|_| Ok(()))));
-        //h.dir.expect(MockDirAction::Sync(Box::new(|| Ok(()))));
 
         drop(w);
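
A note on the `Ord` implementation added in db/writer.rs: `std::collections::BinaryHeap` is a max-heap, so `PlannedFlush` deliberately reverses the comparison on `when` to make the soonest flush surface first via `peek`/`pop`. The following self-contained sketch (illustrative names only, not from the patch) shows the same inversion in isolation:

    use std::cmp::Ordering;
    use std::collections::BinaryHeap;

    // Sorts "backwards": the earliest deadline compares as the greatest element,
    // so the max-heap hands it back first.
    #[derive(Eq, PartialEq)]
    struct Deadline(u64);

    impl Ord for Deadline {
        fn cmp(&self, other: &Self) -> Ordering {
            other.0.cmp(&self.0) // operands intentionally swapped
        }
    }

    impl PartialOrd for Deadline {
        fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
            Some(self.cmp(other))
        }
    }

    fn main() {
        let mut heap = BinaryHeap::new();
        heap.push(Deadline(30));
        heap.push(Deadline(10));
        heap.push(Deadline(20));
        assert_eq!(heap.pop().unwrap().0, 10); // the soonest deadline comes out first
    }
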
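The new `Syncer::flush` prunes already-satisfied entries from the top of the heap with `binary_heap::PeekMut::pop` before deciding whether a database flush is still needed. A minimal sketch of that peek-then-pop pattern, using hypothetical names and a plain `i32` heap rather than the patch's `PlannedFlush`:

    use std::collections::BinaryHeap;
    use std::collections::binary_heap::PeekMut;

    // Pop entries off the top of the heap while they fail a relevance test,
    // stopping at the first entry that still matters.
    fn prune_stale(heap: &mut BinaryHeap<i32>, still_relevant: impl Fn(i32) -> bool) {
        while let Some(top) = heap.peek_mut() {
            if still_relevant(*top) {
                break; // the most urgent entry is still wanted; keep it and stop.
            }
            PeekMut::pop(top); // discard just this entry and re-examine the new top.
        }
    }

    fn main() {
        let mut heap: BinaryHeap<i32> = [1, 5, 9, 12].iter().copied().collect();
        prune_stale(&mut heap, |v| v < 6); // everything >= 6 is considered stale here
        assert_eq!(heap.peek().copied(), Some(5));
    }
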
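The reworked flush log line in db/db.rs relies on `itertools::Itertools::join` to render the affected `CompositeId`s as a comma-separated list alongside the counts. A tiny illustration of that call, using plain integers here instead of `CompositeId`:

    use itertools::Itertools;

    fn main() {
        let added = vec![3, 5, 8];
        // `join` formats each element with Display and concatenates with the separator.
        let list = added.iter().join(", ");
        assert_eq!(list, "3, 5, 8");
        println!("added {} recordings ({})", added.len(), list);
    }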