improve Syncer's robustness

The new approach is to, rather than panicking, retry forever. The assumption
is that if a given operation is failing, a following operation is unlikely to
succeed, so it's simpler to just keep trying the earlier one than come up with
ways to undo it and proceed with later operations.

I still need to apply this approach to the Writer class. It currently unwraps
(crashes) or just gives up on a recording without ever sending it to the
Syncer. Given that recordings are all synced in order, that means further ones
can never be synced.
This commit is contained in:
Scott Lamb 2018-02-28 11:07:55 -08:00
parent b790075ca2
commit b1d71c4e8d
2 changed files with 38 additions and 46 deletions

View File

@ -294,7 +294,6 @@ impl SampleFileDir {
/// A command sent to the syncer. These correspond to methods in the `SyncerChannel` struct.
enum SyncerCommand {
AsyncSaveRecording(CompositeId, Arc<Mutex<db::UncommittedRecording>>, fs::File),
//AsyncAbandonRecording(CompositeId),
DatabaseFlushed,
Flush(mpsc::SyncSender<()>),
}
@ -320,8 +319,10 @@ struct Syncer {
/// file writing starts. Afterward the syncing happens in a background thread.
///
/// Returns a `SyncerChannel` which can be used to send commands (and can be cloned freely) and
/// a `JoinHandle` for the syncer thread. At program shutdown, all `SyncerChannel` clones should be
/// dropped and then the handle joined to allow all recordings to be persisted.
/// a `JoinHandle` for the syncer thread. Commands sent on the channel will be executed or retried
/// forever. (TODO: provide some manner of pushback during retry.) At program shutdown, all
/// `SyncerChannel` clones should be dropped and then the handle joined to allow all recordings to
/// be persisted.
///
/// Note that dropping all `SyncerChannel` clones currently includes calling
/// `LockedDatabase::clear_on_flush`, as this function installs a hook to watch database flushes.
@ -414,10 +415,6 @@ impl SyncerChannel {
self.0.send(SyncerCommand::AsyncSaveRecording(id, recording, f)).unwrap();
}
//fn async_abandon_recording(&self, id: CompositeId) {
// self.0.send(SyncerCommand::AsyncAbandonRecording(id)).unwrap();
//}
/// For testing: flushes the syncer, waiting for all currently-queued commands to complete.
pub fn flush(&self) {
let (snd, rcv) = mpsc::sync_channel(0);
@ -497,14 +494,16 @@ impl Syncer {
match cmds.recv() {
Err(_) => return, // all senders have closed the channel; shutdown
Ok(SyncerCommand::AsyncSaveRecording(id, r, f)) => self.save(id, r, f),
//Ok(SyncerCommand::AsyncAbandonRecording(uuid)) => self.abandon(uuid),
Ok(SyncerCommand::DatabaseFlushed) => { let _ = self.collect_garbage(true); },
Ok(SyncerCommand::DatabaseFlushed) => {
retry_forever(&mut || self.collect_garbage(true))
},
Ok(SyncerCommand::Flush(_)) => {}, // just drop the supplied sender, closing it.
};
}
}
/// Rotates files for all streams and deletes stale files from previous runs.
/// Called from main thread.
fn initial_rotation(&mut self) -> Result<(), Error> {
self.do_rotation(|db| {
let streams: Vec<i32> = db.streams_by_id().keys().map(|&id| id).collect();
@ -515,6 +514,7 @@ impl Syncer {
})
}
/// Helper to do initial or retention-lowering rotation. Called from main thread.
fn do_rotation<F>(&mut self, delete_recordings: F) -> Result<(), Error>
where F: FnOnce(&mut db::LockedDatabase) -> Result<(), Error> {
{
@ -526,16 +526,11 @@ impl Syncer {
self.db.lock().flush("synchronous garbage collection")
}
/// Helper for collecting garbage; called from main or worker threads.
fn collect_garbage(&mut self, warn_on_missing: bool) -> Result<(), Error> {
let mut garbage: Vec<_> = {
let l = self.db.lock();
let d = match l.sample_file_dirs_by_id().get(&self.dir_id) {
None => {
error!("can't find dir {} in db!", self.dir_id);
bail!("can't find dir {} in db!", self.dir_id);
},
Some(d) => d,
};
let d = l.sample_file_dirs_by_id().get(&self.dir_id).unwrap();
d.garbage.iter().map(|id| *id).collect()
};
let len_before = garbage.len();
@ -553,7 +548,8 @@ impl Syncer {
true
});
let res = if len_before > garbage.len() {
Err(format_err!("Unable to unlink {} files", len_before - garbage.len()))
Err(format_err!("Unable to unlink {} files (see earlier warning messages for details)",
len_before - garbage.len()))
} else {
Ok(())
};
@ -573,21 +569,20 @@ impl Syncer {
res
}
/// Saves the given recording and causes rotation to happen.
/// Saves the given recording and causes rotation to happen. Called from worker thread.
///
/// Note that part of rotation is deferred for the next cycle (saved writing or program startup)
/// so that there can be only one dir sync and database transaction per save.
/// Internal helper for `save`. This is separated out so that the question-mark operator
/// can be used in the many error paths.
/// TODO: less unwrapping! keep a queue?
fn save(&mut self, id: CompositeId, recording: Arc<Mutex<db::UncommittedRecording>>,
f: fs::File) {
let stream_id = id.stream();
// Free up a like number of bytes.
delete_recordings(&mut self.db.lock(), stream_id, 0).unwrap();
f.sync_all().unwrap();
self.dir.sync().unwrap();
retry_forever(&mut || delete_recordings(&mut self.db.lock(), stream_id, 0));
retry_forever(&mut || f.sync_all());
retry_forever(&mut || self.dir.sync());
recording.lock().synced = true;
let mut db = self.db.lock();
let reason = {
@ -602,12 +597,29 @@ impl Syncer {
format!("{}-{}: unflushed={} >= if={}",
c.short_name, s.type_.as_str(), unflushed, s.flush_if)
};
if let Err(e) = db.flush(&reason) {
error!("flush failure on save: {:?}", e);
// Don't retry the commit now in case it causes extra flash write cycles.
// It's not necessary for correctness to flush before proceeding.
// Just wait until the next flush would happen naturally.
warn!("flush failure on save for reason {}; leaving unflushed for now: {:?}",
reason, e);
}
}
}
fn retry_forever<T, E: Into<Error>>(f: &mut FnMut() -> Result<T, E>) -> T {
let sleep_time = ::std::time::Duration::new(1, 0);
loop {
let e = match f() {
Ok(t) => return t,
Err(e) => e.into(),
};
warn!("sleeping for {:?} after error: {:?}", sleep_time, e);
thread::sleep(sleep_time);
}
}
/// Single-use struct to write a single recording to disk and commit its metadata to the database.
/// Use `SampleFileDir::create_writer` to create a new writer. `Writer` hands off its state to the
/// syncer when done. It either saves the recording to the database (if I/O errors do not prevent
@ -624,7 +636,6 @@ struct InnerWriter<'a> {
r: Arc<Mutex<db::UncommittedRecording>>,
index: recording::SampleIndexEncoder,
id: CompositeId,
corrupt: bool,
hasher: hash::Hasher,
/// The end time of the previous segment in this run, if any.
@ -724,7 +735,6 @@ impl<'a> Writer<'a> {
r,
index: recording::SampleIndexEncoder::new(),
id,
corrupt: false,
hasher: hash::Hasher::new(hash::MessageDigest::sha1())?,
prev_end: prev.map(|p| p.end_time),
local_start: recording::Time(i64::max_value()),
@ -752,21 +762,7 @@ impl<'a> Writer<'a> {
}
let mut remaining = pkt;
while !remaining.is_empty() {
let written = match w.f.write(remaining) {
Ok(b) => b,
Err(e) => {
if remaining.len() < pkt.len() {
// Partially written packet. Truncate if possible.
// TODO: have the syncer do this / retry it if necessary?
if let Err(e2) = w.f.set_len(w.index.sample_file_bytes as u64) {
error!("After write to {} failed with {}, truncate failed with {}; \
sample file is corrupt.", w.id, e, e2);
w.corrupt = true;
}
}
return Err(Error::from(e));
},
};
let written = retry_forever(&mut || w.f.write(remaining));
remaining = &remaining[written..];
}
w.unflushed_sample = Some(UnflushedSample{
@ -793,10 +789,6 @@ impl<'a> InnerWriter<'a> {
}
fn close(mut self, next_pts: Option<i64>) -> Result<PreviousWriter, Error> {
if self.corrupt {
//self.syncer_channel.async_abandon_recording(self.id);
bail!("recording {} is corrupt", self.id);
}
let unflushed =
self.unflushed_sample.take()
.ok_or_else(|| format_err!("recording {} has no packets", self.id))?;

View File

@ -99,7 +99,7 @@ impl<'a, C, S> Streamer<'a, C, S> where C: 'a + Clocks, S: 'a + stream::Stream {
while !self.shutdown.load(Ordering::SeqCst) {
if let Err(e) = self.run_once() {
let sleep_time = time::Duration::seconds(1);
warn!("{}: sleeping for {:?} after error: {}", self.short_name, sleep_time, e);
warn!("{}: sleeping for {:?} after error: {:?}", self.short_name, sleep_time, e);
self.clocks.sleep(sleep_time);
}
}