Mirror of https://github.com/scottlamb/moonfire-nvr.git, synced 2025-01-13 16:03:22 -05:00

Fix "no garbage row for <id>" flush failure loops
Add some comments along the way. Fixes #63.
commit 131c5e0640 (parent 496f7d7e3a)

db/db.rs: 64 lines changed

@@ -272,8 +272,15 @@ pub struct SampleFileDir {
     pub uuid: Uuid,
     dir: Option<Arc<dir::SampleFileDir>>,
     last_complete_open: Option<Open>,
-    to_gc: Vec<CompositeId>,
-    pub(crate) garbage: FnvHashSet<CompositeId>,
+
+    /// ids which are in the `garbage` database table (rather than `recording`) as of last commit
+    /// but may still exist on disk. These can't be safely removed from the database yet.
+    pub(crate) garbage_needs_unlink: FnvHashSet<CompositeId>,
+
+    /// ids which are in the `garbage` database table and are guaranteed to no longer exist on
+    /// disk (have been unlinked and the dir has been synced). These may be removed from the
+    /// database on next flush. Mutually exclusive with `garbage_needs_unlink`.
+    pub(crate) garbage_unlinked: Vec<CompositeId>,
 }
 
 impl SampleFileDir {
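
The two new fields replace the old single `garbage` set with a two-phase lifecycle: on flush, a deleted recording's id lands in `garbage_needs_unlink` (row present in the `garbage` table, file possibly still on disk); once the syncer has unlinked the file and synced the directory, the id moves to `garbage_unlinked`; the next flush then deletes the row from the `garbage` table. A minimal, self-contained sketch of that state machine, where `std::collections::HashSet` and `u64` stand in for `FnvHashSet` and `CompositeId`; `DirGarbage` and its method names are illustrative, not from the codebase:

    use std::collections::HashSet;

    #[derive(Default)]
    struct DirGarbage {
        /// In the `garbage` table; the file may still exist on disk.
        garbage_needs_unlink: HashSet<u64>,
        /// In the `garbage` table; the file is known to be gone
        /// (unlinked, and the directory has been synced).
        garbage_unlinked: Vec<u64>,
    }

    impl DirGarbage {
        /// Flush: the recording's row moves from `recording` to `garbage`.
        fn mark_deleted(&mut self, id: u64) {
            self.garbage_needs_unlink.insert(id);
        }

        /// Syncer: the file was unlinked and the dir synced.
        fn mark_unlinked(&mut self, id: u64) -> Result<(), String> {
            if !self.garbage_needs_unlink.remove(&id) {
                return Err(format!("delete_garbage with non-garbage id {}", id));
            }
            self.garbage_unlinked.push(id);
            Ok(())
        }

        /// Next flush: these rows can be deleted from the `garbage` table.
        fn flush(&mut self) -> Vec<u64> {
            self.garbage_unlinked.drain(..).collect()
        }
    }

    fn main() {
        let mut g = DirGarbage::default();
        g.mark_deleted(1);
        g.mark_unlinked(1).unwrap();
        assert_eq!(g.flush(), vec![1]);
    }

Keeping the two states mutually exclusive is what breaks the failure loop: a row is only deleted from `garbage` once its file is provably gone, so `mark_sample_files_deleted` can no longer be asked to delete a row twice.
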
@@ -372,7 +379,10 @@ pub struct Stream {
     pub range: Option<Range<recording::Time>>,
     pub sample_file_bytes: i64,
 
-    /// On flush, delete the following recordings. Note they must be the oldest recordings.
+    /// On flush, delete the following recordings (move them to the `garbage` table, to be
+    /// collected later). Note they must be the oldest recordings. The later collection involves
+    /// the syncer unlinking the files on disk and syncing the directory then enqueueing for
+    /// another following flush removal from the `garbage` table.
     to_delete: Vec<ListOldestRecordingsRow>,
 
     /// The total bytes to delete with the next flush.
@@ -788,7 +798,17 @@ impl LockedDatabase {
             None => bail!("no such dir {}", dir_id),
             Some(d) => d,
         };
-        dir.to_gc.append(ids);
+        dir.garbage_unlinked.reserve(ids.len());
+        ids.retain(|id| {
+            if !dir.garbage_needs_unlink.remove(id) {
+                return true;
+            }
+            dir.garbage_unlinked.push(*id);
+            false
+        });
+        if !ids.is_empty() {
+            bail!("delete_garbage with non-garbage ids {:?}", &ids[..]);
+        }
         Ok(())
     }
 
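The `ids.retain(...)` above is a single-pass partition: every id found in `garbage_needs_unlink` migrates to `garbage_unlinked` and is dropped from `ids`, while unknown ids survive so the caller can bail on them. A standalone sketch of the idiom under the same assumptions as above (plain std types; `partition_garbage` is a hypothetical name):

    use std::collections::HashSet;

    fn partition_garbage(ids: &mut Vec<u64>, needs_unlink: &mut HashSet<u64>,
                         unlinked: &mut Vec<u64>) {
        unlinked.reserve(ids.len());
        ids.retain(|id| {
            if !needs_unlink.remove(id) {
                return true;  // keep: not tracked as garbage; caller will bail
            }
            unlinked.push(*id);
            false  // drop from `ids`: successfully transitioned
        });
    }

    fn main() {
        let mut needs_unlink: HashSet<u64> = [1, 2].into_iter().collect();
        let mut unlinked = Vec::new();
        let mut ids = vec![1, 2, 3];
        partition_garbage(&mut ids, &mut needs_unlink, &mut unlinked);
        assert_eq!(ids, vec![3]);         // 3 was never garbage: the error case
        assert_eq!(unlinked, vec![1, 2]);
    }
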
@@ -827,6 +847,11 @@ impl LockedDatabase {
                 None => bail!("stream {} has no directory!", stream_id),
                 Some(d) => d,
             };
+
+            // raw::delete_recordings does a bulk transfer of a range from recording to
+            // garbage, rather than operating on each element of to_delete. This is
+            // guaranteed to give the same result because to_delete is guaranteed to be the
+            // oldest recordings for the stream.
             let start = CompositeId::new(stream_id, 0);
             let end = CompositeId(l.id.0 + 1);
             let n = raw::delete_recordings(&tx, dir, start .. end)? as usize;
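The comment's equivalence claim in one toy example: because `to_delete` always holds the oldest recordings for the stream, deleting the contiguous id range up to (and including) the newest listed id covers exactly the same rows as deleting each listed element:

    fn main() {
        let all: Vec<u64> = vec![1, 2, 3, 4, 5];  // all recordings, in id order
        let to_delete = vec![1u64, 2, 3];         // guaranteed to be the oldest
        let end = *to_delete.last().unwrap() + 1; // half-open range end
        let by_range: Vec<u64> = all.into_iter().filter(|&id| id < end).collect();
        assert_eq!(by_range, to_delete);          // bulk range == element-wise
    }
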
@@ -838,7 +863,7 @@ impl LockedDatabase {
             }
         }
         for dir in self.sample_file_dirs_by_id.values() {
-            raw::mark_sample_files_deleted(&tx, &dir.to_gc)?;
+            raw::mark_sample_files_deleted(&tx, &dir.garbage_unlinked)?;
         }
         for (&stream_id, mut r) in &mut new_ranges {
             *r = raw::get_range(&tx, stream_id)?;
@@ -860,10 +885,8 @@ impl LockedDatabase {
         // Process delete_garbage.
         let mut gced = 0;
         for dir in self.sample_file_dirs_by_id.values_mut() {
-            gced += dir.to_gc.len();
-            for id in dir.to_gc.drain(..) {
-                dir.garbage.remove(&id);
-            }
+            gced += dir.garbage_unlinked.len();
+            dir.garbage_unlinked.clear();
         }
 
         let mut added = 0;
@@ -877,7 +900,7 @@ impl LockedDatabase {
             s.sample_file_bytes -= s.bytes_to_delete;
             s.bytes_to_delete = 0;
             for row in s.to_delete.drain(..) {
-                d.garbage.insert(row.id);
+                d.garbage_needs_unlink.insert(row.id);
                 let d = recording::Duration(row.duration as i64);
                 s.duration -= d;
                 adjust_days(row.start .. row.start + d, -1, &mut s.days);
@@ -1284,8 +1307,8 @@ impl LockedDatabase {
                 path: row.get_checked(1)?,
                 dir: None,
                 last_complete_open,
-                to_gc: Vec::new(),
-                garbage: raw::list_garbage(&self.conn, id)?,
+                garbage_needs_unlink: raw::list_garbage(&self.conn, id)?,
+                garbage_unlinked: Vec::new(),
             });
         }
         info!("Loaded {} sample file dirs", self.sample_file_dirs_by_id.len());
@@ -1463,8 +1486,8 @@ impl LockedDatabase {
                 uuid,
                 dir: Some(dir),
                 last_complete_open: None,
-                to_gc: Vec::new(),
-                garbage: FnvHashSet::default(),
+                garbage_needs_unlink: FnvHashSet::default(),
+                garbage_unlinked: Vec::new(),
             }),
             Entry::Occupied(_) => Err(format_err!("duplicate sample file dir id {}", id))?,
         };
@@ -1484,7 +1507,7 @@ impl LockedDatabase {
             ::std::collections::btree_map::Entry::Occupied(e) => e,
             _ => bail!("no such dir {} to remove", dir_id),
         };
-        if !d.get().garbage.is_empty() {
+        if !d.get().garbage_needs_unlink.is_empty() || !d.get().garbage_unlinked.is_empty() {
             bail!("must collect garbage before deleting directory {}", d.get().path);
         }
         let dir = match d.get_mut().dir.take() {
@@ -2188,10 +2211,19 @@ mod tests {
             .sample_file_dirs_by_id()
             .get(&sample_file_dir_id)
             .unwrap()
-            .garbage
+            .garbage_needs_unlink
             .iter()
             .map(|&id| id)
             .collect();
         assert_eq!(&g, &[id]);
+        let g: Vec<_> = db.lock()
+            .sample_file_dirs_by_id()
+            .get(&sample_file_dir_id)
+            .unwrap()
+            .garbage_unlinked
+            .iter()
+            .map(|&id| id)
+            .collect();
+        assert_eq!(&g, &[]);
     }
 }

db/raw.rs

@@ -294,7 +294,13 @@ pub(crate) fn mark_sample_files_deleted(tx: &rusqlite::Transaction, ids: &[Compo
     for &id in ids {
         let changes = stmt.execute(&[&id.0])?;
         if changes != 1 {
-            bail!("no garbage row for {}", id);
+            // panic rather than return error. Errors get retried indefinitely, but there's no
+            // recovery from this condition.
+            //
+            // Tempting to just consider logging error and moving on, but this represents a logic
+            // flaw, so complain loudly. The freshly deleted file might still be referenced in the
+            // recording table.
+            panic!("no garbage row for {}", id);
         }
     }
     Ok(())
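
Why the old `bail!` looped: a failed flush is retried indefinitely, which is right for transient I/O errors but spins forever on a condition that cannot heal, such as a missing garbage row. A hedged sketch of that retry behavior (illustrative only, not the real syncer loop):

    fn flush_with_retries(mut attempt: impl FnMut() -> Result<(), String>) {
        loop {
            match attempt() {
                Ok(()) => return,
                // Transient errors (e.g. EIO) may clear on retry; a missing
                // garbage row never will, so bail! here meant an infinite loop.
                Err(e) => eprintln!("flush failed: {}; will retry", e),
            }
        }
    }

    fn main() {
        let mut failures_left = 2;  // a transient error clears after two retries
        flush_with_retries(|| {
            if failures_left > 0 {
                failures_left -= 1;
                return Err("got EIO".to_owned());
            }
            Ok(())
        });
        // The new panic! instead aborts the process, surfacing the logic flaw
        // rather than retrying it forever.
    }
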
db/writer.rs: 95 lines changed

@@ -316,7 +316,7 @@ impl<C: Clocks + Clone> Syncer<C, Arc<dir::SampleFileDir>> {
         let mut garbage: Vec<_> = {
             let l = self.db.lock();
             let d = l.sample_file_dirs_by_id().get(&self.dir_id).unwrap();
-            d.garbage.iter().map(|id| *id).collect()
+            d.garbage_needs_unlink.iter().map(|id| *id).collect()
         };
         if !garbage.is_empty() {
             // Try to delete files; retain ones in `garbage` that don't exist.
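Both garbage-collection sites in this file use the same snapshot-under-lock shape: copy the ids out during a short critical section, then unlink and sync with the lock released so writers aren't blocked on filesystem work. A minimal standalone sketch, with a `Mutex<HashSet<u64>>` standing in for the locked database:

    use std::collections::HashSet;
    use std::sync::Mutex;

    fn collect_garbage(db: &Mutex<HashSet<u64>>) {
        // Short critical section: just copy the ids out.
        let garbage: Vec<u64> = {
            let l = db.lock().unwrap();
            l.iter().copied().collect()
        };  // lock released before any filesystem work
        for id in garbage {
            // The real syncer unlinks the sample file here, syncs the dir,
            // and then reports completion via delete_garbage().
            println!("would unlink sample file {}", id);
        }
    }

    fn main() {
        let db = Mutex::new([1u64, 2].into_iter().collect::<HashSet<u64>>());
        collect_garbage(&db);
    }
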
@@ -352,8 +352,8 @@ impl<C: Clocks + Clone, D: DirWriter> Syncer<C, D> {
                     Ok(cmd) => cmd,
                 },
                 Some((t, r, flushes)) => {
-                    // Note: `flushes` will be dropped on exit from this block, which has the desired
-                    // behavior of closing the channel.
+                    // Note: `flushes` will be dropped on exit from this block, which has the
+                    // desired behavior of closing the channel.
 
                     let now = self.db.clocks().monotonic();
 
@@ -393,7 +393,7 @@ impl<C: Clocks + Clone, D: DirWriter> Syncer<C, D> {
         let mut garbage: Vec<_> = {
             let l = self.db.lock();
             let d = l.sample_file_dirs_by_id().get(&self.dir_id).unwrap();
-            d.garbage.iter().map(|id| *id).collect()
+            d.garbage_needs_unlink.iter().map(|id| *id).collect()
         };
         if garbage.is_empty() {
             return;
@@ -882,6 +882,87 @@ mod tests {
 
     fn eio() -> io::Error { io::Error::new(io::ErrorKind::Other, "got EIO") }
 
+    /// Tests the database flushing while a syncer is still processing a previous flush event.
+    #[test]
+    fn double_flush() {
+        testutil::init();
+        let h = new_harness();
+        h.db.lock().update_retention(&[db::RetentionChange {
+            stream_id: testutil::TEST_STREAM_ID,
+            new_record: true,
+            new_limit: 3,
+        }]).unwrap();
+
+        // Setup: add a 3-byte recording.
+        let video_sample_entry_id = h.db.lock().insert_video_sample_entry(
+            1920, 1080, [0u8; 100].to_vec(), "avc1.000000".to_owned()).unwrap();
+        {
+            let mut w = Writer::new(&h.dir, &h.db, &h.channel, testutil::TEST_STREAM_ID,
+                                    video_sample_entry_id);
+            let f = MockFile::new();
+            h.dir.expect(MockDirAction::Create(CompositeId::new(1, 1),
+                Box::new({ let f = f.clone(); move |_id| Ok(f.clone()) })));
+            f.expect(MockFileAction::Write(Box::new(|buf| { assert_eq!(buf, b"123"); Ok(3) })));
+            f.expect(MockFileAction::SyncAll(Box::new(|| Ok(()))));
+            w.write(b"123", recording::Time(2), 0, true).unwrap();
+            h.dir.expect(MockDirAction::Sync(Box::new(|| Ok(()))));
+            w.close(Some(1));
+            h.channel.flush();
+            f.ensure_done();
+            h.dir.ensure_done();
+
+            // Then a 1-byte recording.
+            let f = MockFile::new();
+            h.dir.expect(MockDirAction::Create(CompositeId::new(1, 2),
+                Box::new({ let f = f.clone(); move |_id| Ok(f.clone()) })));
+            f.expect(MockFileAction::Write(Box::new(|buf| { assert_eq!(buf, b"4"); Ok(1) })));
+            f.expect(MockFileAction::SyncAll(Box::new(|| Ok(()))));
+            w.write(b"4", recording::Time(3), 1, true).unwrap();
+            h.dir.expect(MockDirAction::Sync(Box::new(|| Ok(()))));
+            h.dir.expect(MockDirAction::Unlink(CompositeId::new(1, 1), Box::new({
+                let db = h.db.clone();
+                move |_| {
+                    // The drop(w) below should cause the old recording to be deleted (moved to
+                    // garbage). When the database is flushed, the syncer forces garbage collection
+                    // including this unlink.
+
+                    // Do another database flush here, as if from another syncer.
+                    db.lock().flush("another syncer running").unwrap();
+                    Ok(())
+                }
+            })));
+            let (gc_done_snd, gc_done_rcv) = mpsc::channel();
+            h.dir.expect(MockDirAction::Sync(Box::new(move || {
+                gc_done_snd.send(()).unwrap();
+                Ok(())
+            })));
+            //h.dir.expect(MockDirAction::Unlink(CompositeId::new(1, 1), Box::new(|_| Ok(()))));
+            //h.dir.expect(MockDirAction::Sync(Box::new(|| Ok(()))));
+
+            drop(w);
+
+            gc_done_rcv.recv().unwrap();  // Wait until the successful gc sync call...
+            h.channel.flush();            // ...and the DatabaseFlush op to complete.
+            f.ensure_done();
+            h.dir.ensure_done();
+        }
+
+        // Garbage should be marked collected on the next flush.
+        {
+            let mut l = h.db.lock();
+            assert!(l.sample_file_dirs_by_id().get(&h.dir_id).unwrap().garbage_needs_unlink.is_empty());
+            assert!(!l.sample_file_dirs_by_id().get(&h.dir_id).unwrap().garbage_unlinked.is_empty());
+            l.flush("forced gc").unwrap();
+            assert!(l.sample_file_dirs_by_id().get(&h.dir_id).unwrap().garbage_needs_unlink.is_empty());
+            assert!(l.sample_file_dirs_by_id().get(&h.dir_id).unwrap().garbage_unlinked.is_empty());
+        }
+
+        // The syncer should shut down cleanly.
+        drop(h.channel);
+        h.db.lock().clear_on_flush();
+        h.join.join().unwrap();
+    }
+
     #[test]
     fn write_path_retries() {
         testutil::init();
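The new test sequences the syncer against the test thread with a small mpsc handshake: the mocked directory-sync action sends on a channel, and the test blocks on `recv()` until it fires. The idiom in isolation:

    use std::sync::mpsc;
    use std::thread;

    fn main() {
        let (done_snd, done_rcv) = mpsc::channel();
        let syncer = thread::spawn(move || {
            // ... unlink files, sync the directory ...
            done_snd.send(()).unwrap();  // signal: gc sync completed
        });
        done_rcv.recv().unwrap();  // wait until the gc sync call happens
        syncer.join().unwrap();
    }
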
@@ -1005,9 +1086,11 @@ mod tests {
         // Garbage should be marked collected on the next flush.
         {
             let mut l = h.db.lock();
-            assert!(!l.sample_file_dirs_by_id().get(&h.dir_id).unwrap().garbage.is_empty());
+            assert!(l.sample_file_dirs_by_id().get(&h.dir_id).unwrap().garbage_needs_unlink.is_empty());
+            assert!(!l.sample_file_dirs_by_id().get(&h.dir_id).unwrap().garbage_unlinked.is_empty());
             l.flush("forced gc").unwrap();
-            assert!(l.sample_file_dirs_by_id().get(&h.dir_id).unwrap().garbage.is_empty());
+            assert!(l.sample_file_dirs_by_id().get(&h.dir_id).unwrap().garbage_needs_unlink.is_empty());
+            assert!(l.sample_file_dirs_by_id().get(&h.dir_id).unwrap().garbage_unlinked.is_empty());
         }
 
         // The syncer should shut down cleanly.