diff --git a/db/db.rs b/db/db.rs
index 91af08e..581db7c 100644
--- a/db/db.rs
+++ b/db/db.rs
@@ -273,8 +273,15 @@ pub struct SampleFileDir {
     pub uuid: Uuid,
     dir: Option<Arc<dir::SampleFileDir>>,
     last_complete_open: Option<Open>,
-    to_gc: Vec<CompositeId>,
-    pub(crate) garbage: FnvHashSet<CompositeId>,
+
+    /// ids which are in the `garbage` database table (rather than `recording`) as of the last
+    /// commit but may still exist on disk. These can't be safely removed from the database yet.
+    pub(crate) garbage_needs_unlink: FnvHashSet<CompositeId>,
+
+    /// ids which are in the `garbage` database table and are guaranteed to no longer exist on
+    /// disk (have been unlinked and the dir has been synced). These may be removed from the
+    /// database on the next flush. Mutually exclusive with `garbage_needs_unlink`.
+    pub(crate) garbage_unlinked: Vec<CompositeId>,
 }
 
 impl SampleFileDir {
@@ -379,7 +386,10 @@ pub struct Stream {
     pub range: Option<Range<recording::Time>>,
     pub sample_file_bytes: i64,
 
-    /// On flush, delete the following recordings. Note they must be the oldest recordings.
+    /// On flush, delete the following recordings (move them to the `garbage` table, to be
+    /// collected later). Note they must be the oldest recordings. The later collection involves
+    /// the syncer unlinking the files on disk, syncing the directory, and enqueueing their
+    /// removal from the `garbage` table on the following flush.
     to_delete: Vec<ListOldestRecordingsRow>,
 
     /// The total bytes to delete with the next flush.
@@ -797,7 +807,17 @@ impl LockedDatabase {
             None => bail!("no such dir {}", dir_id),
             Some(d) => d,
         };
-        dir.to_gc.append(ids);
+        dir.garbage_unlinked.reserve(ids.len());
+        ids.retain(|id| {
+            if !dir.garbage_needs_unlink.remove(id) {
+                return true;
+            }
+            dir.garbage_unlinked.push(*id);
+            false
+        });
+        if !ids.is_empty() {
+            bail!("delete_garbage with non-garbage ids {:?}", &ids[..]);
+        }
         Ok(())
     }
@@ -836,6 +856,11 @@ impl LockedDatabase {
                 None => bail!("stream {} has no directory!", stream_id),
                 Some(d) => d,
             };
+
+            // raw::delete_recordings does a bulk transfer of a range from recording to
+            // garbage, rather than operating on each element of to_delete. This is
+            // guaranteed to give the same result because to_delete is guaranteed to be the
+            // oldest recordings for the stream.
             let start = CompositeId::new(stream_id, 0);
             let end = CompositeId(l.id.0 + 1);
             let n = raw::delete_recordings(&tx, dir, start .. end)? as usize;
@@ -847,7 +872,7 @@ impl LockedDatabase {
                 }
             }
             for dir in self.sample_file_dirs_by_id.values() {
-                raw::mark_sample_files_deleted(&tx, &dir.to_gc)?;
+                raw::mark_sample_files_deleted(&tx, &dir.garbage_unlinked)?;
             }
             for (&stream_id, mut r) in &mut new_ranges {
                 *r = raw::get_range(&tx, stream_id)?;
@@ -870,10 +895,8 @@ impl LockedDatabase {
         // Process delete_garbage.
         let mut gced = 0;
         for dir in self.sample_file_dirs_by_id.values_mut() {
-            gced += dir.to_gc.len();
-            for id in dir.to_gc.drain(..) {
-                dir.garbage.remove(&id);
-            }
+            gced += dir.garbage_unlinked.len();
+            dir.garbage_unlinked.clear();
         }
 
         let mut added = 0;
@@ -887,7 +910,7 @@ impl LockedDatabase {
             s.sample_file_bytes -= s.bytes_to_delete;
             s.bytes_to_delete = 0;
             for row in s.to_delete.drain(..) {
-                d.garbage.insert(row.id);
+                d.garbage_needs_unlink.insert(row.id);
                 let d = recording::Duration(row.duration as i64);
                 s.duration -= d;
                 adjust_days(row.start .. row.start + d, -1, &mut s.days);
@@ -1295,8 +1318,8 @@ impl LockedDatabase {
                 path: row.get_checked(1)?,
                 dir: None,
                 last_complete_open,
-                to_gc: Vec::new(),
-                garbage: raw::list_garbage(&self.conn, id)?,
+                garbage_needs_unlink: raw::list_garbage(&self.conn, id)?,
+                garbage_unlinked: Vec::new(),
             });
         }
         info!("Loaded {} sample file dirs", self.sample_file_dirs_by_id.len());
@@ -1474,8 +1497,8 @@ impl LockedDatabase {
                 uuid,
                 dir: Some(dir),
                 last_complete_open: None,
-                to_gc: Vec::new(),
-                garbage: FnvHashSet::default(),
+                garbage_needs_unlink: FnvHashSet::default(),
+                garbage_unlinked: Vec::new(),
             }),
             Entry::Occupied(_) => Err(format_err!("duplicate sample file dir id {}", id))?,
         };
@@ -1495,7 +1518,7 @@ impl LockedDatabase {
             ::std::collections::btree_map::Entry::Occupied(e) => e,
             _ => bail!("no such dir {} to remove", dir_id),
         };
-        if !d.get().garbage.is_empty() {
+        if !d.get().garbage_needs_unlink.is_empty() || !d.get().garbage_unlinked.is_empty() {
             bail!("must collect garbage before deleting directory {}", d.get().path);
         }
         let dir = match d.get_mut().dir.take() {
@@ -2230,10 +2253,19 @@ mod tests {
             .sample_file_dirs_by_id()
             .get(&sample_file_dir_id)
             .unwrap()
-            .garbage
+            .garbage_needs_unlink
            .iter()
            .map(|&id| id)
            .collect();
        assert_eq!(&g, &[id]);
+        let g: Vec<_> = db.lock()
+            .sample_file_dirs_by_id()
+            .get(&sample_file_dir_id)
+            .unwrap()
+            .garbage_unlinked
+            .iter()
+            .map(|&id| id)
+            .collect();
+        assert_eq!(&g, &[]);
     }
 }
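The new `delete_garbage` body above uses `Vec::retain` as a partition with side effects: every id the syncer reports as unlinked moves from `garbage_needs_unlink` to `garbage_unlinked`, and anything left behind in `ids` was never tracked as garbage, which is a caller bug worth a loud error. A minimal standalone sketch of the same idiom, with plain `HashSet<u64>`/`Vec<u64>` as simplified stand-ins for the `FnvHashSet<CompositeId>`/`Vec<CompositeId>` pair:

    use std::collections::HashSet;

    /// Moves each id in `ids` from `needs_unlink` to `unlinked`, leaving any
    /// untracked ids behind in `ids` so the caller can report them.
    fn mark_unlinked(needs_unlink: &mut HashSet<u64>, unlinked: &mut Vec<u64>,
                     ids: &mut Vec<u64>) -> Result<(), String> {
        unlinked.reserve(ids.len());
        ids.retain(|id| {
            if !needs_unlink.remove(id) {
                return true;  // keep: this id was never garbage; complain below.
            }
            unlinked.push(*id);
            false  // drop: transitioned to unlinked.
        });
        if !ids.is_empty() {
            return Err(format!("non-garbage ids {:?}", ids));
        }
        Ok(())
    }

    fn main() {
        let mut needs_unlink: HashSet<u64> = [1, 2, 3].iter().copied().collect();
        let mut unlinked = Vec::new();
        let mut ids = vec![1, 3];
        mark_unlinked(&mut needs_unlink, &mut unlinked, &mut ids).unwrap();
        assert_eq!(needs_unlink.len(), 1);  // 2 still awaits unlink.
        assert_eq!(unlinked, vec![1, 3]);   // 1 and 3 await removal from the table.
    }

Draining via `retain` keeps the happy path allocation-free and leaves exactly the offending ids in place for the error message.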
diff --git a/db/raw.rs b/db/raw.rs
index 1f6a07a..3f6a26c 100644
--- a/db/raw.rs
+++ b/db/raw.rs
@@ -294,7 +294,13 @@ pub(crate) fn mark_sample_files_deleted(tx: &rusqlite::Transaction, ids: &[Compo
     for &id in ids {
         let changes = stmt.execute(&[&id.0])?;
         if changes != 1 {
-            bail!("no garbage row for {}", id);
+            // Panic rather than returning an error. Errors get retried indefinitely,
+            // but there's no recovery from this condition.
+            //
+            // It's tempting to just log an error and move on, but this represents a
+            // logic flaw, so complain loudly: the freshly deleted file might still be
+            // referenced in the `recording` table.
+            panic!("no garbage row for {}", id);
         }
     }
     Ok(())
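The raw.rs change trades `bail!` for `panic!` because of how the syncer drives this function: each database operation is retried until it succeeds, so returning `Err` on a broken invariant would retry forever without making progress. A sketch of that retry shape under this assumption; `retry_forever` here is a hypothetical stand-in for the syncer's actual retry loop in db/writer.rs:

    use std::thread;
    use std::time::Duration;

    /// Runs `op` until it succeeds. Transient failures (EIO, disk full) are
    /// expected to clear eventually, so retrying is the right recovery; a
    /// violated invariant never clears, so `op` must panic for those cases
    /// instead of returning Err, or this loop spins forever.
    fn retry_forever<T, E: std::fmt::Display>(mut op: impl FnMut() -> Result<T, E>) -> T {
        loop {
            match op() {
                Ok(v) => return v,
                Err(e) => {
                    eprintln!("operation failed: {}; retrying in 1 sec", e);
                    thread::sleep(Duration::from_secs(1));
                },
            }
        }
    }

    fn main() {
        let mut attempts = 0;
        let v = retry_forever(|| {
            attempts += 1;
            if attempts < 3 { Err("got EIO") } else { Ok(42) }
        });
        assert_eq!((attempts, v), (3, 42));
    }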
diff --git a/db/writer.rs b/db/writer.rs
index bd2afdd..1233b18 100644
--- a/db/writer.rs
+++ b/db/writer.rs
@@ -316,7 +316,7 @@ impl<C: Clocks + Clone> Syncer<C, Arc<dir::SampleFileDir>> {
         let mut garbage: Vec<_> = {
             let l = self.db.lock();
             let d = l.sample_file_dirs_by_id().get(&self.dir_id).unwrap();
-            d.garbage.iter().map(|id| *id).collect()
+            d.garbage_needs_unlink.iter().map(|id| *id).collect()
         };
         if !garbage.is_empty() {
             // Try to delete files; retain ones in `garbage` that don't exist.
@@ -352,8 +352,8 @@ impl<C: Clocks + Clone, D: DirWriter> Syncer<C, D> {
                     Ok(cmd) => cmd,
                 },
                 Some((t, r, flushes)) => {
-                    // Note: `flushes` will be dropped on exit from this block, which has the desired
-                    // behavior of closing the channel.
+                    // Note: `flushes` will be dropped on exit from this block, which has the
+                    // desired behavior of closing the channel.
 
                     let now = self.db.clocks().monotonic();
@@ -393,7 +393,7 @@
         let mut garbage: Vec<_> = {
             let l = self.db.lock();
             let d = l.sample_file_dirs_by_id().get(&self.dir_id).unwrap();
-            d.garbage.iter().map(|id| *id).collect()
+            d.garbage_needs_unlink.iter().map(|id| *id).collect()
         };
         if garbage.is_empty() {
             return;
         }
@@ -882,6 +882,85 @@ mod tests {
     fn eio() -> io::Error { io::Error::new(io::ErrorKind::Other, "got EIO") }
 
+    /// Tests the database flushing while a syncer is still processing a previous flush event.
+    #[test]
+    fn double_flush() {
+        testutil::init();
+        let h = new_harness();
+        h.db.lock().update_retention(&[db::RetentionChange {
+            stream_id: testutil::TEST_STREAM_ID,
+            new_record: true,
+            new_limit: 3,
+        }]).unwrap();
+
+        // Setup: add a 3-byte recording.
+        let video_sample_entry_id = h.db.lock().insert_video_sample_entry(
+            1920, 1080, [0u8; 100].to_vec(), "avc1.000000".to_owned()).unwrap();
+        {
+            let mut w = Writer::new(&h.dir, &h.db, &h.channel, testutil::TEST_STREAM_ID,
+                                    video_sample_entry_id);
+            let f = MockFile::new();
+            h.dir.expect(MockDirAction::Create(CompositeId::new(1, 1),
+                         Box::new({ let f = f.clone(); move |_id| Ok(f.clone()) })));
+            f.expect(MockFileAction::Write(Box::new(|buf| { assert_eq!(buf, b"123"); Ok(3) })));
+            f.expect(MockFileAction::SyncAll(Box::new(|| Ok(()))));
+            w.write(b"123", recording::Time(2), 0, true).unwrap();
+            h.dir.expect(MockDirAction::Sync(Box::new(|| Ok(()))));
+            w.close(Some(1));
+            h.channel.flush();
+            f.ensure_done();
+            h.dir.ensure_done();
+
+            // Then a 1-byte recording.
+            let f = MockFile::new();
+            h.dir.expect(MockDirAction::Create(CompositeId::new(1, 2),
+                         Box::new({ let f = f.clone(); move |_id| Ok(f.clone()) })));
+            f.expect(MockFileAction::Write(Box::new(|buf| { assert_eq!(buf, b"4"); Ok(1) })));
+            f.expect(MockFileAction::SyncAll(Box::new(|| Ok(()))));
+            w.write(b"4", recording::Time(3), 1, true).unwrap();
+            h.dir.expect(MockDirAction::Sync(Box::new(|| Ok(()))));
+            h.dir.expect(MockDirAction::Unlink(CompositeId::new(1, 1), Box::new({
+                let db = h.db.clone();
+                move |_| {
+                    // The drop(w) below should cause the old recording to be deleted (moved
+                    // to garbage). When the database is flushed, the syncer forces garbage
+                    // collection, including this unlink.
+
+                    // Do another database flush here, as if from another syncer.
+                    db.lock().flush("another syncer running").unwrap();
+                    Ok(())
+                }
+            })));
+            let (gc_done_snd, gc_done_rcv) = mpsc::channel();
+            h.dir.expect(MockDirAction::Sync(Box::new(move || {
+                gc_done_snd.send(()).unwrap();
+                Ok(())
+            })));
+
+            drop(w);
+
+            gc_done_rcv.recv().unwrap();  // Wait until the successful gc sync call...
+            h.channel.flush();            // ...and the DatabaseFlush op to complete.
+            f.ensure_done();
+            h.dir.ensure_done();
+        }
+
+        // Garbage should be marked collected on the next flush.
+        {
+            let mut l = h.db.lock();
+            assert!(l.sample_file_dirs_by_id().get(&h.dir_id).unwrap().garbage_needs_unlink.is_empty());
+            assert!(!l.sample_file_dirs_by_id().get(&h.dir_id).unwrap().garbage_unlinked.is_empty());
+            l.flush("forced gc").unwrap();
+            assert!(l.sample_file_dirs_by_id().get(&h.dir_id).unwrap().garbage_needs_unlink.is_empty());
+            assert!(l.sample_file_dirs_by_id().get(&h.dir_id).unwrap().garbage_unlinked.is_empty());
+        }
+
+        // The syncer should shut down cleanly.
+        drop(h.channel);
+        h.db.lock().clear_on_flush();
+        h.join.join().unwrap();
+    }
+
     #[test]
     fn write_path_retries() {
         testutil::init();
@@ -1005,9 +1084,11 @@
         // Garbage should be marked collected on the next flush.
         {
             let mut l = h.db.lock();
-            assert!(!l.sample_file_dirs_by_id().get(&h.dir_id).unwrap().garbage.is_empty());
+            assert!(l.sample_file_dirs_by_id().get(&h.dir_id).unwrap().garbage_needs_unlink.is_empty());
+            assert!(!l.sample_file_dirs_by_id().get(&h.dir_id).unwrap().garbage_unlinked.is_empty());
             l.flush("forced gc").unwrap();
-            assert!(l.sample_file_dirs_by_id().get(&h.dir_id).unwrap().garbage.is_empty());
+            assert!(l.sample_file_dirs_by_id().get(&h.dir_id).unwrap().garbage_needs_unlink.is_empty());
+            assert!(l.sample_file_dirs_by_id().get(&h.dir_id).unwrap().garbage_unlinked.is_empty());
         }
 
         // The syncer should shut down cleanly.
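Taken together, the rename makes the two-phase collection protocol explicit: a deleted recording's id now walks through two durable intermediate states instead of one ambiguous `garbage` set. A hedged end-to-end summary of that lifecycle; the enum below is illustrative only, since the real transitions are spread across `LockedDatabase::flush`, the syncer's collect-garbage pass, and `delete_garbage`:

    /// The states a deleted recording's sample file passes through, per the
    /// diff above. Each transition is committed (or fsynced) before the next
    /// phase begins, so a crash at any point leaves a recoverable state.
    #[derive(Clone, Copy, Debug, PartialEq)]
    enum SampleFileState {
        /// Row in `recording`; file on disk.
        Recorded,
        /// Flush moved the row to `garbage`; the file may still exist, so the
        /// id sits in `SampleFileDir::garbage_needs_unlink`.
        GarbageNeedsUnlink,
        /// The syncer unlinked the file and synced the dir; `delete_garbage`
        /// moved the id to `SampleFileDir::garbage_unlinked`.
        GarbageUnlinked,
        /// The next flush ran `mark_sample_files_deleted`; the id is gone.
        Gone,
    }

    fn step(s: SampleFileState) -> SampleFileState {
        use SampleFileState::*;
        match s {
            Recorded => GarbageNeedsUnlink,        // flush: recording -> garbage
            GarbageNeedsUnlink => GarbageUnlinked, // syncer: unlink + dir sync
            GarbageUnlinked | Gone => Gone,        // flush: delete garbage row
        }
    }

    fn main() {
        let mut s = SampleFileState::Recorded;
        while s != SampleFileState::Gone { s = step(s); }
    }

The `double_flush` test exercises the racy middle of this walk: a second flush lands while the syncer is between the unlink and the directory sync, and the split tracking ensures the in-flight ids are neither forgotten nor deleted from the `garbage` table early.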