properly account for bytes to add with next flush
These bytes were previously counted as 0, so the retention logic would under-delete until the next flush, then delete everything at once. That effectively doubled the number of bytes not yet deleted, since deleted recordings are first transferred to garbage, flushed again, and only then unlinked.
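For context, here is a minimal sketch (not part of the commit) of the accounting this fix restores in `delete_recordings` and `lower_retention`: the stream's effective size must include bytes that are synced but not yet flushed. Field names mirror `db::Stream`; the `bytes_over_limit` helper and the literal byte counts are illustrative only.

// Illustrative sketch only: how the retention check must count bytes once
// uncommitted recordings are tracked separately from committed ones.
struct Stream {
    sample_file_bytes: i64, // bytes already committed to the database
    bytes_to_add: i64,      // synced but not yet committed (next flush)
    bytes_to_delete: i64,   // queued for deletion at the next flush
    retain_bytes: i64,      // configured retention limit
}

fn bytes_over_limit(s: &Stream, extra_bytes_needed: i64) -> i64 {
    // Before the fix, `bytes_to_add` was ignored (treated as 0), so the stream
    // looked smaller than it really was and too little was deleted.
    s.sample_file_bytes + s.bytes_to_add - s.bytes_to_delete + extra_bytes_needed
        - s.retain_bytes
}

fn main() {
    let s = Stream { sample_file_bytes: 950, bytes_to_add: 100, bytes_to_delete: 0, retain_bytes: 1000 };
    // With the fix: 950 + 100 - 0 + 0 - 1000 = 50 bytes must be deleted now.
    // Without it:   950 +   0 - 0 + 0 - 1000 = -50, so nothing would be deleted
    // until the next flush folded the 100 bytes in.
    assert_eq!(bytes_over_limit(&s, 0), 50);
    println!("bytes over limit: {}", bytes_over_limit(&s, 0));
}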
parent 0f2e71ec4a
commit b677964d1a

db/db.rs (106 lines changed)
@@ -372,9 +372,13 @@ pub struct Stream {
     /// On flush, delete the following recordings. Note they must be the oldest recordings.
     to_delete: Vec<ListOldestRecordingsRow>,
 
-    /// The total bytes to delete.
+    /// The total bytes to delete with the next flush.
     pub bytes_to_delete: i64,
 
+    /// The total bytes to add with the next flush. (`mark_synced` has already been called on these
+    /// recordings.)
+    pub bytes_to_add: i64,
+
     /// The total duration of recorded data. This may not be `range.end - range.start` due to
     /// gaps and overlap.
     pub duration: recording::Duration,
@@ -394,6 +398,9 @@ pub struct Stream {
     ///
     /// TODO: alter the serving path to show these just as if they were already committed.
     uncommitted: VecDeque<Arc<Mutex<UncommittedRecording>>>,
+
+    /// The number of recordings in `uncommitted` which are synced and ready to commit.
+    synced_recordings: usize,
 }
 
 impl Stream {
@@ -402,11 +409,8 @@ impl Stream {
     /// before it are unsynced.
     pub(crate) fn unflushed(&self) -> recording::Duration {
         let mut dur = recording::Duration(0);
-        for u in &self.uncommitted {
-            let l = u.lock();
-            if !l.synced {
-                break;
-            }
+        for i in 0 .. self.synced_recordings {
+            let l = self.uncommitted[i].lock();
             if let Some(ref r) = l.recording {
                 dur += r.time.end - r.time.start;
             }
@@ -417,11 +421,7 @@ impl Stream {
 
 #[derive(Debug)]
 pub(crate) struct UncommittedRecording {
-    /// If this recording has been synced to disk and thus is ready to commit to the database.
-    /// `recording` should not be modified after this is set to true.
-    pub(crate) synced: bool,
-
-    /// The recording to add. Absent iff the recording has been abandoned.
+    /// The recording to add. Absent if not yet ready.
     /// TODO: modify `SampleIndexEncoder` to update this record as it goes.
     pub(crate) recording: Option<RecordingToInsert>,
 }
@@ -591,13 +591,6 @@ pub(crate) struct Open {
     pub(crate) uuid: Uuid,
 }
 
-/// A modification to be done to a `Stream`, used within `LockedDatabase::flush`.
-#[derive(Default)]
-struct StreamModification {
-    num_recordings_to_commit: i32,
-    range: Option<Range<recording::Time>>,
-}
-
 #[derive(Copy, Clone, Debug, Eq, Hash, PartialEq)]
 pub struct CompositeId(pub i64);
 
@@ -723,11 +716,13 @@ impl StreamStateChanger {
                     sample_file_bytes: 0,
                     to_delete: Vec::new(),
                     bytes_to_delete: 0,
+                    bytes_to_add: 0,
                     duration: recording::Duration(0),
                     days: BTreeMap::new(),
                     record: sc.record,
                     next_recording_id: 1,
                     uncommitted: VecDeque::new(),
+                    synced_recordings: 0,
                 })));
             }
         }
@@ -767,6 +762,11 @@ impl LockedDatabase {
         &self.sample_file_dirs_by_id
     }
 
+    /// Adds a placeholder for an uncommitted recording.
+    /// The caller should write and sync the file and populate the returned `UncommittedRecording`
+    /// (noting that the database lock must not be acquired while holding the
+    /// `UncommittedRecording`'s lock) then call `mark_synced`. The data will be written to the
+    /// database on the next `flush`.
     pub(crate) fn add_recording(&mut self, stream_id: i32)
                                 -> Result<(CompositeId, Arc<Mutex<UncommittedRecording>>), Error> {
         let stream = match self.streams_by_id.get_mut(&stream_id) {
@@ -776,13 +776,37 @@ impl LockedDatabase {
         let id = CompositeId::new(stream_id,
                                   stream.next_recording_id + (stream.uncommitted.len() as i32));
         let recording = Arc::new(Mutex::new(UncommittedRecording {
-            synced: false,
             recording: None,
         }));
         stream.uncommitted.push_back(Arc::clone(&recording));
         Ok((id, recording))
     }
 
+    /// Marks the given uncommitted recording as synced and ready to flush.
+    /// This must be the next unsynced recording.
+    pub(crate) fn mark_synced(&mut self, id: CompositeId) -> Result<(), Error> {
+        let stream = match self.streams_by_id.get_mut(&id.stream()) {
+            None => bail!("no stream for recording {}", id),
+            Some(s) => s,
+        };
+        let next_unsynced = stream.next_recording_id + (stream.synced_recordings as i32);
+        if id.recording() != next_unsynced {
+            bail!("can't sync {} when next unsynced recording is {} (next unflushed is {})",
+                  id, next_unsynced, stream.next_recording_id);
+        }
+        if stream.synced_recordings == stream.uncommitted.len() {
+            bail!("can't sync un-added recording {}", id);
+        }
+        let l = stream.uncommitted[stream.synced_recordings].lock();
+        let r = match l.recording.as_ref() {
+            None => bail!("can't sync unfinished recording {}", id),
+            Some(r) => r,
+        };
+        stream.bytes_to_add += r.sample_file_bytes as i64;
+        stream.synced_recordings += 1;
+        Ok(())
+    }
+
     pub(crate) fn delete_garbage(&mut self, dir_id: i32, ids: &mut Vec<CompositeId>)
                                  -> Result<(), Error> {
         let dir = match self.sample_file_dirs_by_id.get_mut(&dir_id) {
@@ -808,35 +832,29 @@ impl LockedDatabase {
             Some(o) => o,
         };
         let tx = self.conn.transaction()?;
-        let mut mods = FnvHashMap::with_capacity_and_hasher(self.streams_by_id.len(),
+        let mut new_ranges = FnvHashMap::with_capacity_and_hasher(self.streams_by_id.len(),
                                                              Default::default());
         {
             let mut stmt = tx.prepare_cached(UPDATE_NEXT_RECORDING_ID_SQL)?;
             for (&stream_id, s) in &self.streams_by_id {
                 // Process additions.
-                let mut i = 0;
-                for recording in &s.uncommitted {
-                    let l = recording.lock();
-                    if !l.synced { break; }
-                    if let Some(ref r) = l.recording {
+                for i in 0..s.synced_recordings {
+                    let l = s.uncommitted[i].lock();
+                    let r = l.recording.as_ref().unwrap();
                     raw::insert_recording(
-                        &tx, o, CompositeId::new(stream_id, s.next_recording_id + i), &r)?;
+                        &tx, o, CompositeId::new(stream_id, s.next_recording_id + i as i32), &r)?;
                 }
-                    i += 1;
-                }
-                if i > 0 {
-                    let m = mods.entry(stream_id).or_insert_with(StreamModification::default);
-                    m.num_recordings_to_commit = i;
+                if s.synced_recordings > 0 {
+                    new_ranges.entry(stream_id).or_insert(None);
                     stmt.execute_named(&[
                         (":stream_id", &stream_id),
-                        (":next_recording_id", &(s.next_recording_id + i)),
+                        (":next_recording_id", &(s.next_recording_id + s.synced_recordings as i32)),
                     ])?;
                 }
 
                 // Process deletions.
                 if let Some(l) = s.to_delete.last() {
-                    // Add a placeholder for recomputing the range.
-                    mods.entry(stream_id).or_insert_with(StreamModification::default);
+                    new_ranges.entry(stream_id).or_insert(None);
                     let dir = match s.sample_file_dir_id {
                         None => bail!("stream {} has no directory!", stream_id),
                         Some(d) => d,
@@ -854,8 +872,8 @@ impl LockedDatabase {
         for dir in self.sample_file_dirs_by_id.values() {
             raw::mark_sample_files_deleted(&tx, &dir.to_gc)?;
         }
-        for (&stream_id, m) in &mut mods {
-            m.range = raw::get_range(&tx, stream_id)?;
+        for (&stream_id, mut r) in &mut new_ranges {
+            *r = raw::get_range(&tx, stream_id)?;
         }
         tx.commit()?;
 
@@ -870,7 +888,7 @@ impl LockedDatabase {
 
         let mut added = 0;
         let mut deleted = 0;
-        for (stream_id, m) in mods.drain() {
+        for (stream_id, new_range) in new_ranges.drain() {
             let s = self.streams_by_id.get_mut(&stream_id).unwrap();
             let d = self.sample_file_dirs_by_id.get_mut(&s.sample_file_dir_id.unwrap()).unwrap();
 
@@ -886,18 +904,20 @@ impl LockedDatabase {
             }
 
             // Process add_recordings.
-            s.next_recording_id += m.num_recordings_to_commit;
-            added += m.num_recordings_to_commit;
-            for _ in 0..m.num_recordings_to_commit {
+            s.next_recording_id += s.synced_recordings as i32;
+            added += s.synced_recordings;
+            s.bytes_to_add = 0;
+            for _ in 0..s.synced_recordings {
                 let u = s.uncommitted.pop_front().unwrap();
                 let l = u.lock();
                 if let Some(ref r) = l.recording {
                     s.add_recording(r.time.clone(), r.sample_file_bytes);
                 }
             }
+            s.synced_recordings = 0;
 
             // Fix the range.
-            s.range = m.range;
+            s.range = new_range;
         }
         info!("Flush due to {}: added {} recordings, deleted {}, marked {} files GCed.",
               reason, added, deleted, gced);
@@ -1335,11 +1355,13 @@ impl LockedDatabase {
                 sample_file_bytes: 0,
                 to_delete: Vec::new(),
                 bytes_to_delete: 0,
+                bytes_to_add: 0,
                 duration: recording::Duration(0),
                 days: BTreeMap::new(),
                 next_recording_id: row.get_checked(7)?,
                 record: row.get_checked(8)?,
                 uncommitted: VecDeque::new(),
+                synced_recordings: 0,
             });
             c.streams[type_.index()] = Some(id);
         }
@@ -2065,7 +2087,7 @@ mod tests {
         let mut db = db.lock();
         let (id, u) = db.add_recording(main_stream_id).unwrap();
         u.lock().recording = Some(recording.clone());
-        u.lock().synced = true;
+        db.mark_synced(id).unwrap();
         db.flush("add test").unwrap();
         id
     };
db/dir.rs (23 lines changed)
@@ -295,7 +295,7 @@ impl SampleFileDir {
 
 /// A command sent to the syncer. These correspond to methods in the `SyncerChannel` struct.
 enum SyncerCommand {
-    AsyncSaveRecording(CompositeId, Arc<Mutex<db::UncommittedRecording>>, fs::File),
+    AsyncSaveRecording(CompositeId, fs::File),
     DatabaseFlushed,
     Flush(mpsc::SyncSender<()>),
 }
@@ -365,14 +365,15 @@ pub fn lower_retention(db: Arc<db::Database>, dir_id: i32, limits: &[NewLimit])
         {
             let stream = db.streams_by_id().get(&l.stream_id)
                            .ok_or_else(|| format_err!("no such stream {}", l.stream_id))?;
-            bytes_before = stream.sample_file_bytes - stream.bytes_to_delete;
+            bytes_before = stream.sample_file_bytes + stream.bytes_to_add -
+                           stream.bytes_to_delete;
             extra = stream.retain_bytes - l.limit;
         }
         if l.limit >= bytes_before { continue }
         delete_recordings(db, l.stream_id, extra)?;
         let stream = db.streams_by_id().get(&l.stream_id).unwrap();
         info!("stream {}, deleting: {}->{}", l.stream_id, bytes_before,
-              stream.sample_file_bytes - stream.bytes_to_delete);
+              stream.sample_file_bytes + stream.bytes_to_add - stream.bytes_to_delete);
     }
     Ok(())
     })
@@ -386,7 +387,7 @@ fn delete_recordings(db: &mut db::LockedDatabase, stream_id: i32,
             None => bail!("no stream {}", stream_id),
             Some(s) => s,
         };
-        stream.sample_file_bytes - stream.bytes_to_delete + extra_bytes_needed
+        stream.sample_file_bytes + stream.bytes_to_add - stream.bytes_to_delete + extra_bytes_needed
             - stream.retain_bytes
     };
     let mut bytes_to_delete = 0;
@@ -412,9 +413,8 @@ fn delete_recordings(db: &mut db::LockedDatabase, stream_id: i32,
 impl SyncerChannel {
     /// Asynchronously syncs the given writer, closes it, records it into the database, and
     /// starts rotation.
-    fn async_save_recording(&self, id: CompositeId, recording: Arc<Mutex<db::UncommittedRecording>>,
-                            f: fs::File) {
-        self.0.send(SyncerCommand::AsyncSaveRecording(id, recording, f)).unwrap();
+    fn async_save_recording(&self, id: CompositeId, f: fs::File) {
+        self.0.send(SyncerCommand::AsyncSaveRecording(id, f)).unwrap();
     }
 
     /// For testing: flushes the syncer, waiting for all currently-queued commands to complete.
@@ -495,7 +495,7 @@ impl Syncer {
         loop {
             match cmds.recv() {
                 Err(_) => return, // all senders have closed the channel; shutdown
-                Ok(SyncerCommand::AsyncSaveRecording(id, r, f)) => self.save(id, r, f),
+                Ok(SyncerCommand::AsyncSaveRecording(id, f)) => self.save(id, f),
                 Ok(SyncerCommand::DatabaseFlushed) => {
                     retry_forever(&mut || self.collect_garbage(true))
                 },
@@ -577,16 +577,15 @@ impl Syncer {
     /// so that there can be only one dir sync and database transaction per save.
     /// Internal helper for `save`. This is separated out so that the question-mark operator
     /// can be used in the many error paths.
-    fn save(&mut self, id: CompositeId, recording: Arc<Mutex<db::UncommittedRecording>>,
-            f: fs::File) {
+    fn save(&mut self, id: CompositeId, f: fs::File) {
         let stream_id = id.stream();
 
         // Free up a like number of bytes.
         retry_forever(&mut || delete_recordings(&mut self.db.lock(), stream_id, 0));
         retry_forever(&mut || f.sync_all());
         retry_forever(&mut || self.dir.sync());
-        recording.lock().synced = true;
         let mut db = self.db.lock();
+        db.mark_synced(id).unwrap();
         let reason = {
             let s = db.streams_by_id().get(&stream_id).unwrap();
             let c = db.cameras_by_id().get(&s.camera_id).unwrap();
@@ -880,7 +879,7 @@ impl InnerWriter {
             flags: flags,
         };
         self.r.lock().recording = Some(recording);
-        channel.async_save_recording(self.id, self.r, self.f);
+        channel.async_save_recording(self.id, self.f);
         PreviousWriter {
             end_time: end,
             local_time_delta: local_start_delta,
db/testutil.rs
@@ -145,7 +145,7 @@ impl TestDb {
             run_offset: 0,
             flags: db::RecordingFlags::TrailingZero as i32,
         });
-        u.lock().synced = true;
+        db.mark_synced(id).unwrap();
         db.flush("create_recording_from_encoder").unwrap();
         let mut row = None;
         db.list_recordings_by_id(TEST_STREAM_ID, id.recording() .. id.recording()+1,
@@ -177,12 +177,12 @@ pub fn add_dummy_recordings_to_db(db: &db::Database, num: usize) {
         run_offset: 0,
     };
     for _ in 0..num {
-        let (_, u) = db.add_recording(TEST_STREAM_ID).unwrap();
+        let (id, u) = db.add_recording(TEST_STREAM_ID).unwrap();
         u.lock().recording = Some(recording.clone());
-        u.lock().synced = true;
         recording.time.start += DURATION;
         recording.time.end += DURATION;
         recording.run_offset += 1;
+        db.mark_synced(id).unwrap();
     }
     db.flush("add_dummy_recordings_to_db").unwrap();
 }