extend recording_playback with an open_id

As noted in schema.sql, this can be used for disambiguation. It also may be
useful in diagnosing data integrity problems.

Also, sneak in a couple minor improvements: better diagnostics in a couple
places, fix to 1->2 upgrade procedure.
This commit is contained in:
Scott Lamb 2018-02-22 21:46:41 -08:00
parent b037c9bdd7
commit bf45ae6011
7 changed files with 31 additions and 15 deletions

View File

@ -584,10 +584,11 @@ pub struct LockedDatabase {
to_delete: Vec<ListOldestSampleFilesRow>, to_delete: Vec<ListOldestSampleFilesRow>,
} }
/// Represents a row of the `open` database table.
#[derive(Copy, Clone, Debug)] #[derive(Copy, Clone, Debug)]
struct Open { pub(crate) struct Open {
id: u32, pub(crate) id: u32,
uuid: Uuid, pub(crate) uuid: Uuid,
} }
/// A modification to be done to a `Stream`, used within `LockedDatabase::flush`. /// A modification to be done to a `Stream`, used within `LockedDatabase::flush`.
@ -802,6 +803,10 @@ impl LockedDatabase {
/// On success, for each affected sample file directory with a flush watcher set, sends a /// On success, for each affected sample file directory with a flush watcher set, sends a
/// `Flush` event. /// `Flush` event.
pub(crate) fn flush(&mut self, reason: &str) -> Result<(), Error> { pub(crate) fn flush(&mut self, reason: &str) -> Result<(), Error> {
let o = match self.open.as_ref() {
None => bail!("database is read-only"),
Some(o) => o,
};
let tx = self.conn.transaction()?; let tx = self.conn.transaction()?;
let mut mods = FnvHashMap::with_capacity_and_hasher(self.streams_by_id.len(), let mut mods = FnvHashMap::with_capacity_and_hasher(self.streams_by_id.len(),
Default::default()); Default::default());
@ -825,7 +830,7 @@ impl LockedDatabase {
if !l.synced { break; } if !l.synced { break; }
if let Some(ref r) = l.recording { if let Some(ref r) = l.recording {
raw::insert_recording( raw::insert_recording(
&tx, CompositeId::new(stream_id, s.next_recording_id + i), &r)?; &tx, o, CompositeId::new(stream_id, s.next_recording_id + i), &r)?;
} }
i += 1; i += 1;
} }

View File

@ -603,7 +603,9 @@ impl Syncer {
format!("{}-{}: unflushed={} >= if={}", format!("{}-{}: unflushed={} >= if={}",
c.short_name, s.type_.as_str(), unflushed, s.flush_if) c.short_name, s.type_.as_str(), unflushed, s.flush_if)
}; };
let _ = db.flush(&reason); if let Err(e) = db.flush(&reason) {
error!("flush failure on save: {:?}", e);
}
} }
} }

View File

@ -47,8 +47,8 @@ const INSERT_RECORDING_SQL: &'static str = r#"
"#; "#;
const INSERT_RECORDING_PLAYBACK_SQL: &'static str = r#" const INSERT_RECORDING_PLAYBACK_SQL: &'static str = r#"
insert into recording_playback (composite_id, sample_file_sha1, video_index) insert into recording_playback (composite_id, open_id, sample_file_sha1, video_index)
values (:composite_id, :sample_file_sha1, :video_index) values (:composite_id, :open_id, :sample_file_sha1, :video_index)
"#; "#;
const STREAM_MIN_START_SQL: &'static str = r#" const STREAM_MIN_START_SQL: &'static str = r#"
@ -73,7 +73,7 @@ const STREAM_MAX_START_SQL: &'static str = r#"
"#; "#;
/// Inserts the specified recording (for from `try_flush` only). /// Inserts the specified recording (for from `try_flush` only).
pub(crate) fn insert_recording(tx: &rusqlite::Transaction, id: CompositeId, pub(crate) fn insert_recording(tx: &rusqlite::Transaction, o: &db::Open, id: CompositeId,
r: &db::RecordingToInsert) -> Result<(), Error> { r: &db::RecordingToInsert) -> Result<(), Error> {
if r.time.end < r.time.start { if r.time.end < r.time.start {
bail!("end time {} must be >= start time {}", r.time.end, r.time.start); bail!("end time {} must be >= start time {}", r.time.end, r.time.start);
@ -98,6 +98,7 @@ pub(crate) fn insert_recording(tx: &rusqlite::Transaction, id: CompositeId,
let sha1 = &r.sample_file_sha1[..]; let sha1 = &r.sample_file_sha1[..];
stmt.execute_named(&[ stmt.execute_named(&[
(":composite_id", &id.0), (":composite_id", &id.0),
(":open_id", &o.id),
(":sample_file_sha1", &sha1), (":sample_file_sha1", &sha1),
(":video_index", &r.video_index), (":video_index", &r.video_index),
])?; ])?;

View File

@ -202,6 +202,12 @@ create table recording_playback (
-- See description on recording table. -- See description on recording table.
composite_id integer primary key references recording (composite_id), composite_id integer primary key references recording (composite_id),
-- The open in which this was committed to the database. For a given
-- composite_id, only one recording will ever be committed to the database,
-- but in-memory state may reflect a recording which never gets committed.
-- This field allows disambiguation in etags and such.
open_id integer not null references open (id),
-- The sha1 hash of the contents of the sample file. -- The sha1 hash of the contents of the sample file.
sample_file_sha1 blob not null check (length(sample_file_sha1) = 20), sample_file_sha1 blob not null check (length(sample_file_sha1) = 20),

View File

@ -235,7 +235,7 @@ impl<'a> super::Upgrader for U<'a> {
-- Insert sub stream (if path is non-empty) using any id. -- Insert sub stream (if path is non-empty) using any id.
insert into stream (camera_id, sample_file_dir_id, type, record, rtsp_path, insert into stream (camera_id, sample_file_dir_id, type, record, rtsp_path,
retain_bytes, next_recording_id) retain_bytes, flush_if_sec, next_recording_id)
select select
old_camera.id, old_camera.id,
sample_file_dir.id, sample_file_dir.id,
@ -243,7 +243,7 @@ impl<'a> super::Upgrader for U<'a> {
0, 0,
old_camera.sub_rtsp_path, old_camera.sub_rtsp_path,
0, 0,
60, 90,
1 1
from from
old_camera cross join sample_file_dir old_camera cross join sample_file_dir

View File

@ -148,16 +148,18 @@ impl super::Upgrader for U {
alter table recording_playback rename to old_recording_playback; alter table recording_playback rename to old_recording_playback;
create table recording_playback ( create table recording_playback (
composite_id integer primary key references recording (composite_id), composite_id integer primary key references recording (composite_id),
open_id integer not null references open (id),
sample_file_sha1 blob not null check (length(sample_file_sha1) = 20), sample_file_sha1 blob not null check (length(sample_file_sha1) = 20),
video_index blob not null check (length(video_index) > 0) video_index blob not null check (length(video_index) > 0)
); );
insert into recording_playback insert into recording_playback
select select
composite_id, p.composite_id,
sample_file_sha1, o.id,
video_index p.sample_file_sha1,
p.video_index
from from
old_recording_playback; old_recording_playback p cross join open o;
drop table old_recording_playback; drop table old_recording_playback;
"#)?; "#)?;
Ok(()) Ok(())

View File

@ -136,7 +136,7 @@ fn main() {
h.clone().install().unwrap(); h.clone().install().unwrap();
if let Err(e) = { let _a = h.async(); args.arg_command.unwrap().run() } { if let Err(e) = { let _a = h.async(); args.arg_command.unwrap().run() } {
error!("{}", e); error!("{:?}", e);
::std::process::exit(1); ::std::process::exit(1);
} }
info!("Success."); info!("Success.");