From c43fb80639a79f03cd20410491d57c7a76793bd8 Mon Sep 17 00:00:00 2001 From: Scott Lamb Date: Wed, 31 Jan 2018 14:20:30 -0800 Subject: [PATCH] warn if a streamer op takes too long My odroid setup has been occasionally (about once a week) losing about 15 seconds of recordings on all cameras. I'm not sure why. So I'm labelling all the likely suspect spots and logging if any of them takes longer than a second. I think this will give me more information; hopefully narrow it down to network or local disk I/O. --- src/clock.rs | 28 ++++++++++++++++++++++++++++ src/streamer.rs | 23 ++++++++++++++++++----- 2 files changed, 46 insertions(+), 5 deletions(-) diff --git a/src/clock.rs b/src/clock.rs index f0133a4..ef60898 100644 --- a/src/clock.rs +++ b/src/clock.rs @@ -76,6 +76,34 @@ impl Clocks for RealClocks { } } +/// Logs a warning if the TimerGuard lives "too long", using the label created by a supplied +/// function. +pub struct TimerGuard<'a, C: Clocks + 'a, S: AsRef, F: FnOnce() -> S + 'a> { + clocks: &'a C, + label_f: Option, + start: Timespec, +} + +impl<'a, C: Clocks + 'a, S: AsRef, F: FnOnce() -> S + 'a> TimerGuard<'a, C, S, F> { + pub fn new(clocks: &'a C, label_f: F) -> Self { + TimerGuard { + clocks, + label_f: Some(label_f), + start: clocks.monotonic(), + } + } +} + +impl<'a, C: Clocks + 'a, S: AsRef, F: FnOnce() -> S + 'a> Drop for TimerGuard<'a, C, S, F> { + fn drop(&mut self) { + let elapsed = self.clocks.monotonic() - self.start; + if elapsed.num_seconds() >= 1 { + let label_f = self.label_f.take().unwrap(); + warn!("{} took {}!", label_f().as_ref(), elapsed); + } + } +} + /// Simulated clock for testing. #[cfg(test)] pub struct SimulatedClocks { diff --git a/src/streamer.rs b/src/streamer.rs index 104074e..73702f4 100644 --- a/src/streamer.rs +++ b/src/streamer.rs @@ -28,7 +28,7 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . -use clock::Clocks; +use clock::{Clocks, TimerGuard}; use db::{Camera, Database}; use dir; use error::Error; @@ -111,21 +111,29 @@ impl<'a, C, S> Streamer<'a, C, S> where C: 'a + Clocks, S: 'a + stream::Stream { fn run_once(&mut self) -> Result<(), Error> { info!("{}: Opening input: {}", self.short_name, self.redacted_url); - let mut stream = self.opener.open(stream::Source::Rtsp(&self.url))?; + let mut stream = { + let _t = TimerGuard::new(self.clocks, || format!("opening {}", self.redacted_url)); + self.opener.open(stream::Source::Rtsp(&self.url))? + }; let realtime_offset = self.clocks.realtime() - self.clocks.monotonic(); // TODO: verify width/height. let extra_data = stream.get_extra_data()?; - let video_sample_entry_id = + let video_sample_entry_id = { + let _t = TimerGuard::new(self.clocks, || "inserting video sample entry"); self.db.lock().insert_video_sample_entry(extra_data.width, extra_data.height, extra_data.sample_entry, - extra_data.rfc6381_codec)?; + extra_data.rfc6381_codec)? + }; debug!("{}: video_sample_entry_id={}", self.short_name, video_sample_entry_id); let mut seen_key_frame = false; let mut state: Option = None; let mut transformed = Vec::new(); let mut prev = None; while !self.shutdown.load(Ordering::SeqCst) { - let pkt = stream.get_next()?; + let pkt = { + let _t = TimerGuard::new(self.clocks, || "getting next packet"); + stream.get_next()? + }; let pts = pkt.pts().ok_or_else(|| Error::new("packet with no pts".to_owned()))?; if !seen_key_frame && !pkt.is_key() { continue; @@ -138,6 +146,7 @@ impl<'a, C, S> Streamer<'a, C, S> where C: 'a + Clocks, S: 'a + stream::Stream { state = if let Some(s) = state { if frame_realtime.sec > s.rotate && pkt.is_key() { trace!("{}: write on normal rotation", self.short_name); + let _t = TimerGuard::new(self.clocks, || "closing writer"); prev = Some(s.writer.close(Some(pts))?); None } else { @@ -157,6 +166,7 @@ impl<'a, C, S> Streamer<'a, C, S> where C: 'a + Clocks, S: 'a + stream::Stream { // the start time. let r = r + if prev.is_none() { self.rotate_interval_sec } else { 0 }; + let _t = TimerGuard::new(self.clocks, || "creating writer"); let w = self.dir.create_writer(&self.syncer_channel, prev, self.camera_id, video_sample_entry_id)?; WriterState{ @@ -175,10 +185,13 @@ impl<'a, C, S> Streamer<'a, C, S> where C: 'a + Clocks, S: 'a + stream::Stream { } else { orig_data }; + let _t = TimerGuard::new(self.clocks, + || format!("writing {} bytes", transformed_data.len())); s.writer.write(transformed_data, local_time, pts, pkt.is_key())?; state = Some(s); } if let Some(s) = state { + let _t = TimerGuard::new(self.clocks, || "closing writer"); s.writer.close(None)?; } Ok(())