From 959defebcab6b83dbb4b6a5ed61807cea93889c2 Mon Sep 17 00:00:00 2001
From: Scott Lamb <slamb@slamb.org>
Date: Sun, 12 Jul 2020 16:51:39 -0700
Subject: [PATCH] track "assumed" filesystem usage (#89)

As described in #89, we need to refactor a bit before we can get the
actual filesystem block size. Assuming 4096 for now. Small steps.
---
 base/strutil.rs            |  2 +-
 db/db.rs                   | 58 +++++++++++++++++++++++++++++++++-----
 db/writer.rs               | 27 +++++++++---------
 design/api.md              |  4 +++
 src/cmds/config/cameras.rs |  4 +--
 src/cmds/config/dirs.rs    |  4 +--
 src/json.rs                |  2 ++
 7 files changed, 76 insertions(+), 25 deletions(-)

diff --git a/base/strutil.rs b/base/strutil.rs
index 24ddb90..d1d57e2 100644
--- a/base/strutil.rs
+++ b/base/strutil.rs
@@ -44,7 +44,7 @@ static MULTIPLIERS: [(char, u64); 4] = [
     ('K', 10),
 ];
 
-/// Encodes a size into human-readable form.
+/// Encodes a non-negative size into human-readable form.
 pub fn encode_size(mut raw: i64) -> String {
     let mut encoded = String::new();
     for &(c, n) in &MULTIPLIERS {
diff --git a/db/db.rs b/db/db.rs
index aca3712..c95313a 100644
--- a/db/db.rs
+++ b/db/db.rs
@@ -104,6 +104,20 @@ const INSERT_VIDEO_SAMPLE_ENTRY_SQL: &'static str = r#"
 const UPDATE_NEXT_RECORDING_ID_SQL: &'static str =
     "update stream set next_recording_id = :next_recording_id where id = :stream_id";
 
+/// The size of a filesystem block, to use in disk space accounting.
+/// This should really be obtained by a stat call on the sample file directory in question,
+/// but that requires some refactoring. See
+/// [#89](https://github.com/scottlamb/moonfire-nvr/issues/89). We might be able to get away with
+/// this hardcoded value for a while.
+const ASSUMED_BLOCK_SIZE_BYTES: i64 = 4096;
+
+/// Rounds a file size up to the next multiple of the block size.
+/// This is useful in representing the actual amount of filesystem space used.
+pub(crate) fn round_up(bytes: i64) -> i64 {
+    let blk = ASSUMED_BLOCK_SIZE_BYTES;
+    (bytes + blk - 1) / blk * blk
+}
+
 pub struct FromSqlUuid(pub Uuid);
 
 impl rusqlite::types::FromSql for FromSqlUuid {
@@ -168,8 +182,7 @@ pub struct ListAggregatedRecordingsRow {
     pub growing: bool,
 }
 
-impl ListAggregatedRecordingsRow {
-    fn from(row: ListRecordingsRow) -> Self {
+impl ListAggregatedRecordingsRow { fn from(row: ListRecordingsRow) -> Self {
         let recording_id = row.id.recording();
         let uncommitted = (row.flags & RecordingFlags::Uncommitted as i32) != 0;
         let growing = (row.flags & RecordingFlags::Growing as i32) != 0;
@@ -406,8 +419,15 @@ pub struct Stream {
     /// The time range of recorded data associated with this stream (minimum start time and maximum
     /// end time). `None` iff there are no recordings for this camera.
     pub range: Option<Range<recording::Time>>,
+
+    /// The total bytes of flushed sample files. This doesn't include disk space wasted in the
+    /// last filesystem block allocated to each file ("internal fragmentation").
     pub sample_file_bytes: i64,
 
+    /// The total bytes on the filesystem used by this stream. This is slightly more than
+    /// `sample_file_bytes` because it includes the wasted space in the last filesystem block.
+    pub fs_bytes: i64,
+
     /// On flush, delete the following recordings (move them to the `garbage` table, to be
     /// collected later). Note they must be the oldest recordings. The later collection involves
     /// the syncer unlinking the files on disk and syncing the directory then enqueueing for
@@ -416,10 +436,12 @@ pub struct Stream {
 
     /// The total bytes to delete with the next flush.
     pub bytes_to_delete: i64,
+    pub fs_bytes_to_delete: i64,
 
     /// The total bytes to add with the next flush. (`mark_synced` has already been called on these
     /// recordings.)
     pub bytes_to_add: i64,
+    pub fs_bytes_to_add: i64,
 
     /// The total duration of recorded data. This may not be `range.end - range.start` due to
     /// gaps and overlap.
@@ -565,6 +587,7 @@ impl Stream {
         });
         self.duration += r.end - r.start;
         self.sample_file_bytes += sample_file_bytes as i64;
+        self.fs_bytes += round_up(i64::from(sample_file_bytes));
         adjust_days(r, 1, &mut self.days);
     }
 }
@@ -759,9 +782,12 @@ impl StreamStateChanger {
                 flush_if_sec: sc.flush_if_sec,
                 range: None,
                 sample_file_bytes: 0,
+                fs_bytes: 0,
                 to_delete: Vec::new(),
                 bytes_to_delete: 0,
+                fs_bytes_to_delete: 0,
                 bytes_to_add: 0,
+                fs_bytes_to_add: 0,
                 duration: recording::Duration(0),
                 days: BTreeMap::new(),
                 record: sc.record,
@@ -837,7 +863,9 @@ impl LockedDatabase {
             bail!("can't sync un-added recording {}", id);
         }
         let l = stream.uncommitted[stream.synced_recordings].lock();
-        stream.bytes_to_add += l.sample_file_bytes as i64;
+        let bytes = i64::from(l.sample_file_bytes);
+        stream.bytes_to_add += bytes;
+        stream.fs_bytes_to_add += round_up(bytes);
         stream.synced_recordings += 1;
         Ok(())
     }
@@ -988,17 +1016,19 @@ impl LockedDatabase {
         for (stream_id, new_range) in new_ranges.drain() {
             let s = self.streams_by_id.get_mut(&stream_id).unwrap();
             let dir_id = s.sample_file_dir_id.unwrap();
-            let d = self.sample_file_dirs_by_id.get_mut(&dir_id).unwrap();
+            let dir = self.sample_file_dirs_by_id.get_mut(&dir_id).unwrap();
             let log = dir_logs.entry(dir_id).or_default();
 
             // Process delete_oldest_recordings.
             s.sample_file_bytes -= s.bytes_to_delete;
+            s.fs_bytes -= s.fs_bytes_to_delete;
             log.deleted_bytes += s.bytes_to_delete;
             s.bytes_to_delete = 0;
+            s.fs_bytes_to_delete = 0;
             log.deleted.reserve(s.to_delete.len());
             for row in s.to_delete.drain(..) {
                 log.deleted.push(row.id);
-                d.garbage_needs_unlink.insert(row.id);
+                dir.garbage_needs_unlink.insert(row.id);
                 let d = recording::Duration(row.duration as i64);
                 s.duration -= d;
                 adjust_days(row.start .. row.start + d, -1, &mut s.days);
@@ -1007,6 +1037,7 @@ impl LockedDatabase {
             // Process add_recordings.
             log.added_bytes += s.bytes_to_add;
             s.bytes_to_add = 0;
+            s.fs_bytes_to_add = 0;
             log.added.reserve(s.synced_recordings);
             for _ in 0..s.synced_recordings {
                 let u = s.uncommitted.pop_front().unwrap();
@@ -1312,7 +1343,7 @@ impl LockedDatabase {
         Err(format_err!("no such recording {}", id))
     }
 
-    /// Deletes the oldest recordings that aren't already queued for deletion.
+    /// Queues for deletion the oldest recordings that aren't already queued.
     /// `f` should return true for each row that should be deleted.
     pub(crate) fn delete_oldest_recordings(
         &mut self, stream_id: i32, f: &mut dyn FnMut(&ListOldestRecordingsRow) -> bool)
@@ -1328,7 +1359,9 @@ impl LockedDatabase {
         raw::list_oldest_recordings(&self.conn, CompositeId::new(stream_id, end), &mut |r| {
             if f(&r) {
                 s.to_delete.push(r);
-                s.bytes_to_delete += r.sample_file_bytes as i64;
+                let bytes = i64::from(r.sample_file_bytes);
+                s.bytes_to_delete += bytes;
+                s.fs_bytes_to_delete += round_up(bytes);
                 return true;
             }
             false
@@ -1490,9 +1523,12 @@ impl LockedDatabase {
                 flush_if_sec,
                 range: None,
                 sample_file_bytes: 0,
+                fs_bytes: 0,
                 to_delete: Vec::new(),
                 bytes_to_delete: 0,
+                fs_bytes_to_delete: 0,
                 bytes_to_add: 0,
+                fs_bytes_to_add: 0,
                 duration: recording::Duration(0),
                 days: BTreeMap::new(),
                 next_recording_id: row.get(7)?,
@@ -2398,4 +2434,12 @@ mod tests {
             .collect();
         assert_eq!(&g, &[]);
     }
+
+    #[test]
+    fn round_up() {
+        assert_eq!(super::round_up(0), 0);
+        assert_eq!(super::round_up(8_191), 8_192);
+        assert_eq!(super::round_up(8_192), 8_192);
+        assert_eq!(super::round_up(8_193), 12_288);
+    }
 }
diff --git a/db/writer.rs b/db/writer.rs
index 7a235d0..8fa331f 100644
--- a/db/writer.rs
+++ b/db/writer.rs
@@ -195,15 +195,15 @@ pub fn lower_retention(db: Arc<db::Database>, dir_id: i32, limits: &[NewLimit])
     let (mut syncer, _) = Syncer::new(&db.lock(), db2, dir_id)?;
     syncer.do_rotation(|db| {
         for l in limits {
-            let (bytes_before, extra);
+            let (fs_bytes_before, extra);
             {
                 let stream = db.streams_by_id().get(&l.stream_id)
                                .ok_or_else(|| format_err!("no such stream {}", l.stream_id))?;
-                bytes_before = stream.sample_file_bytes + stream.bytes_to_add -
-                               stream.bytes_to_delete;
+                fs_bytes_before = stream.fs_bytes + stream.fs_bytes_to_add -
+                                  stream.fs_bytes_to_delete;
                 extra = stream.retain_bytes - l.limit;
             }
-            if l.limit >= bytes_before { continue }
+            if l.limit >= fs_bytes_before { continue }
             delete_recordings(db, l.stream_id, extra)?;
         }
         Ok(())
@@ -213,23 +213,24 @@
 /// Deletes recordings to bring a stream's disk usage within bounds.
 fn delete_recordings(db: &mut db::LockedDatabase, stream_id: i32, extra_bytes_needed: i64)
                      -> Result<(), Error> {
-    let bytes_needed = {
+    let fs_bytes_needed = {
         let stream = match db.streams_by_id().get(&stream_id) {
             None => bail!("no stream {}", stream_id),
             Some(s) => s,
         };
-        stream.sample_file_bytes + stream.bytes_to_add - stream.bytes_to_delete + extra_bytes_needed
+        stream.fs_bytes + stream.fs_bytes_to_add - stream.fs_bytes_to_delete + extra_bytes_needed
             - stream.retain_bytes
     };
-    let mut bytes_to_delete = 0;
-    if bytes_needed <= 0 {
-        debug!("{}: have remaining quota of {}", stream_id, -bytes_needed);
+    let mut fs_bytes_to_delete = 0;
+    if fs_bytes_needed <= 0 {
+        debug!("{}: have remaining quota of {}", stream_id,
+               base::strutil::encode_size(-fs_bytes_needed));
         return Ok(());
     }
     let mut n = 0;
     db.delete_oldest_recordings(stream_id, &mut |row| {
-        if bytes_needed >= bytes_to_delete {
-            bytes_to_delete += row.sample_file_bytes as i64;
+        if fs_bytes_needed >= fs_bytes_to_delete {
+            fs_bytes_to_delete += db::round_up(i64::from(row.sample_file_bytes));
             n += 1;
             return true;
         }
@@ -996,7 +997,7 @@ mod tests {
         h.db.lock().update_retention(&[db::RetentionChange {
             stream_id: testutil::TEST_STREAM_ID,
             new_record: true,
-            new_limit: 3,
+            new_limit: 0,
         }]).unwrap();
 
         // Setup: add a 3-byte recording.
@@ -1143,7 +1144,7 @@ mod tests {
         h.db.lock().update_retention(&[db::RetentionChange {
             stream_id: testutil::TEST_STREAM_ID,
             new_record: true,
-            new_limit: 3,
+            new_limit: 0,
         }]).unwrap();
 
         // Setup: add a 3-byte recording.
diff --git a/design/api.md b/design/api.md
index 2bcb2c0..92216d5 100644
--- a/design/api.md
+++ b/design/api.md
@@ -90,6 +90,10 @@ The `application/json` response will have a dict as follows:
       be lesser if there are gaps in the recorded data.
     * `totalSampleFileBytes`: the total number of bytes of sample data (the
       `mdat` portion of a `.mp4` file).
+    * `fsBytes`: the total number of bytes on the filesystem used by
+      this stream. This is slightly more than `totalSampleFileBytes`
+      because it also includes the wasted portion of the final
+      filesystem block allocated to each file.
     * `days`: (only included if request pararameter `days` is true)
       dictionary representing calendar days (in the server's time zone)
       with non-zero total duration of recordings for that day. The keys
diff --git a/src/cmds/config/cameras.rs b/src/cmds/config/cameras.rs
index 758ab4a..7cbe03c 100644
--- a/src/cmds/config/cameras.rs
+++ b/src/cmds/config/cameras.rs
@@ -326,8 +326,8 @@ fn edit_camera_dialog(db: &Arc<db::Database>, siv: &mut Cursive, item: &Option<i32>) {
diff --git a/src/cmds/config/dirs.rs b/src/cmds/config/dirs.rs
--- a/src/cmds/config/dirs.rs
+++ b/src/cmds/config/dirs.rs
@@ -…,… +…,… @@ fn edit_dir_dialog(db: &Arc<db::Database>, siv: &mut Cursive, dir_id: i32) {
         }
         streams.insert(id, Stream {
             label: format!("{}: {}: {}", id, c.short_name, s.type_.as_str()),
-            used: s.sample_file_bytes,
+            used: s.fs_bytes,
             record: s.record,
             retain: Some(s.retain_bytes),
         });
-        total_used += s.sample_file_bytes;
+        total_used += s.fs_bytes;
         total_retain += s.retain_bytes;
     }
     if streams.is_empty() {
diff --git a/src/json.rs b/src/json.rs
index b7b3eff..47838c4 100644
--- a/src/json.rs
+++ b/src/json.rs
@@ -106,6 +106,7 @@ pub struct Stream<'a> {
     pub max_end_time_90k: Option<i64>,
     pub total_duration_90k: i64,
     pub total_sample_file_bytes: i64,
+    pub fs_bytes: i64,
 
     #[serde(skip_serializing_if = "Option::is_none")]
     #[serde(serialize_with = "Stream::serialize_days")]
@@ -235,6 +236,7 @@ impl<'a> Stream<'a> {
             max_end_time_90k: s.range.as_ref().map(|r| r.end.0),
             total_duration_90k: s.duration.0,
             total_sample_file_bytes: s.sample_file_bytes,
+            fs_bytes: s.fs_bytes,
             days: if include_days { Some(&s.days) } else { None },
         }))
     }
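The hardcoded `ASSUMED_BLOCK_SIZE_BYTES` above is an interim measure; #89 is about
eventually asking the filesystem that holds the sample file directory for its real
block size. A minimal sketch of what that follow-up might look like, assuming the
`nix` crate's statvfs wrapper and an illustrative directory path (neither is part
of this patch):

```rust
use std::path::Path;

/// Block size of the filesystem holding `sample_file_dir`, falling back to the
/// patch's assumed 4096 if the statvfs call fails. (Sketch only; the patch above
/// keeps a hardcoded constant.)
fn fs_block_size(sample_file_dir: &Path) -> i64 {
    nix::sys::statvfs::statvfs(sample_file_dir)
        .map(|s| s.fragment_size() as i64) // f_frsize: the fundamental block size
        .unwrap_or(4096)
}

/// Same rounding as `db::round_up`, but with the block size passed in.
fn round_up(bytes: i64, block_size: i64) -> i64 {
    (bytes + block_size - 1) / block_size * block_size
}

fn main() {
    // "/var/lib/moonfire-nvr/sample" is just an example path.
    let blk = fs_block_size(Path::new("/var/lib/moonfire-nvr/sample"));
    assert_eq!(round_up(8_191, 4096), 8_192); // matches the new test in db.rs
    println!("a 3-byte recording would occupy {} bytes on disk", round_up(3, blk));
}
```

Until that refactoring lands, disk usage on filesystems with a block size other
than 4096 will be slightly over- or under-counted by the `fs_bytes` fields.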