diff --git a/server/db/check.rs b/server/db/check.rs index dc9208e..effdbc1 100644 --- a/server/db/check.rs +++ b/server/db/check.rs @@ -37,7 +37,7 @@ use crate::raw; use crate::recording; use failure::Error; use fnv::FnvHashMap; -use log::{info, error}; +use log::{info, error, warn}; use nix::fcntl::AtFlags; use rusqlite::params; use crate::schema; @@ -57,6 +57,7 @@ pub fn run(conn: &rusqlite::Connection, opts: &Options) -> Result { if let Some(diffs) = compare::get_diffs("actual", conn, "expected", &expected)? { error!("Schema is not as expected:\n{}", &diffs); printed_error = true; + warn!("The following analysis may be incorrect or encounter errors due to schema differences."); } else { println!("Schema is as expected."); } @@ -66,7 +67,7 @@ pub fn run(conn: &rusqlite::Connection, opts: &Options) -> Result { let db_uuid = raw::get_db_uuid(&conn)?; // Scan directories. - let mut streams_by_dir: FnvHashMap = FnvHashMap::default(); + let mut dirs_by_id: FnvHashMap = FnvHashMap::default(); { let mut dir_stmt = conn.prepare(r#" select d.id, d.path, d.uuid, d.last_complete_open_id, o.uuid @@ -98,33 +99,43 @@ pub fn run(conn: &rusqlite::Connection, opts: &Options) -> Result { while let Some(row) = rows.next()? { let id = CompositeId(row.get(0)?); let s = streams.entry(id.stream()).or_insert_with(Stream::default); - s.entry(id.recording()).or_insert_with(Recording::default).garbage_row = true; + s.recordings.entry(id.recording()).or_insert_with(Recording::default).garbage_row = + true; } - streams_by_dir.insert(dir_id, streams); + dirs_by_id.insert(dir_id, streams); } } // Scan known streams. { let mut stmt = conn.prepare(r#" - select id, sample_file_dir_id from stream where sample_file_dir_id is not null + select + id, + sample_file_dir_id, + cum_recordings + from + stream + where + sample_file_dir_id is not null "#)?; let mut rows = stmt.query(params![])?; while let Some(row) = rows.next()? { let stream_id = row.get(0)?; let dir_id = row.get(1)?; - let stream = match streams_by_dir.get_mut(&dir_id) { + let cum_recordings = row.get(2)?; + let mut stream = match dirs_by_id.get_mut(&dir_id) { None => Stream::default(), Some(d) => d.remove(&stream_id).unwrap_or_else(Stream::default), }; + stream.cum_recordings = Some(cum_recordings); printed_error |= compare_stream(conn, stream_id, opts, stream)?; } } // Expect the rest to have only garbage. - for (&dir_id, streams) in &streams_by_dir { + for (&dir_id, streams) in &dirs_by_id { for (&stream_id, stream) in streams { - for (&recording_id, r) in stream { + for (&recording_id, r) in &stream.recordings { let id = CompositeId::new(stream_id, recording_id); if r.recording_row.is_some() || r.playback_row.is_some() || r.integrity_row || !r.garbage_row { @@ -166,7 +177,12 @@ struct Recording { garbage_row: bool, } -type Stream = FnvHashMap; +#[derive(Default)] +struct Stream { + recordings: FnvHashMap, + cum_recordings: Option, +} + type Dir = FnvHashMap; fn summarize_index(video_index: &[u8]) -> Result { @@ -215,7 +231,7 @@ fn read_dir(d: &dir::SampleFileDir, opts: &Options) -> Result { nix::sys::stat::fstatat(fd, f, AtFlags::empty())?.st_size as u64 } else { 0 }; let stream = dir.entry(id.stream()).or_insert_with(Stream::default); - stream.entry(id.recording()).or_insert_with(Recording::default).file = Some(len); + stream.recordings.entry(id.recording()).or_insert_with(Recording::default).file = Some(len); } Ok(dir) } @@ -226,6 +242,7 @@ fn compare_stream(conn: &rusqlite::Connection, stream_id: i32, opts: &Options, let start = CompositeId::new(stream_id, 0); let end = CompositeId::new(stream_id, i32::max_value()); let mut printed_error = false; + let cum_recordings = stream.cum_recordings.expect("cum_recordings must be set on known stream"); // recording row. { @@ -252,7 +269,7 @@ fn compare_stream(conn: &rusqlite::Connection, stream_id: i32, opts: &Options, video_samples: row.get(4)?, video_sync_samples: row.get(5)?, }; - stream.entry(id.recording()) + stream.recordings.entry(id.recording()) .or_insert_with(Recording::default) .recording_row = Some(s); } @@ -281,7 +298,7 @@ fn compare_stream(conn: &rusqlite::Connection, stream_id: i32, opts: &Options, continue; }, }; - stream.entry(id.recording()) + stream.recordings.entry(id.recording()) .or_insert_with(Recording::default) .playback_row = Some(s); } @@ -300,21 +317,36 @@ fn compare_stream(conn: &rusqlite::Connection, stream_id: i32, opts: &Options, let mut rows = stmt.query(params![start.0, end.0])?; while let Some(row) = rows.next()? { let id = CompositeId(row.get(0)?); - stream.entry(id.recording()) + stream.recordings.entry(id.recording()) .or_insert_with(Recording::default) .integrity_row = true; } } - for (&id, recording) in &stream { + for (&id, recording) in &stream.recordings { let id = CompositeId::new(stream_id, id); + + // Files should have recording and playback rows if they aren't marked + // as garbage (deletion in progress) and aren't newer than + // cum_recordings (were being written when the process died). + let db_rows_expected = !recording.garbage_row && id.recording() < cum_recordings; + let r = match recording.recording_row { - Some(ref r) => r, + Some(ref r) => { + if !db_rows_expected { + error!("Unexpected recording row for {}: {:#?}", id, recording); + printed_error = true; + continue; + } + r + }, None => { - if !recording.garbage_row || recording.playback_row.is_some() || - recording.integrity_row { + if db_rows_expected { error!("Missing recording row for {}: {:#?}", id, recording); printed_error = true; + } else if recording.playback_row.is_some() { + error!("Unexpected playback row for {}: {:#?}", id, recording); + printed_error = true; } continue; },