Merge branch 'master' into auth

This commit is contained in:
Scott Lamb 2018-12-01 00:06:43 -08:00
commit d35a4592e3
3 changed files with 144 additions and 23 deletions

View File

@ -273,8 +273,15 @@ pub struct SampleFileDir {
pub uuid: Uuid,
dir: Option<Arc<dir::SampleFileDir>>,
last_complete_open: Option<Open>,
to_gc: Vec<CompositeId>,
pub(crate) garbage: FnvHashSet<CompositeId>,
/// ids which are in the `garbage` database table (rather than `recording`) as of last commit
/// but may still exist on disk. These can't be safely removed from the database yet.
pub(crate) garbage_needs_unlink: FnvHashSet<CompositeId>,
/// ids which are in the `garbage` database table and are guaranteed to no longer exist on
/// disk (have been unlinked and the dir has been synced). These may be removed from the
/// database on next flush. Mutually exclusive with `garbage_needs_unlink`.
pub(crate) garbage_unlinked: Vec<CompositeId>,
}
impl SampleFileDir {
@ -379,7 +386,10 @@ pub struct Stream {
pub range: Option<Range<recording::Time>>,
pub sample_file_bytes: i64,
/// On flush, delete the following recordings. Note they must be the oldest recordings.
/// On flush, delete the following recordings (move them to the `garbage` table, to be
/// collected later). Note they must be the oldest recordings. The later collection involves
/// the syncer unlinking the files on disk and syncing the directory then enqueueing for
/// another following flush removal from the `garbage` table.
to_delete: Vec<ListOldestRecordingsRow>,
/// The total bytes to delete with the next flush.
@ -797,7 +807,17 @@ impl LockedDatabase {
None => bail!("no such dir {}", dir_id),
Some(d) => d,
};
dir.to_gc.append(ids);
dir.garbage_unlinked.reserve(ids.len());
ids.retain(|id| {
if !dir.garbage_needs_unlink.remove(id) {
return true;
}
dir.garbage_unlinked.push(*id);
false
});
if !ids.is_empty() {
bail!("delete_garbage with non-garbage ids {:?}", &ids[..]);
}
Ok(())
}
@ -836,6 +856,11 @@ impl LockedDatabase {
None => bail!("stream {} has no directory!", stream_id),
Some(d) => d,
};
// raw::delete_recordings does a bulk transfer of a range from recording to
// garbage, rather than operating on each element of to_delete. This is
// guaranteed to give the same result because to_delete is guaranteed to be the
// oldest recordings for the stream.
let start = CompositeId::new(stream_id, 0);
let end = CompositeId(l.id.0 + 1);
let n = raw::delete_recordings(&tx, dir, start .. end)? as usize;
@ -847,7 +872,7 @@ impl LockedDatabase {
}
}
for dir in self.sample_file_dirs_by_id.values() {
raw::mark_sample_files_deleted(&tx, &dir.to_gc)?;
raw::mark_sample_files_deleted(&tx, &dir.garbage_unlinked)?;
}
for (&stream_id, mut r) in &mut new_ranges {
*r = raw::get_range(&tx, stream_id)?;
@ -870,10 +895,8 @@ impl LockedDatabase {
// Process delete_garbage.
let mut gced = 0;
for dir in self.sample_file_dirs_by_id.values_mut() {
gced += dir.to_gc.len();
for id in dir.to_gc.drain(..) {
dir.garbage.remove(&id);
}
gced += dir.garbage_unlinked.len();
dir.garbage_unlinked.clear();
}
let mut added = 0;
@ -887,7 +910,7 @@ impl LockedDatabase {
s.sample_file_bytes -= s.bytes_to_delete;
s.bytes_to_delete = 0;
for row in s.to_delete.drain(..) {
d.garbage.insert(row.id);
d.garbage_needs_unlink.insert(row.id);
let d = recording::Duration(row.duration as i64);
s.duration -= d;
adjust_days(row.start .. row.start + d, -1, &mut s.days);
@ -1295,8 +1318,8 @@ impl LockedDatabase {
path: row.get_checked(1)?,
dir: None,
last_complete_open,
to_gc: Vec::new(),
garbage: raw::list_garbage(&self.conn, id)?,
garbage_needs_unlink: raw::list_garbage(&self.conn, id)?,
garbage_unlinked: Vec::new(),
});
}
info!("Loaded {} sample file dirs", self.sample_file_dirs_by_id.len());
@ -1474,8 +1497,8 @@ impl LockedDatabase {
uuid,
dir: Some(dir),
last_complete_open: None,
to_gc: Vec::new(),
garbage: FnvHashSet::default(),
garbage_needs_unlink: FnvHashSet::default(),
garbage_unlinked: Vec::new(),
}),
Entry::Occupied(_) => Err(format_err!("duplicate sample file dir id {}", id))?,
};
@ -1495,7 +1518,7 @@ impl LockedDatabase {
::std::collections::btree_map::Entry::Occupied(e) => e,
_ => bail!("no such dir {} to remove", dir_id),
};
if !d.get().garbage.is_empty() {
if !d.get().garbage_needs_unlink.is_empty() || !d.get().garbage_unlinked.is_empty() {
bail!("must collect garbage before deleting directory {}", d.get().path);
}
let dir = match d.get_mut().dir.take() {
@ -2230,10 +2253,19 @@ mod tests {
.sample_file_dirs_by_id()
.get(&sample_file_dir_id)
.unwrap()
.garbage
.garbage_needs_unlink
.iter()
.map(|&id| id)
.collect();
assert_eq!(&g, &[id]);
let g: Vec<_> = db.lock()
.sample_file_dirs_by_id()
.get(&sample_file_dir_id)
.unwrap()
.garbage_unlinked
.iter()
.map(|&id| id)
.collect();
assert_eq!(&g, &[]);
}
}

View File

@ -294,7 +294,13 @@ pub(crate) fn mark_sample_files_deleted(tx: &rusqlite::Transaction, ids: &[Compo
for &id in ids {
let changes = stmt.execute(&[&id.0])?;
if changes != 1 {
bail!("no garbage row for {}", id);
// panic rather than return error. Errors get retried indefinitely, but there's no
// recovery from this condition.
//
// Tempting to just consider logging error and moving on, but this represents a logic
// flaw, so complain loudly. The freshly deleted file might still be referenced in the
// recording table.
panic!("no garbage row for {}", id);
}
}
Ok(())

View File

@ -316,7 +316,7 @@ impl<C: Clocks + Clone> Syncer<C, Arc<dir::SampleFileDir>> {
let mut garbage: Vec<_> = {
let l = self.db.lock();
let d = l.sample_file_dirs_by_id().get(&self.dir_id).unwrap();
d.garbage.iter().map(|id| *id).collect()
d.garbage_needs_unlink.iter().map(|id| *id).collect()
};
if !garbage.is_empty() {
// Try to delete files; retain ones in `garbage` that don't exist.
@ -352,8 +352,8 @@ impl<C: Clocks + Clone, D: DirWriter> Syncer<C, D> {
Ok(cmd) => cmd,
},
Some((t, r, flushes)) => {
// Note: `flushes` will be dropped on exit from this block, which has the desired
// behavior of closing the channel.
// Note: `flushes` will be dropped on exit from this block, which has the
// desired behavior of closing the channel.
let now = self.db.clocks().monotonic();
@ -393,7 +393,7 @@ impl<C: Clocks + Clone, D: DirWriter> Syncer<C, D> {
let mut garbage: Vec<_> = {
let l = self.db.lock();
let d = l.sample_file_dirs_by_id().get(&self.dir_id).unwrap();
d.garbage.iter().map(|id| *id).collect()
d.garbage_needs_unlink.iter().map(|id| *id).collect()
};
if garbage.is_empty() {
return;
@ -882,6 +882,87 @@ mod tests {
fn eio() -> io::Error { io::Error::new(io::ErrorKind::Other, "got EIO") }
/// Tests the database flushing while a syncer is still processing a previous flush event.
#[test]
fn double_flush() {
testutil::init();
let h = new_harness();
h.db.lock().update_retention(&[db::RetentionChange {
stream_id: testutil::TEST_STREAM_ID,
new_record: true,
new_limit: 3,
}]).unwrap();
// Setup: add a 3-byte recording.
let video_sample_entry_id = h.db.lock().insert_video_sample_entry(
1920, 1080, [0u8; 100].to_vec(), "avc1.000000".to_owned()).unwrap();
{
let mut w = Writer::new(&h.dir, &h.db, &h.channel, testutil::TEST_STREAM_ID,
video_sample_entry_id);
let f = MockFile::new();
h.dir.expect(MockDirAction::Create(CompositeId::new(1, 1),
Box::new({ let f = f.clone(); move |_id| Ok(f.clone()) })));
f.expect(MockFileAction::Write(Box::new(|buf| { assert_eq!(buf, b"123"); Ok(3) })));
f.expect(MockFileAction::SyncAll(Box::new(|| Ok(()))));
w.write(b"123", recording::Time(2), 0, true).unwrap();
h.dir.expect(MockDirAction::Sync(Box::new(|| Ok(()))));
w.close(Some(1));
h.channel.flush();
f.ensure_done();
h.dir.ensure_done();
// Then a 1-byte recording.
let f = MockFile::new();
h.dir.expect(MockDirAction::Create(CompositeId::new(1, 2),
Box::new({ let f = f.clone(); move |_id| Ok(f.clone()) })));
f.expect(MockFileAction::Write(Box::new(|buf| { assert_eq!(buf, b"4"); Ok(1) })));
f.expect(MockFileAction::SyncAll(Box::new(|| Ok(()))));
w.write(b"4", recording::Time(3), 1, true).unwrap();
h.dir.expect(MockDirAction::Sync(Box::new(|| Ok(()))));
h.dir.expect(MockDirAction::Unlink(CompositeId::new(1, 1), Box::new({
let db = h.db.clone();
move |_| {
// The drop(w) below should cause the old recording to be deleted (moved to
// garbage). When the database is flushed, the syncer forces garbage collection
// including this unlink.
// Do another database flush here, as if from another syncer.
db.lock().flush("another syncer running").unwrap();
Ok(())
}
})));
let (gc_done_snd, gc_done_rcv) = mpsc::channel();
h.dir.expect(MockDirAction::Sync(Box::new(move || {
gc_done_snd.send(()).unwrap();
Ok(())
})));
//h.dir.expect(MockDirAction::Unlink(CompositeId::new(1, 1), Box::new(|_| Ok(()))));
//h.dir.expect(MockDirAction::Sync(Box::new(|| Ok(()))));
drop(w);
gc_done_rcv.recv().unwrap(); // Wait until the successful gc sync call...
h.channel.flush(); // ...and the DatabaseFlush op to complete.
f.ensure_done();
h.dir.ensure_done();
}
// Garbage should be marked collected on the next flush.
{
let mut l = h.db.lock();
assert!(l.sample_file_dirs_by_id().get(&h.dir_id).unwrap().garbage_needs_unlink.is_empty());
assert!(!l.sample_file_dirs_by_id().get(&h.dir_id).unwrap().garbage_unlinked.is_empty());
l.flush("forced gc").unwrap();
assert!(l.sample_file_dirs_by_id().get(&h.dir_id).unwrap().garbage_needs_unlink.is_empty());
assert!(l.sample_file_dirs_by_id().get(&h.dir_id).unwrap().garbage_unlinked.is_empty());
}
// The syncer should shut down cleanly.
drop(h.channel);
h.db.lock().clear_on_flush();
h.join.join().unwrap();
}
#[test]
fn write_path_retries() {
testutil::init();
@ -1005,9 +1086,11 @@ mod tests {
// Garbage should be marked collected on the next flush.
{
let mut l = h.db.lock();
assert!(!l.sample_file_dirs_by_id().get(&h.dir_id).unwrap().garbage.is_empty());
assert!(l.sample_file_dirs_by_id().get(&h.dir_id).unwrap().garbage_needs_unlink.is_empty());
assert!(!l.sample_file_dirs_by_id().get(&h.dir_id).unwrap().garbage_unlinked.is_empty());
l.flush("forced gc").unwrap();
assert!(l.sample_file_dirs_by_id().get(&h.dir_id).unwrap().garbage.is_empty());
assert!(l.sample_file_dirs_by_id().get(&h.dir_id).unwrap().garbage_needs_unlink.is_empty());
assert!(l.sample_file_dirs_by_id().get(&h.dir_id).unwrap().garbage_unlinked.is_empty());
}
// The syncer should shut down cleanly.