mirror of
https://github.com/scottlamb/moonfire-nvr.git
synced 2025-01-13 16:03:22 -05:00
warn if a streamer op takes too long
My odroid setup has been occasionally (about once a week) losing about 15 seconds of recordings on all cameras. I'm not sure why. So I'm labelling all the likely suspect spots and logging if any of them takes longer than a second. I think this will give me more information; hopefully narrow it down to network or local disk I/O.
This commit is contained in:
parent
6902be1981
commit
c43fb80639
28
src/clock.rs
28
src/clock.rs
@ -76,6 +76,34 @@ impl Clocks for RealClocks {
|
||||
}
|
||||
}
|
||||
|
||||
/// Logs a warning if the TimerGuard lives "too long", using the label created by a supplied
|
||||
/// function.
|
||||
pub struct TimerGuard<'a, C: Clocks + 'a, S: AsRef<str>, F: FnOnce() -> S + 'a> {
|
||||
clocks: &'a C,
|
||||
label_f: Option<F>,
|
||||
start: Timespec,
|
||||
}
|
||||
|
||||
impl<'a, C: Clocks + 'a, S: AsRef<str>, F: FnOnce() -> S + 'a> TimerGuard<'a, C, S, F> {
|
||||
pub fn new(clocks: &'a C, label_f: F) -> Self {
|
||||
TimerGuard {
|
||||
clocks,
|
||||
label_f: Some(label_f),
|
||||
start: clocks.monotonic(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, C: Clocks + 'a, S: AsRef<str>, F: FnOnce() -> S + 'a> Drop for TimerGuard<'a, C, S, F> {
|
||||
fn drop(&mut self) {
|
||||
let elapsed = self.clocks.monotonic() - self.start;
|
||||
if elapsed.num_seconds() >= 1 {
|
||||
let label_f = self.label_f.take().unwrap();
|
||||
warn!("{} took {}!", label_f().as_ref(), elapsed);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Simulated clock for testing.
|
||||
#[cfg(test)]
|
||||
pub struct SimulatedClocks {
|
||||
|
@ -28,7 +28,7 @@
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
use clock::Clocks;
|
||||
use clock::{Clocks, TimerGuard};
|
||||
use db::{Camera, Database};
|
||||
use dir;
|
||||
use error::Error;
|
||||
@ -111,21 +111,29 @@ impl<'a, C, S> Streamer<'a, C, S> where C: 'a + Clocks, S: 'a + stream::Stream {
|
||||
fn run_once(&mut self) -> Result<(), Error> {
|
||||
info!("{}: Opening input: {}", self.short_name, self.redacted_url);
|
||||
|
||||
let mut stream = self.opener.open(stream::Source::Rtsp(&self.url))?;
|
||||
let mut stream = {
|
||||
let _t = TimerGuard::new(self.clocks, || format!("opening {}", self.redacted_url));
|
||||
self.opener.open(stream::Source::Rtsp(&self.url))?
|
||||
};
|
||||
let realtime_offset = self.clocks.realtime() - self.clocks.monotonic();
|
||||
// TODO: verify width/height.
|
||||
let extra_data = stream.get_extra_data()?;
|
||||
let video_sample_entry_id =
|
||||
let video_sample_entry_id = {
|
||||
let _t = TimerGuard::new(self.clocks, || "inserting video sample entry");
|
||||
self.db.lock().insert_video_sample_entry(extra_data.width, extra_data.height,
|
||||
extra_data.sample_entry,
|
||||
extra_data.rfc6381_codec)?;
|
||||
extra_data.rfc6381_codec)?
|
||||
};
|
||||
debug!("{}: video_sample_entry_id={}", self.short_name, video_sample_entry_id);
|
||||
let mut seen_key_frame = false;
|
||||
let mut state: Option<WriterState> = None;
|
||||
let mut transformed = Vec::new();
|
||||
let mut prev = None;
|
||||
while !self.shutdown.load(Ordering::SeqCst) {
|
||||
let pkt = stream.get_next()?;
|
||||
let pkt = {
|
||||
let _t = TimerGuard::new(self.clocks, || "getting next packet");
|
||||
stream.get_next()?
|
||||
};
|
||||
let pts = pkt.pts().ok_or_else(|| Error::new("packet with no pts".to_owned()))?;
|
||||
if !seen_key_frame && !pkt.is_key() {
|
||||
continue;
|
||||
@ -138,6 +146,7 @@ impl<'a, C, S> Streamer<'a, C, S> where C: 'a + Clocks, S: 'a + stream::Stream {
|
||||
state = if let Some(s) = state {
|
||||
if frame_realtime.sec > s.rotate && pkt.is_key() {
|
||||
trace!("{}: write on normal rotation", self.short_name);
|
||||
let _t = TimerGuard::new(self.clocks, || "closing writer");
|
||||
prev = Some(s.writer.close(Some(pts))?);
|
||||
None
|
||||
} else {
|
||||
@ -157,6 +166,7 @@ impl<'a, C, S> Streamer<'a, C, S> where C: 'a + Clocks, S: 'a + stream::Stream {
|
||||
// the start time.
|
||||
let r = r + if prev.is_none() { self.rotate_interval_sec } else { 0 };
|
||||
|
||||
let _t = TimerGuard::new(self.clocks, || "creating writer");
|
||||
let w = self.dir.create_writer(&self.syncer_channel, prev, self.camera_id,
|
||||
video_sample_entry_id)?;
|
||||
WriterState{
|
||||
@ -175,10 +185,13 @@ impl<'a, C, S> Streamer<'a, C, S> where C: 'a + Clocks, S: 'a + stream::Stream {
|
||||
} else {
|
||||
orig_data
|
||||
};
|
||||
let _t = TimerGuard::new(self.clocks,
|
||||
|| format!("writing {} bytes", transformed_data.len()));
|
||||
s.writer.write(transformed_data, local_time, pts, pkt.is_key())?;
|
||||
state = Some(s);
|
||||
}
|
||||
if let Some(s) = state {
|
||||
let _t = TimerGuard::new(self.clocks, || "closing writer");
|
||||
s.writer.close(None)?;
|
||||
}
|
||||
Ok(())
|
||||
|
Loading…
Reference in New Issue
Block a user