diff --git a/Cargo.lock b/Cargo.lock index 8050792..f375035 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1053,9 +1053,11 @@ name = "moonfire-base" version = "0.0.1" dependencies = [ "failure 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)", + "lazy_static 1.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "libc 0.2.60 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.7 (registry+https://github.com/rust-lang/crates.io-index)", "parking_lot 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)", + "regex 1.1.9 (registry+https://github.com/rust-lang/crates.io-index)", "time 0.1.42 (registry+https://github.com/rust-lang/crates.io-index)", ] diff --git a/base/Cargo.toml b/base/Cargo.toml index 8ca2997..790a6cc 100644 --- a/base/Cargo.toml +++ b/base/Cargo.toml @@ -13,7 +13,9 @@ path = "lib.rs" [dependencies] failure = "0.1.1" +lazy_static = "1.0" libc = "0.2" log = "0.4" parking_lot = { version = "0.9", features = [] } +regex = "1.0" time = "0.1" diff --git a/base/strutil.rs b/base/strutil.rs index ed4a992..776e88b 100644 --- a/base/strutil.rs +++ b/base/strutil.rs @@ -28,6 +28,66 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . +use lazy_static::lazy_static; +use regex::Regex; +use std::fmt::Write as _; +use std::str::FromStr as _; + +static MULTIPLIERS: [(char, u64); 4] = [ + // (suffix character, power of 2) + ('T', 40), + ('G', 30), + ('M', 20), + ('K', 10), +]; + +/// Encodes a size into human-readable form. +pub fn encode_size(mut raw: i64) -> String { + let mut encoded = String::new(); + for &(c, n) in &MULTIPLIERS { + if raw >= 1i64<> n, c).unwrap(); + raw &= (1i64 << n) - 1; + } + } + if raw > 0 || encoded.len() == 0 { + write!(&mut encoded, "{}", raw).unwrap(); + } else { + encoded.pop(); // remove trailing space. + } + encoded +} + +/// Decodes a human-readable size as output by encode_size. +pub fn decode_size(encoded: &str) -> Result { + let mut decoded = 0i64; + lazy_static! { + static ref RE: Regex = Regex::new(r"\s*([0-9]+)([TGMK])?,?\s*").unwrap(); + } + let mut last_pos = 0; + for cap in RE.captures_iter(encoded) { + let whole_cap = cap.get(0).unwrap(); + if whole_cap.start() > last_pos { + return Err(()); + } + last_pos = whole_cap.end(); + let mut piece = i64::from_str(&cap[1]).map_err(|_| ())?; + if let Some(m) = cap.get(2) { + let m = m.as_str().as_bytes()[0] as char; + for &(some_m, n) in &MULTIPLIERS { + if some_m == m { + piece *= 1i64< String { @@ -67,6 +127,11 @@ pub fn dehex(hexed: &[u8]) -> Result<[u8; 20], ()> { mod tests { use super::*; + #[test] + fn test_decode() { + assert_eq!(super::decode_size("100M").unwrap(), 100i64 << 20); + } + #[test] fn round_trip() { let s = "de382684a471f178e4e3a163762711b0653bfd83"; diff --git a/db/db.rs b/db/db.rs index 185b774..0d1fd81 100644 --- a/db/db.rs +++ b/db/db.rs @@ -53,6 +53,7 @@ //! cycles. use base::clock::{self, Clocks}; +use base::strutil::encode_size; use crate::auth; use crate::dir; use crate::raw; @@ -72,6 +73,7 @@ use smallvec::SmallVec; use std::collections::{BTreeMap, VecDeque}; use std::cell::RefCell; use std::cmp; +use std::fmt::Write as _; use std::io::Write; use std::ops::Range; use std::mem; @@ -555,7 +557,7 @@ fn adjust_days(r: Range, sign: i64, } impl Stream { - /// Adds a single recording with the given properties to the in-memory state. + /// Adds a single fully committed recording with the given properties to the in-memory state. fn add_recording(&mut self, r: Range, sample_file_bytes: i32) { self.range = Some(match self.range { Some(ref e) => cmp::min(e.start, r.start) .. cmp::max(e.end, r.end), @@ -966,24 +968,36 @@ impl LockedDatabase { self.signal.flush(&tx)?; tx.commit()?; + #[derive(Default)] + struct DirLog { + added: SmallVec::<[CompositeId; 32]>, + deleted: SmallVec::<[CompositeId; 32]>, + gced: SmallVec::<[CompositeId; 32]>, + added_bytes: i64, + deleted_bytes: i64, + } + let mut dir_logs: FnvHashMap = FnvHashMap::default(); + // Process delete_garbage. - let mut gced = SmallVec::<[CompositeId; 8]>::new(); - for dir in self.sample_file_dirs_by_id.values_mut() { - gced.extend(dir.garbage_unlinked.drain(..)); + for (&id, dir) in &mut self.sample_file_dirs_by_id { + if !dir.garbage_unlinked.is_empty() { + dir_logs.entry(id).or_default().gced.extend(dir.garbage_unlinked.drain(..)); + } } - let mut added = SmallVec::<[CompositeId; 8]>::new(); - let mut deleted = SmallVec::<[CompositeId; 8]>::new(); for (stream_id, new_range) in new_ranges.drain() { let s = self.streams_by_id.get_mut(&stream_id).unwrap(); - let d = self.sample_file_dirs_by_id.get_mut(&s.sample_file_dir_id.unwrap()).unwrap(); + let dir_id = s.sample_file_dir_id.unwrap(); + let d = self.sample_file_dirs_by_id.get_mut(&dir_id).unwrap(); + let log = dir_logs.entry(dir_id).or_default(); // Process delete_oldest_recordings. s.sample_file_bytes -= s.bytes_to_delete; + log.deleted_bytes += s.bytes_to_delete; s.bytes_to_delete = 0; - deleted.reserve(s.to_delete.len()); + log.deleted.reserve(s.to_delete.len()); for row in s.to_delete.drain(..) { - deleted.push(row.id); + log.deleted.push(row.id); d.garbage_needs_unlink.insert(row.id); let d = recording::Duration(row.duration as i64); s.duration -= d; @@ -991,11 +1005,12 @@ impl LockedDatabase { } // Process add_recordings. + log.added_bytes += s.bytes_to_add; s.bytes_to_add = 0; - added.reserve(s.synced_recordings); + log.added.reserve(s.synced_recordings); for _ in 0..s.synced_recordings { let u = s.uncommitted.pop_front().unwrap(); - added.push(CompositeId::new(stream_id, s.next_recording_id)); + log.added.push(CompositeId::new(stream_id, s.next_recording_id)); s.next_recording_id += 1; let l = u.lock(); let end = l.start + recording::Duration(l.duration_90k as i64); @@ -1009,9 +1024,21 @@ impl LockedDatabase { self.auth.post_flush(); self.signal.post_flush(); self.flush_count += 1; - info!("Flush {} (why: {}): added {} recordings ({}), deleted {} ({}), marked {} ({}) GCed.", - self.flush_count, reason, added.len(), added.iter().join(", "), deleted.len(), - deleted.iter().join(", "), gced.len(), gced.iter().join(", ")); + let mut log_msg = String::with_capacity(256); + for (&dir_id, log) in &dir_logs { + let dir = self.sample_file_dirs_by_id.get(&dir_id).unwrap(); + write!(&mut log_msg, + "\n{}: added {}B in {} recordings ({}), deleted {}B in {} ({}), \ + GCed {} recordings ({}).", + &dir.path, &encode_size(log.added_bytes), log.added.len(), + log.added.iter().join(", "), &encode_size(log.deleted_bytes), log.deleted.len(), + log.deleted.iter().join(", "), log.gced.len(), + log.gced.iter().join(", ")).unwrap(); + } + if log_msg.is_empty() { + log_msg.push_str(" no recording changes"); + } + info!("Flush {} (why: {}):{}", self.flush_count, reason, &log_msg); for cb in &self.on_flush { cb(); } diff --git a/db/writer.rs b/db/writer.rs index d794219..e65529b 100644 --- a/db/writer.rs +++ b/db/writer.rs @@ -39,7 +39,7 @@ use crate::recording; use failure::{Error, bail, format_err}; use fnv::FnvHashMap; use parking_lot::Mutex; -use log::{debug, info, trace, warn}; +use log::{debug, trace, warn}; use openssl::hash; use std::cmp::Ordering; use std::cmp; @@ -205,9 +205,6 @@ pub fn lower_retention(db: Arc, dir_id: i32, limits: &[NewLimit]) } if l.limit >= bytes_before { continue } delete_recordings(db, l.stream_id, extra)?; - let stream = db.streams_by_id().get(&l.stream_id).unwrap(); - info!("stream {}, deleting: {}->{}", l.stream_id, bytes_before, - stream.sample_file_bytes + stream.bytes_to_add - stream.bytes_to_delete); } Ok(()) }) @@ -238,8 +235,6 @@ fn delete_recordings(db: &mut db::LockedDatabase, stream_id: i32, } false })?; - info!("{}: deleting {} bytes in {} recordings ({} bytes needed)", - stream_id, bytes_to_delete, n, bytes_needed); Ok(()) } diff --git a/src/cmds/config/cameras.rs b/src/cmds/config/cameras.rs index 7af2898..ffaf332 100644 --- a/src/cmds/config/cameras.rs +++ b/src/cmds/config/cameras.rs @@ -28,6 +28,7 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . +use base::strutil::{decode_size, encode_size}; use crate::stream::{self, Opener, Stream}; use cursive::Cursive; use cursive::traits::{Boxable, Identifiable, Finder}; @@ -37,7 +38,6 @@ use failure::Error; use std::collections::BTreeMap; use std::str::FromStr; use std::sync::Arc; -use super::{decode_size, encode_size}; use url::Url; /// Builds a `CameraChange` from an active `edit_camera_dialog`. diff --git a/src/cmds/config/dirs.rs b/src/cmds/config/dirs.rs index 31a6e09..e660f7d 100644 --- a/src/cmds/config/dirs.rs +++ b/src/cmds/config/dirs.rs @@ -28,6 +28,7 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . +use base::strutil::{decode_size, encode_size}; use cursive::Cursive; use cursive::traits::{Boxable, Identifiable}; use cursive::views; @@ -38,7 +39,6 @@ use std::cell::RefCell; use std::collections::BTreeMap; use std::rc::Rc; use std::sync::Arc; -use super::{decode_size, encode_size}; struct Stream { label: String, diff --git a/src/cmds/config/mod.rs b/src/cmds/config/mod.rs index a5d46ee..d91f224 100644 --- a/src/cmds/config/mod.rs +++ b/src/cmds/config/mod.rs @@ -38,12 +38,8 @@ use cursive::Cursive; use cursive::views; use db; use failure::Error; -use lazy_static::lazy_static; -use regex::Regex; use serde::Deserialize; use std::sync::Arc; -use std::fmt::Write; -use std::str::FromStr; mod cameras; mod dirs; @@ -64,60 +60,6 @@ Options: [default: /var/lib/moonfire-nvr/db] "#; -static MULTIPLIERS: [(char, u64); 4] = [ - // (suffix character, power of 2) - ('T', 40), - ('G', 30), - ('M', 20), - ('K', 10), -]; - -fn encode_size(mut raw: i64) -> String { - let mut encoded = String::new(); - for &(c, n) in &MULTIPLIERS { - if raw >= 1i64<> n, c).unwrap(); - raw &= (1i64 << n) - 1; - } - } - if raw > 0 || encoded.len() == 0 { - write!(&mut encoded, "{}", raw).unwrap(); - } else { - encoded.pop(); // remove trailing space. - } - encoded -} - -fn decode_size(encoded: &str) -> Result { - let mut decoded = 0i64; - lazy_static! { - static ref RE: Regex = Regex::new(r"\s*([0-9]+)([TGMK])?,?\s*").unwrap(); - } - let mut last_pos = 0; - for cap in RE.captures_iter(encoded) { - let whole_cap = cap.get(0).unwrap(); - if whole_cap.start() > last_pos { - return Err(()); - } - last_pos = whole_cap.end(); - let mut piece = i64::from_str(&cap[1]).map_err(|_| ())?; - if let Some(m) = cap.get(2) { - let m = m.as_str().as_bytes()[0] as char; - for &(some_m, n) in &MULTIPLIERS { - if some_m == m { - piece *= 1i64< Result<(), Error> { Ok(()) } - -#[cfg(test)] -mod tests { - #[test] - fn test_decode() { - assert_eq!(super::decode_size("100M").unwrap(), 100i64 << 20); - } -}