mirror of
https://github.com/scottlamb/moonfire-nvr.git
synced 2025-11-20 09:56:07 -05:00
better logs during normal operation
* don't log every time we delete stuff; leave it for the flush * when flushing, break apart counts by sample file dir and include human-readable sizes
This commit is contained in:
55
db/db.rs
55
db/db.rs
@@ -53,6 +53,7 @@
|
||||
//! cycles.
|
||||
|
||||
use base::clock::{self, Clocks};
|
||||
use base::strutil::encode_size;
|
||||
use crate::auth;
|
||||
use crate::dir;
|
||||
use crate::raw;
|
||||
@@ -72,6 +73,7 @@ use smallvec::SmallVec;
|
||||
use std::collections::{BTreeMap, VecDeque};
|
||||
use std::cell::RefCell;
|
||||
use std::cmp;
|
||||
use std::fmt::Write as _;
|
||||
use std::io::Write;
|
||||
use std::ops::Range;
|
||||
use std::mem;
|
||||
@@ -555,7 +557,7 @@ fn adjust_days(r: Range<recording::Time>, sign: i64,
|
||||
}
|
||||
|
||||
impl Stream {
|
||||
/// Adds a single recording with the given properties to the in-memory state.
|
||||
/// Adds a single fully committed recording with the given properties to the in-memory state.
|
||||
fn add_recording(&mut self, r: Range<recording::Time>, sample_file_bytes: i32) {
|
||||
self.range = Some(match self.range {
|
||||
Some(ref e) => cmp::min(e.start, r.start) .. cmp::max(e.end, r.end),
|
||||
@@ -966,24 +968,36 @@ impl LockedDatabase {
|
||||
self.signal.flush(&tx)?;
|
||||
tx.commit()?;
|
||||
|
||||
#[derive(Default)]
|
||||
struct DirLog {
|
||||
added: SmallVec::<[CompositeId; 32]>,
|
||||
deleted: SmallVec::<[CompositeId; 32]>,
|
||||
gced: SmallVec::<[CompositeId; 32]>,
|
||||
added_bytes: i64,
|
||||
deleted_bytes: i64,
|
||||
}
|
||||
let mut dir_logs: FnvHashMap<i32, DirLog> = FnvHashMap::default();
|
||||
|
||||
// Process delete_garbage.
|
||||
let mut gced = SmallVec::<[CompositeId; 8]>::new();
|
||||
for dir in self.sample_file_dirs_by_id.values_mut() {
|
||||
gced.extend(dir.garbage_unlinked.drain(..));
|
||||
for (&id, dir) in &mut self.sample_file_dirs_by_id {
|
||||
if !dir.garbage_unlinked.is_empty() {
|
||||
dir_logs.entry(id).or_default().gced.extend(dir.garbage_unlinked.drain(..));
|
||||
}
|
||||
}
|
||||
|
||||
let mut added = SmallVec::<[CompositeId; 8]>::new();
|
||||
let mut deleted = SmallVec::<[CompositeId; 8]>::new();
|
||||
for (stream_id, new_range) in new_ranges.drain() {
|
||||
let s = self.streams_by_id.get_mut(&stream_id).unwrap();
|
||||
let d = self.sample_file_dirs_by_id.get_mut(&s.sample_file_dir_id.unwrap()).unwrap();
|
||||
let dir_id = s.sample_file_dir_id.unwrap();
|
||||
let d = self.sample_file_dirs_by_id.get_mut(&dir_id).unwrap();
|
||||
let log = dir_logs.entry(dir_id).or_default();
|
||||
|
||||
// Process delete_oldest_recordings.
|
||||
s.sample_file_bytes -= s.bytes_to_delete;
|
||||
log.deleted_bytes += s.bytes_to_delete;
|
||||
s.bytes_to_delete = 0;
|
||||
deleted.reserve(s.to_delete.len());
|
||||
log.deleted.reserve(s.to_delete.len());
|
||||
for row in s.to_delete.drain(..) {
|
||||
deleted.push(row.id);
|
||||
log.deleted.push(row.id);
|
||||
d.garbage_needs_unlink.insert(row.id);
|
||||
let d = recording::Duration(row.duration as i64);
|
||||
s.duration -= d;
|
||||
@@ -991,11 +1005,12 @@ impl LockedDatabase {
|
||||
}
|
||||
|
||||
// Process add_recordings.
|
||||
log.added_bytes += s.bytes_to_add;
|
||||
s.bytes_to_add = 0;
|
||||
added.reserve(s.synced_recordings);
|
||||
log.added.reserve(s.synced_recordings);
|
||||
for _ in 0..s.synced_recordings {
|
||||
let u = s.uncommitted.pop_front().unwrap();
|
||||
added.push(CompositeId::new(stream_id, s.next_recording_id));
|
||||
log.added.push(CompositeId::new(stream_id, s.next_recording_id));
|
||||
s.next_recording_id += 1;
|
||||
let l = u.lock();
|
||||
let end = l.start + recording::Duration(l.duration_90k as i64);
|
||||
@@ -1009,9 +1024,21 @@ impl LockedDatabase {
|
||||
self.auth.post_flush();
|
||||
self.signal.post_flush();
|
||||
self.flush_count += 1;
|
||||
info!("Flush {} (why: {}): added {} recordings ({}), deleted {} ({}), marked {} ({}) GCed.",
|
||||
self.flush_count, reason, added.len(), added.iter().join(", "), deleted.len(),
|
||||
deleted.iter().join(", "), gced.len(), gced.iter().join(", "));
|
||||
let mut log_msg = String::with_capacity(256);
|
||||
for (&dir_id, log) in &dir_logs {
|
||||
let dir = self.sample_file_dirs_by_id.get(&dir_id).unwrap();
|
||||
write!(&mut log_msg,
|
||||
"\n{}: added {}B in {} recordings ({}), deleted {}B in {} ({}), \
|
||||
GCed {} recordings ({}).",
|
||||
&dir.path, &encode_size(log.added_bytes), log.added.len(),
|
||||
log.added.iter().join(", "), &encode_size(log.deleted_bytes), log.deleted.len(),
|
||||
log.deleted.iter().join(", "), log.gced.len(),
|
||||
log.gced.iter().join(", ")).unwrap();
|
||||
}
|
||||
if log_msg.is_empty() {
|
||||
log_msg.push_str(" no recording changes");
|
||||
}
|
||||
info!("Flush {} (why: {}):{}", self.flush_count, reason, &log_msg);
|
||||
for cb in &self.on_flush {
|
||||
cb();
|
||||
}
|
||||
|
||||
@@ -39,7 +39,7 @@ use crate::recording;
|
||||
use failure::{Error, bail, format_err};
|
||||
use fnv::FnvHashMap;
|
||||
use parking_lot::Mutex;
|
||||
use log::{debug, info, trace, warn};
|
||||
use log::{debug, trace, warn};
|
||||
use openssl::hash;
|
||||
use std::cmp::Ordering;
|
||||
use std::cmp;
|
||||
@@ -205,9 +205,6 @@ pub fn lower_retention(db: Arc<db::Database>, dir_id: i32, limits: &[NewLimit])
|
||||
}
|
||||
if l.limit >= bytes_before { continue }
|
||||
delete_recordings(db, l.stream_id, extra)?;
|
||||
let stream = db.streams_by_id().get(&l.stream_id).unwrap();
|
||||
info!("stream {}, deleting: {}->{}", l.stream_id, bytes_before,
|
||||
stream.sample_file_bytes + stream.bytes_to_add - stream.bytes_to_delete);
|
||||
}
|
||||
Ok(())
|
||||
})
|
||||
@@ -238,8 +235,6 @@ fn delete_recordings(db: &mut db::LockedDatabase, stream_id: i32,
|
||||
}
|
||||
false
|
||||
})?;
|
||||
info!("{}: deleting {} bytes in {} recordings ({} bytes needed)",
|
||||
stream_id, bytes_to_delete, n, bytes_needed);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user