track "assumed" filesystem usage (#89)

As described in #89, we need to refactor a bit before we can get the
actual filesystem block size. Assuming 4096 for now. Small steps.
This commit is contained in:
Scott Lamb 2020-07-12 16:51:39 -07:00
parent 6b5359b7cb
commit 959defebca
7 changed files with 76 additions and 25 deletions

View File

@ -44,7 +44,7 @@ static MULTIPLIERS: [(char, u64); 4] = [
('K', 10), ('K', 10),
]; ];
/// Encodes a size into human-readable form. /// Encodes a non-negative size into human-readable form.
pub fn encode_size(mut raw: i64) -> String { pub fn encode_size(mut raw: i64) -> String {
let mut encoded = String::new(); let mut encoded = String::new();
for &(c, n) in &MULTIPLIERS { for &(c, n) in &MULTIPLIERS {

View File

@ -104,6 +104,20 @@ const INSERT_VIDEO_SAMPLE_ENTRY_SQL: &'static str = r#"
const UPDATE_NEXT_RECORDING_ID_SQL: &'static str = const UPDATE_NEXT_RECORDING_ID_SQL: &'static str =
"update stream set next_recording_id = :next_recording_id where id = :stream_id"; "update stream set next_recording_id = :next_recording_id where id = :stream_id";
/// The size of a filesystem block, to use in disk space accounting.
/// This should really be obtained by a stat call on the sample file directory in question,
/// but that requires some refactoring. See
/// [#89](https://github.com/scottlamb/moonfire-nvr/issues/89). We might be able to get away with
/// this hardcoded value for a while.
const ASSUMED_BLOCK_SIZE_BYTES: i64 = 4096;
/// Rounds a file size up to the next multiple of the block size.
/// This is useful in representing the actual amount of filesystem space used.
pub(crate) fn round_up(bytes: i64) -> i64 {
let blk = ASSUMED_BLOCK_SIZE_BYTES;
(bytes + blk - 1) / blk * blk
}
pub struct FromSqlUuid(pub Uuid); pub struct FromSqlUuid(pub Uuid);
impl rusqlite::types::FromSql for FromSqlUuid { impl rusqlite::types::FromSql for FromSqlUuid {
@ -168,8 +182,7 @@ pub struct ListAggregatedRecordingsRow {
pub growing: bool, pub growing: bool,
} }
impl ListAggregatedRecordingsRow { impl ListAggregatedRecordingsRow { fn from(row: ListRecordingsRow) -> Self {
fn from(row: ListRecordingsRow) -> Self {
let recording_id = row.id.recording(); let recording_id = row.id.recording();
let uncommitted = (row.flags & RecordingFlags::Uncommitted as i32) != 0; let uncommitted = (row.flags & RecordingFlags::Uncommitted as i32) != 0;
let growing = (row.flags & RecordingFlags::Growing as i32) != 0; let growing = (row.flags & RecordingFlags::Growing as i32) != 0;
@ -406,8 +419,15 @@ pub struct Stream {
/// The time range of recorded data associated with this stream (minimum start time and maximum /// The time range of recorded data associated with this stream (minimum start time and maximum
/// end time). `None` iff there are no recordings for this camera. /// end time). `None` iff there are no recordings for this camera.
pub range: Option<Range<recording::Time>>, pub range: Option<Range<recording::Time>>,
/// The total bytes of flushed sample files. This doesn't include disk space wasted in the
/// last filesystem block allocated to each file ("internal fragmentation").
pub sample_file_bytes: i64, pub sample_file_bytes: i64,
/// The total bytes on the filesystem used by this stream. This slightly more than
/// `sample_file_bytes` because it includes the wasted space in the last filesystem block.
pub fs_bytes: i64,
/// On flush, delete the following recordings (move them to the `garbage` table, to be /// On flush, delete the following recordings (move them to the `garbage` table, to be
/// collected later). Note they must be the oldest recordings. The later collection involves /// collected later). Note they must be the oldest recordings. The later collection involves
/// the syncer unlinking the files on disk and syncing the directory then enqueueing for /// the syncer unlinking the files on disk and syncing the directory then enqueueing for
@ -416,10 +436,12 @@ pub struct Stream {
/// The total bytes to delete with the next flush. /// The total bytes to delete with the next flush.
pub bytes_to_delete: i64, pub bytes_to_delete: i64,
pub fs_bytes_to_delete: i64,
/// The total bytes to add with the next flush. (`mark_synced` has already been called on these /// The total bytes to add with the next flush. (`mark_synced` has already been called on these
/// recordings.) /// recordings.)
pub bytes_to_add: i64, pub bytes_to_add: i64,
pub fs_bytes_to_add: i64,
/// The total duration of recorded data. This may not be `range.end - range.start` due to /// The total duration of recorded data. This may not be `range.end - range.start` due to
/// gaps and overlap. /// gaps and overlap.
@ -565,6 +587,7 @@ impl Stream {
}); });
self.duration += r.end - r.start; self.duration += r.end - r.start;
self.sample_file_bytes += sample_file_bytes as i64; self.sample_file_bytes += sample_file_bytes as i64;
self.fs_bytes += round_up(i64::from(sample_file_bytes));
adjust_days(r, 1, &mut self.days); adjust_days(r, 1, &mut self.days);
} }
} }
@ -759,9 +782,12 @@ impl StreamStateChanger {
flush_if_sec: sc.flush_if_sec, flush_if_sec: sc.flush_if_sec,
range: None, range: None,
sample_file_bytes: 0, sample_file_bytes: 0,
fs_bytes: 0,
to_delete: Vec::new(), to_delete: Vec::new(),
bytes_to_delete: 0, bytes_to_delete: 0,
fs_bytes_to_delete: 0,
bytes_to_add: 0, bytes_to_add: 0,
fs_bytes_to_add: 0,
duration: recording::Duration(0), duration: recording::Duration(0),
days: BTreeMap::new(), days: BTreeMap::new(),
record: sc.record, record: sc.record,
@ -837,7 +863,9 @@ impl LockedDatabase {
bail!("can't sync un-added recording {}", id); bail!("can't sync un-added recording {}", id);
} }
let l = stream.uncommitted[stream.synced_recordings].lock(); let l = stream.uncommitted[stream.synced_recordings].lock();
stream.bytes_to_add += l.sample_file_bytes as i64; let bytes = i64::from(l.sample_file_bytes);
stream.bytes_to_add += bytes;
stream.fs_bytes_to_add += round_up(bytes);
stream.synced_recordings += 1; stream.synced_recordings += 1;
Ok(()) Ok(())
} }
@ -988,17 +1016,19 @@ impl LockedDatabase {
for (stream_id, new_range) in new_ranges.drain() { for (stream_id, new_range) in new_ranges.drain() {
let s = self.streams_by_id.get_mut(&stream_id).unwrap(); let s = self.streams_by_id.get_mut(&stream_id).unwrap();
let dir_id = s.sample_file_dir_id.unwrap(); let dir_id = s.sample_file_dir_id.unwrap();
let d = self.sample_file_dirs_by_id.get_mut(&dir_id).unwrap(); let dir = self.sample_file_dirs_by_id.get_mut(&dir_id).unwrap();
let log = dir_logs.entry(dir_id).or_default(); let log = dir_logs.entry(dir_id).or_default();
// Process delete_oldest_recordings. // Process delete_oldest_recordings.
s.sample_file_bytes -= s.bytes_to_delete; s.sample_file_bytes -= s.bytes_to_delete;
s.fs_bytes -= s.fs_bytes_to_delete;
log.deleted_bytes += s.bytes_to_delete; log.deleted_bytes += s.bytes_to_delete;
s.bytes_to_delete = 0; s.bytes_to_delete = 0;
s.fs_bytes_to_delete = 0;
log.deleted.reserve(s.to_delete.len()); log.deleted.reserve(s.to_delete.len());
for row in s.to_delete.drain(..) { for row in s.to_delete.drain(..) {
log.deleted.push(row.id); log.deleted.push(row.id);
d.garbage_needs_unlink.insert(row.id); dir.garbage_needs_unlink.insert(row.id);
let d = recording::Duration(row.duration as i64); let d = recording::Duration(row.duration as i64);
s.duration -= d; s.duration -= d;
adjust_days(row.start .. row.start + d, -1, &mut s.days); adjust_days(row.start .. row.start + d, -1, &mut s.days);
@ -1007,6 +1037,7 @@ impl LockedDatabase {
// Process add_recordings. // Process add_recordings.
log.added_bytes += s.bytes_to_add; log.added_bytes += s.bytes_to_add;
s.bytes_to_add = 0; s.bytes_to_add = 0;
s.fs_bytes_to_add = 0;
log.added.reserve(s.synced_recordings); log.added.reserve(s.synced_recordings);
for _ in 0..s.synced_recordings { for _ in 0..s.synced_recordings {
let u = s.uncommitted.pop_front().unwrap(); let u = s.uncommitted.pop_front().unwrap();
@ -1312,7 +1343,7 @@ impl LockedDatabase {
Err(format_err!("no such recording {}", id)) Err(format_err!("no such recording {}", id))
} }
/// Deletes the oldest recordings that aren't already queued for deletion. /// Queues for deletion the oldest recordings that aren't already queued.
/// `f` should return true for each row that should be deleted. /// `f` should return true for each row that should be deleted.
pub(crate) fn delete_oldest_recordings( pub(crate) fn delete_oldest_recordings(
&mut self, stream_id: i32, f: &mut dyn FnMut(&ListOldestRecordingsRow) -> bool) &mut self, stream_id: i32, f: &mut dyn FnMut(&ListOldestRecordingsRow) -> bool)
@ -1328,7 +1359,9 @@ impl LockedDatabase {
raw::list_oldest_recordings(&self.conn, CompositeId::new(stream_id, end), &mut |r| { raw::list_oldest_recordings(&self.conn, CompositeId::new(stream_id, end), &mut |r| {
if f(&r) { if f(&r) {
s.to_delete.push(r); s.to_delete.push(r);
s.bytes_to_delete += r.sample_file_bytes as i64; let bytes = i64::from(r.sample_file_bytes);
s.bytes_to_delete += bytes;
s.fs_bytes_to_delete += round_up(bytes);
return true; return true;
} }
false false
@ -1490,9 +1523,12 @@ impl LockedDatabase {
flush_if_sec, flush_if_sec,
range: None, range: None,
sample_file_bytes: 0, sample_file_bytes: 0,
fs_bytes: 0,
to_delete: Vec::new(), to_delete: Vec::new(),
bytes_to_delete: 0, bytes_to_delete: 0,
fs_bytes_to_delete: 0,
bytes_to_add: 0, bytes_to_add: 0,
fs_bytes_to_add: 0,
duration: recording::Duration(0), duration: recording::Duration(0),
days: BTreeMap::new(), days: BTreeMap::new(),
next_recording_id: row.get(7)?, next_recording_id: row.get(7)?,
@ -2398,4 +2434,12 @@ mod tests {
.collect(); .collect();
assert_eq!(&g, &[]); assert_eq!(&g, &[]);
} }
#[test]
fn round_up() {
assert_eq!(super::round_up(0), 0);
assert_eq!(super::round_up(8_191), 8_192);
assert_eq!(super::round_up(8_192), 8_192);
assert_eq!(super::round_up(8_193), 12_288);
}
} }

View File

@ -195,15 +195,15 @@ pub fn lower_retention(db: Arc<db::Database>, dir_id: i32, limits: &[NewLimit])
let (mut syncer, _) = Syncer::new(&db.lock(), db2, dir_id)?; let (mut syncer, _) = Syncer::new(&db.lock(), db2, dir_id)?;
syncer.do_rotation(|db| { syncer.do_rotation(|db| {
for l in limits { for l in limits {
let (bytes_before, extra); let (fs_bytes_before, extra);
{ {
let stream = db.streams_by_id().get(&l.stream_id) let stream = db.streams_by_id().get(&l.stream_id)
.ok_or_else(|| format_err!("no such stream {}", l.stream_id))?; .ok_or_else(|| format_err!("no such stream {}", l.stream_id))?;
bytes_before = stream.sample_file_bytes + stream.bytes_to_add - fs_bytes_before = stream.fs_bytes + stream.fs_bytes_to_add -
stream.bytes_to_delete; stream.fs_bytes_to_delete;
extra = stream.retain_bytes - l.limit; extra = stream.retain_bytes - l.limit;
} }
if l.limit >= bytes_before { continue } if l.limit >= fs_bytes_before { continue }
delete_recordings(db, l.stream_id, extra)?; delete_recordings(db, l.stream_id, extra)?;
} }
Ok(()) Ok(())
@ -213,23 +213,24 @@ pub fn lower_retention(db: Arc<db::Database>, dir_id: i32, limits: &[NewLimit])
/// Deletes recordings to bring a stream's disk usage within bounds. /// Deletes recordings to bring a stream's disk usage within bounds.
fn delete_recordings(db: &mut db::LockedDatabase, stream_id: i32, fn delete_recordings(db: &mut db::LockedDatabase, stream_id: i32,
extra_bytes_needed: i64) -> Result<(), Error> { extra_bytes_needed: i64) -> Result<(), Error> {
let bytes_needed = { let fs_bytes_needed = {
let stream = match db.streams_by_id().get(&stream_id) { let stream = match db.streams_by_id().get(&stream_id) {
None => bail!("no stream {}", stream_id), None => bail!("no stream {}", stream_id),
Some(s) => s, Some(s) => s,
}; };
stream.sample_file_bytes + stream.bytes_to_add - stream.bytes_to_delete + extra_bytes_needed stream.fs_bytes + stream.fs_bytes_to_add - stream.fs_bytes_to_delete + extra_bytes_needed
- stream.retain_bytes - stream.retain_bytes
}; };
let mut bytes_to_delete = 0; let mut fs_bytes_to_delete = 0;
if bytes_needed <= 0 { if fs_bytes_needed <= 0 {
debug!("{}: have remaining quota of {}", stream_id, -bytes_needed); debug!("{}: have remaining quota of {}", stream_id,
base::strutil::encode_size(-fs_bytes_needed));
return Ok(()); return Ok(());
} }
let mut n = 0; let mut n = 0;
db.delete_oldest_recordings(stream_id, &mut |row| { db.delete_oldest_recordings(stream_id, &mut |row| {
if bytes_needed >= bytes_to_delete { if fs_bytes_needed >= fs_bytes_to_delete {
bytes_to_delete += row.sample_file_bytes as i64; fs_bytes_to_delete += db::round_up(i64::from(row.sample_file_bytes));
n += 1; n += 1;
return true; return true;
} }
@ -996,7 +997,7 @@ mod tests {
h.db.lock().update_retention(&[db::RetentionChange { h.db.lock().update_retention(&[db::RetentionChange {
stream_id: testutil::TEST_STREAM_ID, stream_id: testutil::TEST_STREAM_ID,
new_record: true, new_record: true,
new_limit: 3, new_limit: 0,
}]).unwrap(); }]).unwrap();
// Setup: add a 3-byte recording. // Setup: add a 3-byte recording.
@ -1143,7 +1144,7 @@ mod tests {
h.db.lock().update_retention(&[db::RetentionChange { h.db.lock().update_retention(&[db::RetentionChange {
stream_id: testutil::TEST_STREAM_ID, stream_id: testutil::TEST_STREAM_ID,
new_record: true, new_record: true,
new_limit: 3, new_limit: 0,
}]).unwrap(); }]).unwrap();
// Setup: add a 3-byte recording. // Setup: add a 3-byte recording.

View File

@ -90,6 +90,10 @@ The `application/json` response will have a dict as follows:
be lesser if there are gaps in the recorded data. be lesser if there are gaps in the recorded data.
* `totalSampleFileBytes`: the total number of bytes of sample data * `totalSampleFileBytes`: the total number of bytes of sample data
(the `mdat` portion of a `.mp4` file). (the `mdat` portion of a `.mp4` file).
* `fsBytes`: the total number of bytes on the filesystem used by
this stream. This is slightly more than `totalSampleFileBytes`
because it also includes the wasted portion of the final
filesystem block allocated to each file.
* `days`: (only included if request pararameter `days` is true) * `days`: (only included if request pararameter `days` is true)
dictionary representing calendar days (in the server's time zone) dictionary representing calendar days (in the server's time zone)
with non-zero total duration of recordings for that day. The keys with non-zero total duration of recordings for that day. The keys

View File

@ -326,8 +326,8 @@ fn edit_camera_dialog(db: &Arc<db::Database>, siv: &mut Cursive, item: &Option<i
let u = if s.retain_bytes == 0 { let u = if s.retain_bytes == 0 {
"0 / 0 (0.0%)".to_owned() "0 / 0 (0.0%)".to_owned()
} else { } else {
format!("{} / {} ({:.1}%)", s.sample_file_bytes, s.retain_bytes, format!("{} / {} ({:.1}%)", s.fs_bytes, s.retain_bytes,
100. * s.sample_file_bytes as f32 / s.retain_bytes as f32) 100. * s.fs_bytes as f32 / s.retain_bytes as f32)
}; };
dialog.call_on_name(&format!("{}_rtsp_url", t.as_str()), dialog.call_on_name(&format!("{}_rtsp_url", t.as_str()),
|v: &mut views::EditView| v.set_content(s.rtsp_url.to_owned())); |v: &mut views::EditView| v.set_content(s.rtsp_url.to_owned()));

View File

@ -290,11 +290,11 @@ fn edit_dir_dialog(db: &Arc<db::Database>, siv: &mut Cursive, dir_id: i32) {
} }
streams.insert(id, Stream { streams.insert(id, Stream {
label: format!("{}: {}: {}", id, c.short_name, s.type_.as_str()), label: format!("{}: {}: {}", id, c.short_name, s.type_.as_str()),
used: s.sample_file_bytes, used: s.fs_bytes,
record: s.record, record: s.record,
retain: Some(s.retain_bytes), retain: Some(s.retain_bytes),
}); });
total_used += s.sample_file_bytes; total_used += s.fs_bytes;
total_retain += s.retain_bytes; total_retain += s.retain_bytes;
} }
if streams.is_empty() { if streams.is_empty() {

View File

@ -106,6 +106,7 @@ pub struct Stream<'a> {
pub max_end_time_90k: Option<i64>, pub max_end_time_90k: Option<i64>,
pub total_duration_90k: i64, pub total_duration_90k: i64,
pub total_sample_file_bytes: i64, pub total_sample_file_bytes: i64,
pub fs_bytes: i64,
#[serde(skip_serializing_if = "Option::is_none")] #[serde(skip_serializing_if = "Option::is_none")]
#[serde(serialize_with = "Stream::serialize_days")] #[serde(serialize_with = "Stream::serialize_days")]
@ -235,6 +236,7 @@ impl<'a> Stream<'a> {
max_end_time_90k: s.range.as_ref().map(|r| r.end.0), max_end_time_90k: s.range.as_ref().map(|r| r.end.0),
total_duration_90k: s.duration.0, total_duration_90k: s.duration.0,
total_sample_file_bytes: s.sample_file_bytes, total_sample_file_bytes: s.sample_file_bytes,
fs_bytes: s.fs_bytes,
days: if include_days { Some(&s.days) } else { None }, days: if include_days { Some(&s.days) } else { None },
})) }))
} }