make Writer enforce maximum recording duration

My installation recently somehow ended up with a recording with a
duration of 503793844 90,000ths of a second, way over the maximum of 5
minutes. (Looks like the machine was pretty unresponsive at the time
and/or having network problems.)

When this happens, the system really spirals. Every flush afterward (12
per minute with my installation) fails with a CHECK constraint failure
on the recording table. It never gives up on that recording. /var/log
fills pretty quickly as this failure is extremely verbose (a stack
trace, and a line for each byte of video_index). Eventually the sample
file dirs fill up too as it continues writing video samples while GC is
stuck. The video samples are useless anyway; given that they're not
referenced in the database, they'll be deleted on next startup.

This ensures the offending recording is never added to the database, so
we don't get the same persistent problem. Instead, writing to the
recording will fail. The stream will drop and be retried. If the
underlying condition that caused a too-long recording (many
non-key-frames, or the camera returning a crazy duration, or the
monotonic clock jumping forward extremely, or something) has gone away,
the system should recover.
This commit is contained in:
Scott Lamb 2019-01-29 08:26:36 -08:00
parent 3ba3bf2b18
commit c271cfa2b5
4 changed files with 56 additions and 42 deletions

View File

@ -313,9 +313,13 @@ impl SampleIndexEncoder {
}
pub fn add_sample(&mut self, duration_90k: i32, bytes: i32, is_key: bool,
r: &mut db::RecordingToInsert) {
r: &mut db::RecordingToInsert) -> Result<(), Error> {
let duration_delta = duration_90k - self.prev_duration_90k;
self.prev_duration_90k = duration_90k;
let new_duration_90k = r.duration_90k + duration_90k;
if new_duration_90k as i64 > MAX_RECORDING_DURATION {
bail!("Duration {} exceeds maximum {}", new_duration_90k, MAX_RECORDING_DURATION);
}
r.duration_90k += duration_90k;
r.sample_file_bytes += bytes;
r.video_samples += 1;
@ -331,6 +335,7 @@ impl SampleIndexEncoder {
};
append_varint32((zigzag32(duration_delta) << 1) | (is_key as u32), &mut r.video_index);
append_varint32(zigzag32(bytes_delta), &mut r.video_index);
Ok(())
}
}
@ -563,11 +568,11 @@ mod tests {
testutil::init();
let mut r = db::RecordingToInsert::default();
let mut e = SampleIndexEncoder::new();
e.add_sample(10, 1000, true, &mut r);
e.add_sample(9, 10, false, &mut r);
e.add_sample(11, 15, false, &mut r);
e.add_sample(10, 12, false, &mut r);
e.add_sample(10, 1050, true, &mut r);
e.add_sample(10, 1000, true, &mut r).unwrap();
e.add_sample(9, 10, false, &mut r).unwrap();
e.add_sample(11, 15, false, &mut r).unwrap();
e.add_sample(10, 12, false, &mut r).unwrap();
e.add_sample(10, 1050, true, &mut r).unwrap();
assert_eq!(r.video_index, b"\x29\xd0\x0f\x02\x14\x08\x0a\x02\x05\x01\x64");
assert_eq!(10 + 9 + 11 + 10 + 10, r.duration_90k);
assert_eq!(5, r.video_samples);
@ -594,7 +599,7 @@ mod tests {
let mut r = db::RecordingToInsert::default();
let mut e = SampleIndexEncoder::new();
for sample in &samples {
e.add_sample(sample.duration_90k, sample.bytes, sample.is_key, &mut r);
e.add_sample(sample.duration_90k, sample.bytes, sample.is_key, &mut r).unwrap();
}
let mut it = SampleIndexIterator::new();
for sample in &samples {
@ -651,7 +656,7 @@ mod tests {
for i in 1..6 {
let duration_90k = 2 * i;
let bytes = 3 * i;
encoder.add_sample(duration_90k, bytes, true, &mut r);
encoder.add_sample(duration_90k, bytes, true, &mut r).unwrap();
}
let db = TestDb::new(RealClocks {});
let row = db.insert_recording_from_encoder(r);
@ -670,7 +675,7 @@ mod tests {
for i in 1..6 {
let duration_90k = 2 * i;
let bytes = 3 * i;
encoder.add_sample(duration_90k, bytes, (i % 2) == 1, &mut r);
encoder.add_sample(duration_90k, bytes, (i % 2) == 1, &mut r).unwrap();
}
let db = TestDb::new(RealClocks {});
let row = db.insert_recording_from_encoder(r);
@ -685,9 +690,9 @@ mod tests {
testutil::init();
let mut r = db::RecordingToInsert::default();
let mut encoder = SampleIndexEncoder::new();
encoder.add_sample(1, 1, true, &mut r);
encoder.add_sample(1, 2, true, &mut r);
encoder.add_sample(0, 3, true, &mut r);
encoder.add_sample(1, 1, true, &mut r).unwrap();
encoder.add_sample(1, 2, true, &mut r).unwrap();
encoder.add_sample(0, 3, true, &mut r).unwrap();
let db = TestDb::new(RealClocks {});
let row = db.insert_recording_from_encoder(r);
let segment = Segment::new(&db.db.lock(), &row, 1 .. 2).unwrap();
@ -700,7 +705,7 @@ mod tests {
testutil::init();
let mut r = db::RecordingToInsert::default();
let mut encoder = SampleIndexEncoder::new();
encoder.add_sample(1, 1, true, &mut r);
encoder.add_sample(1, 1, true, &mut r).unwrap();
let db = TestDb::new(RealClocks {});
let row = db.insert_recording_from_encoder(r);
let segment = Segment::new(&db.db.lock(), &row, 0 .. 0).unwrap();
@ -717,7 +722,7 @@ mod tests {
for i in 1..6 {
let duration_90k = 2 * i;
let bytes = 3 * i;
encoder.add_sample(duration_90k, bytes, (i % 2) == 1, &mut r);
encoder.add_sample(duration_90k, bytes, (i % 2) == 1, &mut r).unwrap();
}
let db = TestDb::new(RealClocks {});
let row = db.insert_recording_from_encoder(r);
@ -730,9 +735,9 @@ mod tests {
testutil::init();
let mut r = db::RecordingToInsert::default();
let mut encoder = SampleIndexEncoder::new();
encoder.add_sample(1, 1, true, &mut r);
encoder.add_sample(1, 2, true, &mut r);
encoder.add_sample(0, 3, true, &mut r);
encoder.add_sample(1, 1, true, &mut r).unwrap();
encoder.add_sample(1, 2, true, &mut r).unwrap();
encoder.add_sample(0, 3, true, &mut r).unwrap();
let db = TestDb::new(RealClocks {});
let row = db.insert_recording_from_encoder(r);
let segment = Segment::new(&db.db.lock(), &row, 0 .. 2).unwrap();

View File

@ -731,7 +731,15 @@ impl<'a, C: Clocks + Clone, D: DirWriter> Writer<'a, C, D> {
unflushed.pts_90k, pts_90k);
}
let duration = w.adjuster.adjust(duration);
let d = w.add_sample(duration, unflushed.len, unflushed.is_key, unflushed.local_time);
let d = match w.add_sample(duration, unflushed.len, unflushed.is_key,
unflushed.local_time) {
Ok(d) => d,
Err(e) => {
// Restore invariant.
w.unflushed_sample = Some(unflushed);
return Err(e);
},
};
// If the sample `write` was called on is a key frame, then the prior frames (including
// the one we just flushed) represent a live segment. Send it out.
@ -761,33 +769,34 @@ impl<'a, C: Clocks + Clone, D: DirWriter> Writer<'a, C, D> {
/// Cleanly closes the writer, using a supplied pts of the next sample for the last sample's
/// duration (if known). If `close` is not called, the `Drop` trait impl will close the trait,
/// swallowing errors and using a zero duration for the last sample.
pub fn close(&mut self, next_pts: Option<i64>) {
pub fn close(&mut self, next_pts: Option<i64>) -> Result<(), Error> {
self.state = match mem::replace(&mut self.state, WriterState::Unopened) {
WriterState::Open(w) => {
let prev = w.close(self.channel, next_pts, self.db, self.stream_id);
let prev = w.close(self.channel, next_pts, self.db, self.stream_id)?;
WriterState::Closed(prev)
},
s => s,
};
Ok(())
}
}
impl<F: FileWriter> InnerWriter<F> {
/// Returns the total duration of the `RecordingToInsert` (needed for live view path).
fn add_sample(&mut self, duration_90k: i32, bytes: i32, is_key: bool,
pkt_local_time: recording::Time) -> i32 {
pkt_local_time: recording::Time) -> Result<i32, Error> {
let mut l = self.r.lock();
self.e.add_sample(duration_90k, bytes, is_key, &mut l);
self.e.add_sample(duration_90k, bytes, is_key, &mut l)?;
let new = pkt_local_time - recording::Duration(l.duration_90k as i64);
self.local_start = cmp::min(self.local_start, new);
if l.run_offset == 0 { // start time isn't anchored to previous recording's end; adjust.
l.start = self.local_start;
}
l.duration_90k
Ok(l.duration_90k)
}
fn close<C: Clocks + Clone>(mut self, channel: &SyncerChannel<F>, next_pts: Option<i64>,
db: &db::Database<C>, stream_id: i32) -> PreviousWriter {
db: &db::Database<C>, stream_id: i32) -> Result<PreviousWriter, Error> {
let unflushed = self.unflushed_sample.take().expect("should always be an unflushed sample");
let (last_sample_duration, flags) = match next_pts {
None => (self.adjuster.adjust(0), db::RecordingFlags::TrailingZero as i32),
@ -797,7 +806,7 @@ impl<F: FileWriter> InnerWriter<F> {
sha1_bytes.copy_from_slice(&self.hasher.finish().unwrap()[..]);
let (local_time_delta, run_offset, end);
let d = self.add_sample(last_sample_duration, unflushed.len, unflushed.is_key,
unflushed.local_time);
unflushed.local_time)?;
// This always ends a live segment.
db.lock().send_live_segment(stream_id, db::LiveSegment {
@ -817,11 +826,11 @@ impl<F: FileWriter> InnerWriter<F> {
}
drop(self.r);
channel.async_save_recording(self.id, total_duration, self.f);
PreviousWriter {
Ok(PreviousWriter {
end,
local_time_delta,
run_offset,
}
})
}
}
@ -1006,7 +1015,7 @@ mod tests {
f.expect(MockFileAction::SyncAll(Box::new(|| Ok(()))));
w.write(b"123", recording::Time(2), 0, true).unwrap();
h.dir.expect(MockDirAction::Sync(Box::new(|| Ok(()))));
w.close(Some(1));
w.close(Some(1)).unwrap();
assert!(h.syncer.iter(&h.syncer_rcv)); // AsyncSave
assert_eq!(h.syncer.planned_flushes.len(), 1);
assert!(h.syncer.iter(&h.syncer_rcv)); // planned flush
@ -1153,7 +1162,7 @@ mod tests {
f.expect(MockFileAction::SyncAll(Box::new(|| Ok(()))));
w.write(b"123", recording::Time(2), 0, true).unwrap();
h.dir.expect(MockDirAction::Sync(Box::new(|| Ok(()))));
w.close(Some(1));
w.close(Some(1)).unwrap();
assert!(h.syncer.iter(&h.syncer_rcv)); // AsyncSave
assert_eq!(h.syncer.planned_flushes.len(), 1);

View File

@ -1916,7 +1916,7 @@ mod tests {
for i in 1..6 {
let duration_90k = 2 * i;
let bytes = 3 * i;
encoder.add_sample(duration_90k, bytes, true, &mut r);
encoder.add_sample(duration_90k, bytes, true, &mut r).unwrap();
}
// Time range [2, 2+4+6+8) means the 2nd, 3rd, and 4th samples should be included.
@ -1970,7 +1970,7 @@ mod tests {
for i in 1..6 {
let duration_90k = 2 * i;
let bytes = 3 * i;
encoder.add_sample(duration_90k, bytes, (i % 2) == 1, &mut r);
encoder.add_sample(duration_90k, bytes, (i % 2) == 1, &mut r).unwrap();
}
// Time range [2+4+6, 2+4+6+8) means the 4th sample should be included.
@ -2039,14 +2039,14 @@ mod tests {
let mut encoders = Vec::new();
let mut r = db::RecordingToInsert::default();
let mut encoder = recording::SampleIndexEncoder::new();
encoder.add_sample(1, 1, true, &mut r);
encoder.add_sample(2, 2, false, &mut r);
encoder.add_sample(3, 3, true, &mut r);
encoder.add_sample(1, 1, true, &mut r).unwrap();
encoder.add_sample(2, 2, false, &mut r).unwrap();
encoder.add_sample(3, 3, true, &mut r).unwrap();
encoders.push(r);
let mut r = db::RecordingToInsert::default();
let mut encoder = recording::SampleIndexEncoder::new();
encoder.add_sample(4, 4, true, &mut r);
encoder.add_sample(5, 5, false, &mut r);
encoder.add_sample(4, 4, true, &mut r).unwrap();
encoder.add_sample(5, 5, false, &mut r).unwrap();
encoders.push(r);
// This should include samples 3 and 4 only, both sync frames.
@ -2076,12 +2076,12 @@ mod tests {
let mut encoders = Vec::new();
let mut r = db::RecordingToInsert::default();
let mut encoder = recording::SampleIndexEncoder::new();
encoder.add_sample(2, 1, true, &mut r);
encoder.add_sample(3, 2, false, &mut r);
encoder.add_sample(2, 1, true, &mut r).unwrap();
encoder.add_sample(3, 2, false, &mut r).unwrap();
encoders.push(r);
let mut r = db::RecordingToInsert::default();
let mut encoder = recording::SampleIndexEncoder::new();
encoder.add_sample(0, 3, true, &mut r);
encoder.add_sample(0, 3, true, &mut r).unwrap();
encoders.push(r);
// Multi-segment recording with an edit list, encoding with a zero-duration recording.
@ -2104,7 +2104,7 @@ mod tests {
for i in 1..6 {
let duration_90k = 2 * i;
let bytes = 3 * i;
encoder.add_sample(duration_90k, bytes, (i % 2) == 1, &mut r);
encoder.add_sample(duration_90k, bytes, (i % 2) == 1, &mut r).unwrap();
}
// Time range [2+4+6, 2+4+6+8+1) means the 4th sample and part of the 5th are included.

View File

@ -140,7 +140,7 @@ impl<'a, C, S> Streamer<'a, C, S> where C: 'a + Clocks + Clone, S: 'a + stream::
if frame_realtime.sec > r && pkt.is_key() {
trace!("{}: write on normal rotation", self.short_name);
let _t = TimerGuard::new(&clocks, || "closing writer");
w.close(Some(pts));
w.close(Some(pts))?;
None
} else {
Some(r)
@ -179,7 +179,7 @@ impl<'a, C, S> Streamer<'a, C, S> where C: 'a + Clocks + Clone, S: 'a + stream::
}
if rotate.is_some() {
let _t = TimerGuard::new(&clocks, || "closing writer");
w.close(None);
w.close(None)?;
}
Ok(())
}