end reason for recording runs

Part of #155
This commit is contained in:
Scott Lamb 2021-09-16 16:24:17 -07:00
parent dafd9041d6
commit f86f03cf59
7 changed files with 37 additions and 12 deletions

View File

@ -272,6 +272,7 @@ pub struct RecordingToInsert {
pub video_sample_entry_id: i32, pub video_sample_entry_id: i32,
pub video_index: Vec<u8>, pub video_index: Vec<u8>,
pub sample_file_blake3: Option<[u8; 32]>, pub sample_file_blake3: Option<[u8; 32]>,
pub end_reason: Option<String>,
} }
impl RecordingToInsert { impl RecordingToInsert {
@ -2658,6 +2659,7 @@ mod tests {
video_sample_entry_id: vse_id, video_sample_entry_id: vse_id,
video_index: [0u8; 100].to_vec(), video_index: [0u8; 100].to_vec(),
sample_file_blake3: None, sample_file_blake3: None,
end_reason: None,
}; };
let id = { let id = {
let mut db = db.lock(); let mut db = db.lock();

View File

@ -182,11 +182,13 @@ pub(crate) fn insert_recording(
insert into recording (composite_id, stream_id, open_id, run_offset, flags, insert into recording (composite_id, stream_id, open_id, run_offset, flags,
sample_file_bytes, start_time_90k, prev_media_duration_90k, sample_file_bytes, start_time_90k, prev_media_duration_90k,
prev_runs, wall_duration_90k, media_duration_delta_90k, prev_runs, wall_duration_90k, media_duration_delta_90k,
video_samples, video_sync_samples, video_sample_entry_id) video_samples, video_sync_samples, video_sample_entry_id,
end_reason)
values (:composite_id, :stream_id, :open_id, :run_offset, :flags, values (:composite_id, :stream_id, :open_id, :run_offset, :flags,
:sample_file_bytes, :start_time_90k, :prev_media_duration_90k, :sample_file_bytes, :start_time_90k, :prev_media_duration_90k,
:prev_runs, :wall_duration_90k, :media_duration_delta_90k, :prev_runs, :wall_duration_90k, :media_duration_delta_90k,
:video_samples, :video_sync_samples, :video_sample_entry_id) :video_samples, :video_sync_samples, :video_sample_entry_id,
:end_reason)
"#, "#,
) )
.with_context(|e| format!("can't prepare recording insert: {}", e))?; .with_context(|e| format!("can't prepare recording insert: {}", e))?;
@ -205,6 +207,7 @@ pub(crate) fn insert_recording(
":video_samples": r.video_samples, ":video_samples": r.video_samples,
":video_sync_samples": r.video_sync_samples, ":video_sync_samples": r.video_sync_samples,
":video_sample_entry_id": r.video_sample_entry_id, ":video_sample_entry_id": r.video_sample_entry_id,
":end_reason": r.end_reason.as_deref(),
}) })
.with_context(|e| { .with_context(|e| {
format!( format!(

View File

@ -164,6 +164,10 @@ create table recording (
video_sync_samples integer not null check (video_sync_samples > 0), video_sync_samples integer not null check (video_sync_samples > 0),
video_sample_entry_id integer references video_sample_entry (id), video_sample_entry_id integer references video_sample_entry (id),
-- The reason this run ended. Absent if there are more recordings in this
-- run or if this recording predates schema version 7.
end_reason text
check (composite_id >> 32 = stream_id) check (composite_id >> 32 = stream_id)
); );

View File

@ -178,6 +178,7 @@ pub fn run(_args: &super::Args, tx: &rusqlite::Transaction) -> Result<(), Error>
video_samples integer not null check (video_samples > 0), video_samples integer not null check (video_samples > 0),
video_sync_samples integer not null check (video_sync_samples > 0), video_sync_samples integer not null check (video_sync_samples > 0),
video_sample_entry_id integer references video_sample_entry (id), video_sample_entry_id integer references video_sample_entry (id),
end_reason text
check (composite_id >> 32 = stream_id) check (composite_id >> 32 = stream_id)
); );
create index recording_cover on recording ( create index recording_cover on recording (
@ -193,7 +194,7 @@ pub fn run(_args: &super::Args, tx: &rusqlite::Transaction) -> Result<(), Error>
run_offset, run_offset,
flags flags
); );
insert into recording select * from old_recording; insert into recording select *, null from old_recording;
alter table recording_integrity rename to old_recording_integrity; alter table recording_integrity rename to old_recording_integrity;
create table recording_integrity ( create table recording_integrity (
composite_id integer primary key references recording (composite_id), composite_id integer primary key references recording (composite_id),

View File

@ -780,10 +780,10 @@ impl<'a, C: Clocks + Clone, D: DirWriter> Writer<'a, C, D> {
/// Cleanly closes the writer, using a supplied pts of the next sample for the last sample's /// Cleanly closes the writer, using a supplied pts of the next sample for the last sample's
/// duration (if known). If `close` is not called, the `Drop` trait impl will close the trait, /// duration (if known). If `close` is not called, the `Drop` trait impl will close the trait,
/// swallowing errors and using a zero duration for the last sample. /// swallowing errors and using a zero duration for the last sample.
pub fn close(&mut self, next_pts: Option<i64>) -> Result<(), Error> { pub fn close(&mut self, next_pts: Option<i64>, reason: Option<String>) -> Result<(), Error> {
self.state = match mem::replace(&mut self.state, WriterState::Unopened) { self.state = match mem::replace(&mut self.state, WriterState::Unopened) {
WriterState::Open(w) => { WriterState::Open(w) => {
let prev = w.close(self.channel, next_pts, self.db, self.stream_id)?; let prev = w.close(self.channel, next_pts, self.db, self.stream_id, reason)?;
WriterState::Closed(prev) WriterState::Closed(prev)
} }
s => s, s => s,
@ -855,6 +855,7 @@ impl<F: FileWriter> InnerWriter<F> {
next_pts: Option<i64>, next_pts: Option<i64>,
db: &db::Database<C>, db: &db::Database<C>,
stream_id: i32, stream_id: i32,
reason: Option<String>,
) -> Result<PreviousWriter, Error> { ) -> Result<PreviousWriter, Error> {
let unindexed = self let unindexed = self
.unindexed_sample .unindexed_sample
@ -882,6 +883,7 @@ impl<F: FileWriter> InnerWriter<F> {
l.flags = flags; l.flags = flags;
l.local_time_delta = self.local_start - l.start; l.local_time_delta = self.local_start - l.start;
l.sample_file_blake3 = Some(*blake3.as_bytes()); l.sample_file_blake3 = Some(*blake3.as_bytes());
l.end_reason = reason;
wall_duration = recording::Duration(i64::from(l.wall_duration_90k)); wall_duration = recording::Duration(i64::from(l.wall_duration_90k));
run_offset = l.run_offset; run_offset = l.run_offset;
end = l.start + wall_duration; end = l.start + wall_duration;
@ -902,7 +904,13 @@ impl<'a, C: Clocks + Clone, D: DirWriter> Drop for Writer<'a, C, D> {
// Swallow any error. The caller should only drop the Writer without calling close() // Swallow any error. The caller should only drop the Writer without calling close()
// if there's already been an error. The caller should report that. No point in // if there's already been an error. The caller should report that. No point in
// complaining again. // complaining again.
let _ = w.close(self.channel, None, self.db, self.stream_id); let _ = w.close(
self.channel,
None,
self.db,
self.stream_id,
Some("drop".to_owned()),
);
} }
} }
} }
@ -1209,7 +1217,7 @@ mod tests {
f.expect(MockFileAction::SyncAll(Box::new(|| Ok(())))); f.expect(MockFileAction::SyncAll(Box::new(|| Ok(()))));
w.write(b"123", recording::Time(2), 0, true).unwrap(); w.write(b"123", recording::Time(2), 0, true).unwrap();
h.dir.expect(MockDirAction::Sync(Box::new(|| Ok(())))); h.dir.expect(MockDirAction::Sync(Box::new(|| Ok(()))));
w.close(Some(1)).unwrap(); w.close(Some(1), None).unwrap();
assert!(h.syncer.iter(&h.syncer_rcv)); // AsyncSave assert!(h.syncer.iter(&h.syncer_rcv)); // AsyncSave
assert_eq!(h.syncer.planned_flushes.len(), 1); assert_eq!(h.syncer.planned_flushes.len(), 1);
assert!(h.syncer.iter(&h.syncer_rcv)); // planned flush assert!(h.syncer.iter(&h.syncer_rcv)); // planned flush
@ -1418,7 +1426,7 @@ mod tests {
f.expect(MockFileAction::SyncAll(Box::new(|| Ok(())))); f.expect(MockFileAction::SyncAll(Box::new(|| Ok(()))));
w.write(b"123", recording::Time(2), 0, true).unwrap(); w.write(b"123", recording::Time(2), 0, true).unwrap();
h.dir.expect(MockDirAction::Sync(Box::new(|| Ok(())))); h.dir.expect(MockDirAction::Sync(Box::new(|| Ok(()))));
w.close(Some(1)).unwrap(); w.close(Some(1), None).unwrap();
assert!(h.syncer.iter(&h.syncer_rcv)); // AsyncSave assert!(h.syncer.iter(&h.syncer_rcv)); // AsyncSave
assert_eq!(h.syncer.planned_flushes.len(), 1); assert_eq!(h.syncer.planned_flushes.len(), 1);

View File

@ -2326,7 +2326,7 @@ mod tests {
.unwrap(); .unwrap();
end_pts = Some(pkt.pts + i64::from(pkt.duration)); end_pts = Some(pkt.pts + i64::from(pkt.duration));
} }
output.close(end_pts).unwrap(); output.close(end_pts, None).unwrap();
db.syncer_channel.flush(); db.syncer_channel.flush();
} }

View File

@ -153,7 +153,14 @@ where
while !self.shutdown.load(Ordering::SeqCst) { while !self.shutdown.load(Ordering::SeqCst) {
let pkt = { let pkt = {
let _t = TimerGuard::new(&clocks, || "getting next packet"); let _t = TimerGuard::new(&clocks, || "getting next packet");
stream.next()? stream.next()
};
let pkt = match pkt {
Ok(p) => p,
Err(e) => {
let _ = w.close(None, Some(e.to_string()));
return Err(e);
}
}; };
if !seen_key_frame && !pkt.is_key { if !seen_key_frame && !pkt.is_key {
continue; continue;
@ -167,7 +174,7 @@ where
if frame_realtime.sec > r && pkt.is_key { if frame_realtime.sec > r && pkt.is_key {
trace!("{}: write on normal rotation", self.short_name); trace!("{}: write on normal rotation", self.short_name);
let _t = TimerGuard::new(&clocks, || "closing writer"); let _t = TimerGuard::new(&clocks, || "closing writer");
w.close(Some(pkt.pts))?; w.close(Some(pkt.pts), None)?;
None None
} else { } else {
Some(r) Some(r)
@ -205,7 +212,7 @@ where
} }
if rotate.is_some() { if rotate.is_some() {
let _t = TimerGuard::new(&clocks, || "closing writer"); let _t = TimerGuard::new(&clocks, || "closing writer");
w.close(None)?; w.close(None, Some("NVR shutdown".to_owned()))?;
} }
Ok(()) Ok(())
} }