diff --git a/db/dir.rs b/db/dir.rs index 8f07445..95368fe 100644 --- a/db/dir.rs +++ b/db/dir.rs @@ -294,7 +294,6 @@ impl SampleFileDir { /// A command sent to the syncer. These correspond to methods in the `SyncerChannel` struct. enum SyncerCommand { AsyncSaveRecording(CompositeId, Arc>, fs::File), - //AsyncAbandonRecording(CompositeId), DatabaseFlushed, Flush(mpsc::SyncSender<()>), } @@ -320,8 +319,10 @@ struct Syncer { /// file writing starts. Afterward the syncing happens in a background thread. /// /// Returns a `SyncerChannel` which can be used to send commands (and can be cloned freely) and -/// a `JoinHandle` for the syncer thread. At program shutdown, all `SyncerChannel` clones should be -/// dropped and then the handle joined to allow all recordings to be persisted. +/// a `JoinHandle` for the syncer thread. Commands sent on the channel will be executed or retried +/// forever. (TODO: provide some manner of pushback during retry.) At program shutdown, all +/// `SyncerChannel` clones should be dropped and then the handle joined to allow all recordings to +/// be persisted. /// /// Note that dropping all `SyncerChannel` clones currently includes calling /// `LockedDatabase::clear_on_flush`, as this function installs a hook to watch database flushes. @@ -414,10 +415,6 @@ impl SyncerChannel { self.0.send(SyncerCommand::AsyncSaveRecording(id, recording, f)).unwrap(); } - //fn async_abandon_recording(&self, id: CompositeId) { - // self.0.send(SyncerCommand::AsyncAbandonRecording(id)).unwrap(); - //} - /// For testing: flushes the syncer, waiting for all currently-queued commands to complete. pub fn flush(&self) { let (snd, rcv) = mpsc::sync_channel(0); @@ -497,14 +494,16 @@ impl Syncer { match cmds.recv() { Err(_) => return, // all senders have closed the channel; shutdown Ok(SyncerCommand::AsyncSaveRecording(id, r, f)) => self.save(id, r, f), - //Ok(SyncerCommand::AsyncAbandonRecording(uuid)) => self.abandon(uuid), - Ok(SyncerCommand::DatabaseFlushed) => { let _ = self.collect_garbage(true); }, + Ok(SyncerCommand::DatabaseFlushed) => { + retry_forever(&mut || self.collect_garbage(true)) + }, Ok(SyncerCommand::Flush(_)) => {}, // just drop the supplied sender, closing it. }; } } /// Rotates files for all streams and deletes stale files from previous runs. + /// Called from main thread. fn initial_rotation(&mut self) -> Result<(), Error> { self.do_rotation(|db| { let streams: Vec = db.streams_by_id().keys().map(|&id| id).collect(); @@ -515,6 +514,7 @@ impl Syncer { }) } + /// Helper to do initial or retention-lowering rotation. Called from main thread. fn do_rotation(&mut self, delete_recordings: F) -> Result<(), Error> where F: FnOnce(&mut db::LockedDatabase) -> Result<(), Error> { { @@ -526,16 +526,11 @@ impl Syncer { self.db.lock().flush("synchronous garbage collection") } + /// Helper for collecting garbage; called from main or worker threads. fn collect_garbage(&mut self, warn_on_missing: bool) -> Result<(), Error> { let mut garbage: Vec<_> = { let l = self.db.lock(); - let d = match l.sample_file_dirs_by_id().get(&self.dir_id) { - None => { - error!("can't find dir {} in db!", self.dir_id); - bail!("can't find dir {} in db!", self.dir_id); - }, - Some(d) => d, - }; + let d = l.sample_file_dirs_by_id().get(&self.dir_id).unwrap(); d.garbage.iter().map(|id| *id).collect() }; let len_before = garbage.len(); @@ -553,7 +548,8 @@ impl Syncer { true }); let res = if len_before > garbage.len() { - Err(format_err!("Unable to unlink {} files", len_before - garbage.len())) + Err(format_err!("Unable to unlink {} files (see earlier warning messages for details)", + len_before - garbage.len())) } else { Ok(()) }; @@ -573,21 +569,20 @@ impl Syncer { res } - /// Saves the given recording and causes rotation to happen. + /// Saves the given recording and causes rotation to happen. Called from worker thread. + /// /// Note that part of rotation is deferred for the next cycle (saved writing or program startup) /// so that there can be only one dir sync and database transaction per save. /// Internal helper for `save`. This is separated out so that the question-mark operator /// can be used in the many error paths. - /// TODO: less unwrapping! keep a queue? fn save(&mut self, id: CompositeId, recording: Arc>, f: fs::File) { let stream_id = id.stream(); // Free up a like number of bytes. - delete_recordings(&mut self.db.lock(), stream_id, 0).unwrap(); - - f.sync_all().unwrap(); - self.dir.sync().unwrap(); + retry_forever(&mut || delete_recordings(&mut self.db.lock(), stream_id, 0)); + retry_forever(&mut || f.sync_all()); + retry_forever(&mut || self.dir.sync()); recording.lock().synced = true; let mut db = self.db.lock(); let reason = { @@ -602,12 +597,29 @@ impl Syncer { format!("{}-{}: unflushed={} >= if={}", c.short_name, s.type_.as_str(), unflushed, s.flush_if) }; + if let Err(e) = db.flush(&reason) { - error!("flush failure on save: {:?}", e); + // Don't retry the commit now in case it causes extra flash write cycles. + // It's not necessary for correctness to flush before proceeding. + // Just wait until the next flush would happen naturally. + warn!("flush failure on save for reason {}; leaving unflushed for now: {:?}", + reason, e); } } } +fn retry_forever>(f: &mut FnMut() -> Result) -> T { + let sleep_time = ::std::time::Duration::new(1, 0); + loop { + let e = match f() { + Ok(t) => return t, + Err(e) => e.into(), + }; + warn!("sleeping for {:?} after error: {:?}", sleep_time, e); + thread::sleep(sleep_time); + } +} + /// Single-use struct to write a single recording to disk and commit its metadata to the database. /// Use `SampleFileDir::create_writer` to create a new writer. `Writer` hands off its state to the /// syncer when done. It either saves the recording to the database (if I/O errors do not prevent @@ -624,7 +636,6 @@ struct InnerWriter<'a> { r: Arc>, index: recording::SampleIndexEncoder, id: CompositeId, - corrupt: bool, hasher: hash::Hasher, /// The end time of the previous segment in this run, if any. @@ -724,7 +735,6 @@ impl<'a> Writer<'a> { r, index: recording::SampleIndexEncoder::new(), id, - corrupt: false, hasher: hash::Hasher::new(hash::MessageDigest::sha1())?, prev_end: prev.map(|p| p.end_time), local_start: recording::Time(i64::max_value()), @@ -752,21 +762,7 @@ impl<'a> Writer<'a> { } let mut remaining = pkt; while !remaining.is_empty() { - let written = match w.f.write(remaining) { - Ok(b) => b, - Err(e) => { - if remaining.len() < pkt.len() { - // Partially written packet. Truncate if possible. - // TODO: have the syncer do this / retry it if necessary? - if let Err(e2) = w.f.set_len(w.index.sample_file_bytes as u64) { - error!("After write to {} failed with {}, truncate failed with {}; \ - sample file is corrupt.", w.id, e, e2); - w.corrupt = true; - } - } - return Err(Error::from(e)); - }, - }; + let written = retry_forever(&mut || w.f.write(remaining)); remaining = &remaining[written..]; } w.unflushed_sample = Some(UnflushedSample{ @@ -793,10 +789,6 @@ impl<'a> InnerWriter<'a> { } fn close(mut self, next_pts: Option) -> Result { - if self.corrupt { - //self.syncer_channel.async_abandon_recording(self.id); - bail!("recording {} is corrupt", self.id); - } let unflushed = self.unflushed_sample.take() .ok_or_else(|| format_err!("recording {} has no packets", self.id))?; diff --git a/src/streamer.rs b/src/streamer.rs index f1936aa..6d16dc2 100644 --- a/src/streamer.rs +++ b/src/streamer.rs @@ -99,7 +99,7 @@ impl<'a, C, S> Streamer<'a, C, S> where C: 'a + Clocks, S: 'a + stream::Stream { while !self.shutdown.load(Ordering::SeqCst) { if let Err(e) = self.run_once() { let sleep_time = time::Duration::seconds(1); - warn!("{}: sleeping for {:?} after error: {}", self.short_name, sleep_time, e); + warn!("{}: sleeping for {:?} after error: {:?}", self.short_name, sleep_time, e); self.clocks.sleep(sleep_time); } }