Merge branch 'master' into new-schema

Scott Lamb
2020-07-12 19:22:38 -07:00
10 changed files with 101 additions and 49 deletions


@@ -111,6 +111,20 @@ const UPDATE_STREAM_COUNTERS_SQL: &'static str = r#"
    where id = :stream_id
"#;
+
+/// The size of a filesystem block, to use in disk space accounting.
+/// This should really be obtained by a stat call on the sample file directory in question,
+/// but that requires some refactoring. See
+/// [#89](https://github.com/scottlamb/moonfire-nvr/issues/89). We might be able to get away with
+/// this hardcoded value for a while.
+const ASSUMED_BLOCK_SIZE_BYTES: i64 = 4096;
+
+/// Rounds a file size up to the next multiple of the block size.
+/// This is useful in representing the actual amount of filesystem space used.
+pub(crate) fn round_up(bytes: i64) -> i64 {
+    let blk = ASSUMED_BLOCK_SIZE_BYTES;
+    (bytes + blk - 1) / blk * blk
+}
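The hardcoded constant is a stopgap pending [#89](https://github.com/scottlamb/moonfire-nvr/issues/89). A minimal sketch of the stat-based approach the comment describes, assuming a Unix target; the helper name is invented, and note that `st_blksize` is the preferred I/O size, which can differ from the allocation unit:

    use std::os::unix::fs::MetadataExt;

    /// Hypothetical: ask the filesystem holding the sample file directory for
    /// its block size instead of assuming 4096 (see issue #89).
    fn block_size_bytes(dir: &std::path::Path) -> std::io::Result<i64> {
        let md = std::fs::metadata(dir)?; // a stat() call on the directory
        Ok(md.blksize() as i64)           // st_blksize
    }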
pub struct FromSqlUuid(pub Uuid);
impl rusqlite::types::FromSql for FromSqlUuid {
@@ -194,8 +208,7 @@ pub struct ListAggregatedRecordingsRow {
    pub growing: bool,
}

-impl ListAggregatedRecordingsRow {
-    fn from(row: ListRecordingsRow) -> Self {
+impl ListAggregatedRecordingsRow { fn from(row: ListRecordingsRow) -> Self {
        let recording_id = row.id.recording();
        let uncommitted = (row.flags & RecordingFlags::Uncommitted as i32) != 0;
        let growing = (row.flags & RecordingFlags::Growing as i32) != 0;
@@ -440,8 +453,15 @@ pub struct Stream {
    /// The time range of recorded data associated with this stream (minimum start time and maximum
    /// end time). `None` iff there are no recordings for this camera.
    pub range: Option<Range<recording::Time>>,
+    /// The total bytes of flushed sample files. This doesn't include disk space wasted in the
+    /// last filesystem block allocated to each file ("internal fragmentation").
    pub sample_file_bytes: i64,
+
+    /// The total bytes on the filesystem used by this stream. This is slightly more than
+    /// `sample_file_bytes` because it includes the wasted space in the last filesystem block.
+    pub fs_bytes: i64,
+
    /// On flush, delete the following recordings (move them to the `garbage` table, to be
    /// collected later). Note they must be the oldest recordings. The later collection involves
    /// the syncer unlinking the files on disk and syncing the directory then enqueueing for
@@ -450,10 +470,12 @@ pub struct Stream {
    /// The total bytes to delete with the next flush.
    pub bytes_to_delete: i64,
+    pub fs_bytes_to_delete: i64,

    /// The total bytes to add with the next flush. (`mark_synced` has already been called on these
    /// recordings.)
    pub bytes_to_add: i64,
+    pub fs_bytes_to_add: i64,

    /// The total duration of undeleted recorded data. This may not be `range.end - range.start`
    /// due to gaps and overlap.
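To make the two accountings concrete: with the assumed 4 KiB block size, a single 5,000-byte sample file adds 5,000 to `sample_file_bytes` but 8,192 to `fs_bytes`, because the file's second filesystem block is only partially used:

    assert_eq!(round_up(5_000), 8_192); // 5,000 payload bytes occupy two 4 KiB blocks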
@@ -605,6 +627,7 @@ impl Stream {
        });
        self.duration += r.end - r.start;
        self.sample_file_bytes += sample_file_bytes as i64;
+        self.fs_bytes += round_up(i64::from(sample_file_bytes));
        adjust_days(r, 1, &mut self.days);
    }
}
@@ -801,9 +824,12 @@ impl StreamStateChanger {
            flush_if_sec: sc.flush_if_sec,
            range: None,
            sample_file_bytes: 0,
+            fs_bytes: 0,
            to_delete: Vec::new(),
            bytes_to_delete: 0,
+            fs_bytes_to_delete: 0,
            bytes_to_add: 0,
+            fs_bytes_to_add: 0,
            duration: recording::Duration(0),
            days: BTreeMap::new(),
            record: sc.record,
@@ -898,7 +924,9 @@ impl LockedDatabase {
            bail!("can't sync un-added recording {}", id);
        }
        let l = stream.uncommitted[stream.synced_recordings].lock();
-        stream.bytes_to_add += l.sample_file_bytes as i64;
+        let bytes = i64::from(l.sample_file_bytes);
+        stream.bytes_to_add += bytes;
+        stream.fs_bytes_to_add += round_up(bytes);
        stream.synced_recordings += 1;
        Ok(())
    }
@@ -1055,17 +1083,19 @@ impl LockedDatabase {
        for (stream_id, new_range) in new_ranges.drain() {
            let s = self.streams_by_id.get_mut(&stream_id).unwrap();
            let dir_id = s.sample_file_dir_id.unwrap();
-            let d = self.sample_file_dirs_by_id.get_mut(&dir_id).unwrap();
+            let dir = self.sample_file_dirs_by_id.get_mut(&dir_id).unwrap();
            let log = dir_logs.entry(dir_id).or_default();

            // Process delete_oldest_recordings.
            s.sample_file_bytes -= s.bytes_to_delete;
+            s.fs_bytes -= s.fs_bytes_to_delete;
            log.deleted_bytes += s.bytes_to_delete;
            s.bytes_to_delete = 0;
+            s.fs_bytes_to_delete = 0;
            log.deleted.reserve(s.to_delete.len());
            for row in s.to_delete.drain(..) {
                log.deleted.push(row.id);
-                d.garbage_needs_unlink.insert(row.id);
+                dir.garbage_needs_unlink.insert(row.id);
                let d = recording::Duration(row.duration as i64);
                s.duration -= d;
                adjust_days(row.start .. row.start + d, -1, &mut s.days);
@@ -1074,6 +1104,7 @@ impl LockedDatabase {
            // Process add_recordings.
            log.added_bytes += s.bytes_to_add;
            s.bytes_to_add = 0;
+            s.fs_bytes_to_add = 0;
            log.added.reserve(s.synced_recordings);
            for _ in 0..s.synced_recordings {
                let u = s.uncommitted.pop_front().unwrap();
@@ -1382,7 +1413,7 @@ impl LockedDatabase {
        Err(format_err!("no such recording {}", id))
    }

-    /// Deletes the oldest recordings that aren't already queued for deletion.
+    /// Queues for deletion the oldest recordings that aren't already queued.
    /// `f` should return true for each row that should be deleted.
    pub(crate) fn delete_oldest_recordings(
        &mut self, stream_id: i32, f: &mut dyn FnMut(&ListOldestRecordingsRow) -> bool)
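A sketch of how a caller drives this callback API, mirroring `delete_recordings` in the writer change further down; the wrapper function and the 1 MiB target are invented for illustration:

    /// Hypothetical: queue the oldest recordings until ~1 MiB of filesystem
    /// space would be reclaimed on the next flush.
    fn reclaim_one_mib(db: &mut LockedDatabase, stream_id: i32) -> Result<(), Error> {
        let mut fs_bytes = 0i64;
        db.delete_oldest_recordings(stream_id, &mut |row| {
            if fs_bytes < (1 << 20) {
                fs_bytes += round_up(i64::from(row.sample_file_bytes));
                return true; // queue this row; it moves to `garbage` on flush
            }
            false // don't queue this row
        })
    }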
@@ -1398,7 +1429,9 @@ impl LockedDatabase {
        raw::list_oldest_recordings(&self.conn, CompositeId::new(stream_id, end), &mut |r| {
            if f(&r) {
                s.to_delete.push(r);
-                s.bytes_to_delete += r.sample_file_bytes as i64;
+                let bytes = i64::from(r.sample_file_bytes);
+                s.bytes_to_delete += bytes;
+                s.fs_bytes_to_delete += round_up(bytes);
                return true;
            }
            false
@@ -1558,9 +1591,12 @@ impl LockedDatabase {
            flush_if_sec,
            range: None,
            sample_file_bytes: 0,
+            fs_bytes: 0,
            to_delete: Vec::new(),
            bytes_to_delete: 0,
+            fs_bytes_to_delete: 0,
            bytes_to_add: 0,
+            fs_bytes_to_add: 0,
            duration: recording::Duration(0),
            days: BTreeMap::new(),
            cum_recordings: row.get(7)?,
@@ -1910,13 +1946,25 @@ impl LockedDatabase {
    }
}

+/// Sets pragmas for full database integrity.
+pub(crate) fn set_integrity_pragmas(conn: &mut rusqlite::Connection) -> Result<(), Error> {
+    // Enforce foreign keys. This is on by default with --features=bundled (as rusqlite
+    // compiles the SQLite3 amalgamation with -DSQLITE_DEFAULT_FOREIGN_KEYS=1). Ensure it's
+    // always on. Note that our foreign keys are immediate rather than deferred, so we have to
+    // be careful about the order of operations during the upgrade.
+    conn.execute("pragma foreign_keys = on", params![])?;
+
+    // Make the database actually durable.
+    conn.execute("pragma fullfsync = on", params![])?;
+    conn.execute("pragma synchronous = 3", params![])?;
+    Ok(())
+}
+
/// Initializes a database.
/// Note this doesn't set journal options, so that it can be used on in-memory databases for
/// test code.
pub fn init(conn: &mut rusqlite::Connection) -> Result<(), Error> {
-    conn.execute("pragma foreign_keys = on", params![])?;
-    conn.execute("pragma fullfsync = on", params![])?;
-    conn.execute("pragma synchronous = 2", params![])?;
+    set_integrity_pragmas(conn)?;
    let tx = conn.transaction()?;
    tx.execute_batch(include_str!("schema.sql"))?;
    {
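Besides factoring out the duplicated pragmas, this bumps `synchronous` from 2 (FULL) to 3 (EXTRA), which additionally syncs the directory containing a rollback journal when the journal is unlinked to commit a transaction. A quick illustrative check, not part of the commit, that the level took effect (runs inside any function returning a `Result`):

    let level: i64 = conn.query_row("pragma synchronous", params![], |row| row.get(0))?;
    assert_eq!(level, 3); // 0=OFF, 1=NORMAL, 2=FULL, 3=EXTRA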
@@ -1975,11 +2023,9 @@ fn operation() -> &'static str { "database operation" }

impl<C: Clocks + Clone> Database<C> {
    /// Creates the database from a caller-supplied SQLite connection.
-    pub fn new(clocks: C, conn: rusqlite::Connection,
+    pub fn new(clocks: C, mut conn: rusqlite::Connection,
               read_write: bool) -> Result<Database<C>, Error> {
-        conn.execute("pragma foreign_keys = on", params![])?;
-        conn.execute("pragma fullfsync = on", params![])?;
-        conn.execute("pragma synchronous = 2", params![])?;
+        set_integrity_pragmas(&mut conn)?;
        {
            let ver = get_schema_version(&conn)?.ok_or_else(|| format_err!(
                "no such table: version. \
@@ -2466,4 +2512,12 @@ mod tests {
            .collect();
        assert_eq!(&g, &[]);
    }
+
+    #[test]
+    fn round_up() {
+        assert_eq!(super::round_up(0), 0);
+        assert_eq!(super::round_up(8_191), 8_192);
+        assert_eq!(super::round_up(8_192), 8_192);
+        assert_eq!(super::round_up(8_193), 12_288);
+    }
}


@@ -105,16 +105,7 @@ fn upgrade(args: &Args, target_ver: i32, conn: &mut rusqlite::Connection) -> Res
}

pub fn run(args: &Args, conn: &mut rusqlite::Connection) -> Result<(), Error> {
-    // Enforce foreign keys. This is on by default with --features=bundled (as rusqlite
-    // compiles the SQLite3 amalgamation with -DSQLITE_DEFAULT_FOREIGN_KEYS=1). Ensure it's
-    // always on. Note that our foreign keys are immediate rather than deferred, so we have to
-    // be careful about the order of operations during the upgrade.
-    conn.execute("pragma foreign_keys = on", params![])?;
-
-    // Make the database actually durable.
-    conn.execute("pragma fullfsync = on", params![])?;
-    conn.execute("pragma synchronous = 2", params![])?;
+    db::set_integrity_pragmas(conn)?;
    upgrade(args, db::EXPECTED_VERSION, conn)?;

    // WAL is the preferred journal mode for normal operation; it reduces the number of syncs


@@ -193,15 +193,15 @@ pub fn lower_retention(db: Arc<db::Database>, dir_id: i32, limits: &[NewLimit])
    let (mut syncer, _) = Syncer::new(&db.lock(), db2, dir_id)?;
    syncer.do_rotation(|db| {
        for l in limits {
-            let (bytes_before, extra);
+            let (fs_bytes_before, extra);
            {
                let stream = db.streams_by_id().get(&l.stream_id)
                               .ok_or_else(|| format_err!("no such stream {}", l.stream_id))?;
-                bytes_before = stream.sample_file_bytes + stream.bytes_to_add -
-                               stream.bytes_to_delete;
+                fs_bytes_before = stream.fs_bytes + stream.fs_bytes_to_add -
+                                  stream.fs_bytes_to_delete;
                extra = stream.retain_bytes - l.limit;
            }
-            if l.limit >= bytes_before { continue }
+            if l.limit >= fs_bytes_before { continue }
            delete_recordings(db, l.stream_id, extra)?;
        }
        Ok(())
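For context, a sketch of invoking this entry point; the `NewLimit` struct literal follows the `l.stream_id`/`l.limit` uses above, but the concrete values are invented:

    // Shrink stream 1's retention to 100 MiB; recordings are deleted via the
    // syncer's rotation, which unlinks files and syncs the directory.
    lower_retention(db.clone(), dir_id, &[NewLimit { stream_id: 1, limit: 100 << 20 }])?;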
@@ -211,23 +211,24 @@ pub fn lower_retention(db: Arc<db::Database>, dir_id: i32, limits: &[NewLimit])

/// Deletes recordings to bring a stream's disk usage within bounds.
fn delete_recordings(db: &mut db::LockedDatabase, stream_id: i32,
                     extra_bytes_needed: i64) -> Result<(), Error> {
-    let bytes_needed = {
+    let fs_bytes_needed = {
        let stream = match db.streams_by_id().get(&stream_id) {
            None => bail!("no stream {}", stream_id),
            Some(s) => s,
        };
-        stream.sample_file_bytes + stream.bytes_to_add - stream.bytes_to_delete + extra_bytes_needed
+        stream.fs_bytes + stream.fs_bytes_to_add - stream.fs_bytes_to_delete + extra_bytes_needed
            - stream.retain_bytes
    };
-    let mut bytes_to_delete = 0;
-    if bytes_needed <= 0 {
-        debug!("{}: have remaining quota of {}", stream_id, -bytes_needed);
+    let mut fs_bytes_to_delete = 0;
+    if fs_bytes_needed <= 0 {
+        debug!("{}: have remaining quota of {}", stream_id,
+               base::strutil::encode_size(-fs_bytes_needed));
        return Ok(());
    }
    let mut n = 0;
    db.delete_oldest_recordings(stream_id, &mut |row| {
-        if bytes_needed >= bytes_to_delete {
-            bytes_to_delete += i64::from(row.sample_file_bytes);
+        if fs_bytes_needed >= fs_bytes_to_delete {
+            fs_bytes_to_delete += db::round_up(i64::from(row.sample_file_bytes));
            n += 1;
            return true;
        }
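To ground the quota arithmetic in `delete_recordings` (all figures invented): if the stream occupies 980 MiB on disk, the next flush will add 50 MiB and delete 10 MiB, the caller asks for 20 MiB of extra headroom, and `retain_bytes` is 1 GiB, then:

    let fs_bytes_needed: i64 =
        (980 << 20) + (50 << 20) - (10 << 20) + (20 << 20) - (1 << 30);
    assert_eq!(fs_bytes_needed, 16 << 20); // must queue at least 16 MiB more for deletion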
@@ -993,7 +994,7 @@ mod tests {
        h.db.lock().update_retention(&[db::RetentionChange {
            stream_id: testutil::TEST_STREAM_ID,
            new_record: true,
-            new_limit: 3,
+            new_limit: 0,
        }]).unwrap();

        // Setup: add a 3-byte recording.
@@ -1152,7 +1153,7 @@ mod tests {
        h.db.lock().update_retention(&[db::RetentionChange {
            stream_id: testutil::TEST_STREAM_ID,
            new_record: true,
-            new_limit: 3,
+            new_limit: 0,
        }]).unwrap();

        // Setup: add a 3-byte recording.