diff --git a/guide/schema.md b/guide/schema.md index d56520e..daf82d2 100644 --- a/guide/schema.md +++ b/guide/schema.md @@ -192,17 +192,28 @@ dropped from 605 to 39. The general upgrade procedure applies to this upgrade. -### Version 1 to version 2 +### Version 1 to version 2 to version 3 -Version 2 adds: +This upgrade affects the sample file directory as well as the database. Thus, +the restore procedure written above of simply copying back the databae is +insufficient. To do a full restore, you would need to back up and restore the +sample file directory as well. This directory is considerably larger, so +consider an alternate procedure of crossing your fingers, and being prepared +to start over from scratch if there's a problem. + +Version 2 represents a half-finished upgrade from version 1 to version 3; it +is never used. + +Version 3 adds over version 1: * recording of sub streams (splits a new `stream` table out of `camera`) * support for multiple sample file directories, to take advantage of multiple hard drives (or multiple RAID volumes). -* interlock between database and sample file directories to avoid various +* an interlock between database and sample file directories to avoid various mixups that could cause data integrity problems. -* records the RFC-6381 codec associated with a video sample entry, so that +* recording the RFC-6381 codec associated with a video sample entry, so that logic for determining this is no longer needed as part of the database layer. - -The general upgrade procedure applies to this upgrade. +* a simpler sample file directory layout in which files are represented by + the same sequentially increasing id as in the database, rather than a + separate uuid which has to be reserved in advance. diff --git a/src/cmds/config/cameras.rs b/src/cmds/config/cameras.rs index 6d99e68..879b4dc 100644 --- a/src/cmds/config/cameras.rs +++ b/src/cmds/config/cameras.rs @@ -209,9 +209,8 @@ fn lower_retention(db: &Arc, zero_limits: BTreeMap Result<(), Error> { let dirs_to_open: Vec<_> = zero_limits.keys().map(|id| *id).collect(); db.lock().open_sample_file_dirs(&dirs_to_open[..])?; - for (dir_id, l) in &zero_limits { - let dir = db.lock().sample_file_dirs_by_id().get(dir_id).unwrap().get()?; - dir::lower_retention(dir.clone(), db.clone(), &l)?; + for (&dir_id, l) in &zero_limits { + dir::lower_retention(db.clone(), dir_id, &l)?; } Ok(()) } diff --git a/src/cmds/config/dirs.rs b/src/cmds/config/dirs.rs index c4289cc..f4f0318 100644 --- a/src/cmds/config/dirs.rs +++ b/src/cmds/config/dirs.rs @@ -144,12 +144,11 @@ fn actually_delete(model: &RefCell, siv: &mut Cursive) { .collect(); siv.pop_layer(); // deletion confirmation siv.pop_layer(); // retention dialog - let dir = { + { let mut l = model.db.lock(); l.open_sample_file_dirs(&[model.dir_id]).unwrap(); // TODO: don't unwrap. - l.sample_file_dirs_by_id().get(&model.dir_id).unwrap().get().unwrap() - }; - if let Err(e) = dir::lower_retention(dir, model.db.clone(), &new_limits[..]) { + } + if let Err(e) = dir::lower_retention(model.db.clone(), model.dir_id, &new_limits[..]) { siv.add_layer(views::Dialog::text(format!("Unable to delete excess video: {}", e)) .title("Error") .dismiss_button("Abort")); diff --git a/src/cmds/mod.rs b/src/cmds/mod.rs index 71556e4..0035efd 100644 --- a/src/cmds/mod.rs +++ b/src/cmds/mod.rs @@ -75,7 +75,7 @@ enum OpenMode { /// Locks and opens the database. /// The returned `dir::Fd` holds the lock and should be kept open as long as the `Connection` is. fn open_conn(db_dir: &str, mode: OpenMode) -> Result<(dir::Fd, rusqlite::Connection), Error> { - let dir = dir::Fd::open(db_dir, mode == OpenMode::Create)?; + let dir = dir::Fd::open(None, db_dir, mode == OpenMode::Create)?; let ro = mode == OpenMode::ReadOnly; dir.lock(if ro { libc::LOCK_SH } else { libc::LOCK_EX } | libc::LOCK_NB) .map_err(|e| Error{description: format!("db dir {:?} already in use; can't get {} lock", diff --git a/src/cmds/run.rs b/src/cmds/run.rs index 4c92918..67f3d78 100644 --- a/src/cmds/run.rs +++ b/src/cmds/run.rs @@ -143,7 +143,7 @@ pub fn run() -> Result<(), Error> { drop(l); let mut syncers = FnvHashMap::with_capacity_and_hasher(dirs.len(), Default::default()); for (id, dir) in dirs.drain() { - let (channel, join) = dir::start_syncer(dir.clone(), db.clone())?; + let (channel, join) = dir::start_syncer(db.clone(), id)?; syncers.insert(id, Syncer { dir, channel, diff --git a/src/cmds/upgrade/mod.rs b/src/cmds/upgrade/mod.rs index 40a1504..97a00eb 100644 --- a/src/cmds/upgrade/mod.rs +++ b/src/cmds/upgrade/mod.rs @@ -38,6 +38,7 @@ use rusqlite; mod v0_to_v1; mod v1_to_v2; +mod v2_to_v3; const USAGE: &'static str = r#" Upgrade to the latest database schema. @@ -92,6 +93,7 @@ pub fn run() -> Result<(), Error> { let upgraders = [ v0_to_v1::new, v1_to_v2::new, + v2_to_v3::new, ]; { diff --git a/src/cmds/upgrade/v1_to_v2.rs b/src/cmds/upgrade/v1_to_v2.rs index af2cef0..cf87c64 100644 --- a/src/cmds/upgrade/v1_to_v2.rs +++ b/src/cmds/upgrade/v1_to_v2.rs @@ -1,5 +1,5 @@ // This file is part of Moonfire NVR, a security camera digital video recorder. -// Copyright (C) 2016 Scott Lamb +// Copyright (C) 2018 Scott Lamb // // This program is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by @@ -135,6 +135,7 @@ impl<'a> super::Upgrader for U<'a> { self.dir_meta = Some(meta); tx.execute_batch(r#" + drop table reserved_sample_files; alter table camera rename to old_camera; alter table recording rename to old_recording; alter table video_sample_entry rename to old_video_sample_entry; @@ -199,6 +200,12 @@ impl<'a> super::Upgrader for U<'a> { data blob not null check (length(data) > 86) ); + create table garbage ( + sample_file_dir_id integer references sample_file_dir (id), + composite_id integer, + primary key (sample_file_dir_id, composite_id) + ) without rowid; + insert into camera select id, diff --git a/src/cmds/upgrade/v2_to_v3.rs b/src/cmds/upgrade/v2_to_v3.rs new file mode 100644 index 0000000..6947fdc --- /dev/null +++ b/src/cmds/upgrade/v2_to_v3.rs @@ -0,0 +1,166 @@ +// This file is part of Moonfire NVR, a security camera digital video recorder. +// Copyright (C) 2018 Scott Lamb +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// In addition, as a special exception, the copyright holders give +// permission to link the code of portions of this program with the +// OpenSSL library under certain conditions as described in each +// individual source file, and distribute linked combinations including +// the two. +// +// You must obey the GNU General Public License in all respects for all +// of the code used other than OpenSSL. If you modify file(s) with this +// exception, you may extend this exception to your version of the +// file(s), but you are not obligated to do so. If you do not wish to do +// so, delete this exception statement from your version. If you delete +// this exception statement from all source files in the program, then +// also delete it here. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +/// Upgrades a version 2 schema to a version 3 schema. + +use db::{self, FromSqlUuid}; +use dir; +use error::Error; +use libc; +use std::io::{self, Write}; +use std::mem; +//use std::rc::Rc; +use rusqlite; +use uuid::Uuid; + +pub struct U; + +pub fn new<'a>(_args: &'a super::Args) -> Result, Error> { + Ok(Box::new(U)) +} + +/*fn build_stream_to_dir(dir: &dir::Fd, tx: &rusqlite::Transaction) + -> Result>>, Error> { + let mut v = Vec::new(); + let max_id: u32 = tx.query_row("select max(id) from stream", &[], |r| r.get_checked(0))??; + v.resize((max_id + 1) as usize, None); + let mut stmt = tx.prepare(r#" + select + stream.id, + camera.uuid, + stream.type + from + camera join stream on (camera.id = stream.camera_id) + "#)?; + let mut rows = stmt.query(&[])?; + while let Some(row) = rows.next() { + let row = row?; + let id: i32 = row.get_checked(0)?; + let uuid: FromSqlUuid = row.get_checked(1)?; + let type_: String = row.get_checked(2)?; + v[id as usize] = + Some(Rc::new(dir::Fd::open(Some(dir), &format!("{}-{}", uuid.0, type_), true)?)); + } + Ok(v) +}*/ + +/// Gets a pathname for a sample file suitable for passing to open or unlink. +fn get_uuid_pathname(uuid: Uuid) -> [libc::c_char; 37] { + let mut buf = [0u8; 37]; + write!(&mut buf[..36], "{}", uuid.hyphenated()).expect("can't format uuid to pathname buf"); + + // libc::c_char seems to be i8 on some platforms (Linux/arm) and u8 on others (Linux/amd64). + unsafe { mem::transmute::<[u8; 37], [libc::c_char; 37]>(buf) } +} + +fn get_id_pathname(id: db::CompositeId) -> [libc::c_char; 17] { + let mut buf = [0u8; 17]; + write!(&mut buf[..16], "{:016x}", id.0).expect("can't format id to pathname buf"); + unsafe { mem::transmute::<[u8; 17], [libc::c_char; 17]>(buf) } +} + +impl super::Upgrader for U { + fn in_tx(&mut self, tx: &rusqlite::Transaction) -> Result<(), Error> { + /*let (meta, path) = tx.query_row(r#" + select + meta.uuid, + dir.path, + dir.uuid, + dir.last_complete_open_id, + open.uuid + from + meta cross join sample_file_dir dir + join open on (dir.last_complete_open_id = open.id) + "#, |row| -> Result<_, Error> { + let mut meta = DirMeta::new(); + let db_uuid: FromSqlUuid = row.get_checked(0)?; + let path: String = row.get_checked(1)?; + let dir_uuid: FromSqlUuid = row.get_checked(2)?; + let open_uuid: FromSqlUuid = row.get_checked(4)?; + meta.db_uuid.extend_from_slice(&db_uuid.0.as_bytes()[..]); + meta.dir_uuid.extend_from_slice(&dir_uuid.0.as_bytes()[..]); + let open = meta.mut_last_complete_open(); + open.id = row.get_checked(3)?; + open.uuid.extend_from_slice(&open_uuid.0.as_bytes()[..]); + Ok((meta, path)) + })??;*/ + + let path: String = tx.query_row(r#" + select path from sample_file_dir + "#, &[], |row| { row.get_checked(0) })??; + + // Build map of stream -> dirname. + let d = dir::Fd::open(None, &path, false)?; + //let stream_to_dir = build_stream_to_dir(&d, tx)?; + + let mut stmt = tx.prepare(r#" + select + composite_id, + sample_file_uuid + from + recording_playback + "#)?; + let mut rows = stmt.query(&[])?; + while let Some(row) = rows.next() { + let row = row?; + let id = db::CompositeId(row.get_checked(0)?); + let sample_file_uuid: FromSqlUuid = row.get_checked(1)?; + let from_path = get_uuid_pathname(sample_file_uuid.0); + let to_path = get_id_pathname(id); + //let to_dir: &dir::Fd = stream_to_dir[stream_id as usize].as_ref().unwrap(); + let r = unsafe { dir::renameat(&d, from_path.as_ptr(), &d, to_path.as_ptr()) }; + if let Err(e) = r { + if e.kind() == io::ErrorKind::NotFound { + continue; // assume it was already moved. + } + Err(e)?; + } + } + + // These create statements match the schema.sql when version 3 was the latest. + tx.execute_batch(r#" + alter table recording_playback rename to old_recording_playback; + create table recording_playback ( + composite_id integer primary key references recording (composite_id), + sample_file_sha1 blob not null check (length(sample_file_sha1) = 20), + video_index blob not null check (length(video_index) > 0) + ); + insert into recording_playback + select + composite_id, + sample_file_sha1, + video_index + from + old_recording_playback; + drop table old_recording_playback; + "#)?; + Ok(()) + } +} diff --git a/src/db.rs b/src/db.rs index 2cffb70..31767f4 100644 --- a/src/db.rs +++ b/src/db.rs @@ -75,11 +75,10 @@ use time; use uuid::Uuid; /// Expected schema version. See `guide/schema.md` for more information. -pub const EXPECTED_VERSION: i32 = 2; +pub const EXPECTED_VERSION: i32 = 3; const GET_RECORDING_PLAYBACK_SQL: &'static str = r#" select - sample_file_uuid, video_index from recording_playback @@ -87,24 +86,12 @@ const GET_RECORDING_PLAYBACK_SQL: &'static str = r#" composite_id = :composite_id "#; -const DELETE_RESERVATION_SQL: &'static str = - "delete from reserved_sample_files where uuid = :uuid"; +const DELETE_GARBAGE_SQL: &'static str = + "delete from garbage where composite_id = :composite_id"; -const INSERT_RESERVATION_SQL: &'static str = r#" - insert into reserved_sample_files (uuid, state) - values (:uuid, :state) -"#; - -/// Valid values for the `state` column in the `reserved_sample_files` table. -enum ReservationState { - /// This uuid has not yet been added to the `recording` table. The file may be unwritten, - /// partially written, or fully written. - Writing = 0, - - /// This uuid was previously in the `recording` table. The file may be fully written or - /// unlinked. - Deleting = 1, -} +const INSERT_GARBAGE_SQL: &'static str = + "insert into garbage (sample_file_dir_id, composite_id) + values (:sample_file_dir_id, :composite_id)"; const INSERT_VIDEO_SAMPLE_ENTRY_SQL: &'static str = r#" insert into video_sample_entry (sha1, width, height, rfc6381_codec, data) @@ -121,9 +108,8 @@ const INSERT_RECORDING_SQL: &'static str = r#" "#; const INSERT_RECORDING_PLAYBACK_SQL: &'static str = r#" - insert into recording_playback (composite_id, sample_file_uuid, sample_file_sha1, video_index) - values (:composite_id, :sample_file_uuid, :sample_file_sha1, - :video_index) + insert into recording_playback (composite_id, sample_file_sha1, video_index) + values (:composite_id, :sample_file_sha1, :video_index) "#; const UPDATE_NEXT_RECORDING_ID_SQL: &'static str = @@ -131,19 +117,17 @@ const UPDATE_NEXT_RECORDING_ID_SQL: &'static str = const LIST_OLDEST_SAMPLE_FILES_SQL: &'static str = r#" select - recording.composite_id, - recording_playback.sample_file_uuid, - recording.start_time_90k, - recording.duration_90k, - recording.sample_file_bytes + composite_id, + start_time_90k, + duration_90k, + sample_file_bytes from recording - join recording_playback on (recording.composite_id = recording_playback.composite_id) where - :start <= recording.composite_id and - recording.composite_id < :end + :start <= composite_id and + composite_id < :end order by - recording.composite_id + composite_id "#; const DELETE_RECORDING_SQL: &'static str = r#" @@ -205,18 +189,16 @@ impl rusqlite::types::FromSql for FromSqlUuid { } } -/// A box with space for the uuid (initially uninitialized) and the video index. -/// The caller must fill the uuid bytes. -struct PlaybackData(Box<[u8]>); +struct VideoIndex(Box<[u8]>); -impl rusqlite::types::FromSql for PlaybackData { +impl rusqlite::types::FromSql for VideoIndex { fn column_result(value: rusqlite::types::ValueRef) -> rusqlite::types::FromSqlResult { let blob = value.as_blob()?; - let len = 16 + blob.len(); + let len = blob.len(); let mut v = Vec::with_capacity(len); unsafe { v.set_len(len) }; - v[16..].copy_from_slice(blob); - Ok(PlaybackData(v.into_boxed_slice())) + v.copy_from_slice(blob); + Ok(VideoIndex(v.into_boxed_slice())) } } @@ -238,8 +220,7 @@ pub struct ListRecordingsRow { pub start: recording::Time, pub video_sample_entry: Arc, - pub stream_id: i32, - pub id: i32, + pub id: CompositeId, /// This is a recording::Duration, but a single recording's duration fits into an i32. pub duration_90k: i32, @@ -267,19 +248,9 @@ pub struct ListAggregatedRecordingsRow { /// Select fields from the `recordings_playback` table. Retrieve with `with_recording_playback`. #[derive(Debug)] pub struct RecordingPlayback<'a> { - pub sample_file_uuid: Uuid, pub video_index: &'a [u8], } -impl<'a> RecordingPlayback<'a> { - fn new(data: &'a [u8]) -> Self { - RecordingPlayback { - sample_file_uuid: Uuid::from_bytes(&data[..16]).unwrap(), - video_index: &data[16..], - } - } -} - /// Bitmask in the `flags` field in the `recordings` table; see `schema.sql`. pub enum RecordingFlags { TrailingZero = 1, @@ -288,7 +259,7 @@ pub enum RecordingFlags { /// A recording to pass to `insert_recording`. #[derive(Debug)] pub struct RecordingToInsert { - pub stream_id: i32, + pub id: CompositeId, pub run_offset: i32, pub flags: i32, pub sample_file_bytes: i32, @@ -297,7 +268,6 @@ pub struct RecordingToInsert { pub video_samples: i32, pub video_sync_samples: i32, pub video_sample_entry_id: i32, - pub sample_file_uuid: Uuid, pub video_index: Vec, pub sample_file_sha1: [u8; 20], } @@ -305,9 +275,7 @@ pub struct RecordingToInsert { /// A row used in `list_oldest_sample_files`. #[derive(Debug)] pub struct ListOldestSampleFilesRow { - pub uuid: Uuid, - pub stream_id: i32, - pub recording_id: i32, + pub id: CompositeId, pub time: Range, pub sample_file_bytes: i32, } @@ -446,7 +414,7 @@ pub struct Stream { /// Mapping of calendar day (in the server's time zone) to a summary of recordings on that day. pub days: BTreeMap, pub record: bool, - next_recording_id: i32, + pub next_recording_id: i32, } #[derive(Debug, Default)] @@ -611,7 +579,7 @@ struct State { cameras_by_uuid: BTreeMap, video_sample_entries: BTreeMap>, list_recordings_by_time_sql: String, - playback_cache: RefCell, fnv::FnvBuildHasher>>, + video_index_cache: RefCell, fnv::FnvBuildHasher>>, } #[derive(Copy, Clone, Debug)] @@ -632,11 +600,6 @@ pub struct Transaction<'a> { /// well. We could use savepoints (nested transactions) for this, but for simplicity we just /// require the entire transaction be rolled back. must_rollback: bool, - - /// Normally sample file uuids must be reserved prior to a recording being inserted. - /// It's convenient in benchmarks though to allow the same segment to be inserted into the - /// database many times, so this safety check can be disabled. - pub bypass_reservation_for_testing: bool, } /// A modification to be done to a `Stream` after a `Transaction` is committed. @@ -666,53 +629,56 @@ struct StreamModification { new_record: Option, } -fn composite_id(stream_id: i32, recording_id: i32) -> i64 { - (stream_id as i64) << 32 | recording_id as i64 +#[derive(Copy, Clone, Debug, Eq, PartialEq)] +pub struct CompositeId(pub i64); + +impl CompositeId { + pub fn new(stream_id: i32, recording_id: i32) -> Self { + CompositeId((stream_id as i64) << 32 | recording_id as i64) + } + + pub fn stream(self) -> i32 { (self.0 >> 32) as i32 } + pub fn recording(self) -> i32 { self.0 as i32 } +} + +impl ::std::fmt::Display for CompositeId { + fn fmt(&self, f: &mut ::std::fmt::Formatter) -> Result<(), ::std::fmt::Error> { + write!(f, "{}/{}", self.stream(), self.recording()) + } } impl<'a> Transaction<'a> { - /// Reserves a new, randomly generated UUID to be used as a sample file. - pub fn reserve_sample_file(&mut self) -> Result { - let mut stmt = self.tx.prepare_cached(INSERT_RESERVATION_SQL)?; - let uuid = Uuid::new_v4(); - let uuid_bytes = &uuid.as_bytes()[..]; - stmt.execute_named(&[ - (":uuid", &uuid_bytes), - (":state", &(ReservationState::Writing as i64)) - ])?; - info!("reserved {}", uuid); - Ok(uuid) - } - /// Deletes the given recordings from the `recording` and `recording_playback` tables. /// Note they are not fully removed from the database; the uuids are transferred to the - /// `reserved_sample_files` table. The caller should `unlink` the files, then remove the - /// reservation. + /// `garbage` table. The caller should `unlink` the files, then remove the `garbage` row. pub fn delete_recordings(&mut self, rows: &[ListOldestSampleFilesRow]) -> Result<(), Error> { let mut del1 = self.tx.prepare_cached(DELETE_RECORDING_PLAYBACK_SQL)?; let mut del2 = self.tx.prepare_cached(DELETE_RECORDING_SQL)?; - let mut insert = self.tx.prepare_cached(INSERT_RESERVATION_SQL)?; + let mut insert = self.tx.prepare_cached(INSERT_GARBAGE_SQL)?; self.check_must_rollback()?; self.must_rollback = true; for row in rows { - let composite_id = &composite_id(row.stream_id, row.recording_id); - let changes = del1.execute_named(&[(":composite_id", composite_id)])?; + let changes = del1.execute_named(&[(":composite_id", &row.id.0)])?; if changes != 1 { - return Err(Error::new(format!("no such recording {}/{} (uuid {})", - row.stream_id, row.recording_id, row.uuid))); + return Err(Error::new(format!("no such recording {}", row.id))); } - let changes = del2.execute_named(&[(":composite_id", composite_id)])?; + let changes = del2.execute_named(&[(":composite_id", &row.id.0)])?; if changes != 1 { - return Err(Error::new(format!("no such recording_playback {}/{} (uuid {})", - row.stream_id, row.recording_id, row.uuid))); + return Err(Error::new(format!("no such recording_playback {}", row.id))); } - let uuid = &row.uuid.as_bytes()[..]; + let sid = row.id.stream(); + let did = self.state + .streams_by_id + .get(&sid) + .ok_or_else(|| Error::new(format!("no such stream {}", sid)))? + .sample_file_dir_id + .ok_or_else(|| Error::new(format!("stream {} has no dir", sid)))?; insert.execute_named(&[ - (":uuid", &uuid), - (":state", &(ReservationState::Deleting as i64)) - ])?; - let m = Transaction::get_mods_by_stream(&mut self.mods_by_stream, row.stream_id); + (":sample_file_dir_id", &did), + (":composite_id", &row.id.0)], + )?; + let m = Transaction::get_mods_by_stream(&mut self.mods_by_stream, row.id.stream()); m.duration -= row.time.end - row.time.start; m.sample_file_bytes -= row.sample_file_bytes as i64; adjust_days(row.time.clone(), -1, &mut m.days); @@ -721,59 +687,46 @@ impl<'a> Transaction<'a> { Ok(()) } - /// Marks the given sample file uuid as deleted. Accepts uuids in either `ReservationState`. - /// This shouldn't be called until the files have been `unlink()`ed and the parent directory - /// `fsync()`ed. - pub fn mark_sample_files_deleted(&mut self, uuids: &[Uuid]) -> Result<(), Error> { - if uuids.is_empty() { return Ok(()); } - let mut stmt = - self.tx.prepare_cached("delete from reserved_sample_files where uuid = :uuid;")?; - for uuid in uuids { - let uuid_bytes = &uuid.as_bytes()[..]; - let changes = stmt.execute_named(&[(":uuid", &uuid_bytes)])?; + /// Marks the given sample files as deleted. This shouldn't be called until the files have + /// been `unlink()`ed and the parent directory `fsync()`ed. + pub fn mark_sample_files_deleted(&mut self, ids: &[CompositeId]) -> Result<(), Error> { + if ids.is_empty() { return Ok(()); } + let mut stmt = self.tx.prepare_cached(DELETE_GARBAGE_SQL)?; + for &id in ids { + let changes = stmt.execute_named(&[(":composite_id", &id.0)])?; if changes != 1 { - return Err(Error::new(format!("no reservation for {}", uuid.hyphenated()))); + return Err(Error::new(format!("no garbage row for {}", id))); } } Ok(()) } /// Inserts the specified recording. - /// The sample file uuid must have been previously reserved. (Although this can be bypassed - /// for testing; see the `bypass_reservation_for_testing` field.) - pub fn insert_recording(&mut self, r: &RecordingToInsert) -> Result { + pub fn insert_recording(&mut self, r: &RecordingToInsert) -> Result<(), Error> { self.check_must_rollback()?; - // Sanity checking. if r.time.end < r.time.start { return Err(Error::new(format!("end time {} must be >= start time {}", r.time.end, r.time.start))); } - // Unreserve the sample file uuid and insert the recording row. - // TODO: var used? - let stream = match self.state.streams_by_id.get_mut(&r.stream_id) { - None => return Err(Error::new(format!("no such stream id {}", r.stream_id))), + // Check that the recording id is acceptable and do the insertion. + let stream = match self.state.streams_by_id.get(&r.id.stream()) { + None => return Err(Error::new(format!("no such stream id {}", r.id.stream()))), Some(s) => s, }; - let uuid = &r.sample_file_uuid.as_bytes()[..]; - { - let mut stmt = self.tx.prepare_cached(DELETE_RESERVATION_SQL)?; - let changes = stmt.execute_named(&[(":uuid", &uuid)])?; - if changes != 1 && !self.bypass_reservation_for_testing { - return Err(Error::new(format!("uuid {} is not reserved", r.sample_file_uuid))); - } - } self.must_rollback = true; - let m = Transaction::get_mods_by_stream(&mut self.mods_by_stream, r.stream_id); - let recording_id; + let m = Transaction::get_mods_by_stream(&mut self.mods_by_stream, r.id.stream()); { - recording_id = m.new_next_recording_id.unwrap_or(stream.next_recording_id); - let composite_id = composite_id(r.stream_id, recording_id); + let next = m.new_next_recording_id.unwrap_or(stream.next_recording_id); + if r.id.recording() < next { + return Err(Error::new(format!("recording {} out of order; next id is {}!", + r.id, next))); + } let mut stmt = self.tx.prepare_cached(INSERT_RECORDING_SQL)?; stmt.execute_named(&[ - (":composite_id", &composite_id), - (":stream_id", &(r.stream_id as i64)), + (":composite_id", &r.id.0), + (":stream_id", &(r.id.stream() as i64)), (":run_offset", &r.run_offset), (":flags", &r.flags), (":sample_file_bytes", &r.sample_file_bytes), @@ -784,18 +737,17 @@ impl<'a> Transaction<'a> { (":video_sync_samples", &r.video_sync_samples), (":video_sample_entry_id", &r.video_sample_entry_id), ])?; - m.new_next_recording_id = Some(recording_id + 1); + m.new_next_recording_id = Some(r.id.recording() + 1); let mut stmt = self.tx.prepare_cached(INSERT_RECORDING_PLAYBACK_SQL)?; let sha1 = &r.sample_file_sha1[..]; stmt.execute_named(&[ - (":composite_id", &composite_id), - (":sample_file_uuid", &uuid), + (":composite_id", &r.id.0), (":sample_file_sha1", &sha1), (":video_index", &r.video_index), ])?; let mut stmt = self.tx.prepare_cached(UPDATE_NEXT_RECORDING_ID_SQL)?; stmt.execute_named(&[ - (":stream_id", &(r.stream_id as i64)), + (":stream_id", &(r.id.stream() as i64)), (":next_recording_id", &m.new_next_recording_id), ])?; } @@ -803,7 +755,7 @@ impl<'a> Transaction<'a> { m.duration += r.time.end - r.time.start; m.sample_file_bytes += r.sample_file_bytes as i64; adjust_days(r.time.clone(), 1, &mut m.days); - Ok(recording_id) + Ok(()) } /// Updates the `record` and `retain_bytes` for the given stream. @@ -1090,6 +1042,11 @@ impl LockedDatabase { let mut meta = schema::DirMeta::default(); meta.db_uuid.extend_from_slice(&self.state.uuid.as_bytes()[..]); meta.dir_uuid.extend_from_slice(&dir.uuid.as_bytes()[..]); + if let Some(o) = dir.last_complete_open { + let open = meta.mut_last_complete_open(); + open.id = o.id; + open.uuid.extend_from_slice(&o.uuid.as_bytes()[..]); + } if let Some(o) = o { let open = meta.mut_in_progress_open(); open.id = o.id; @@ -1144,12 +1101,11 @@ impl LockedDatabase { /// database directory, and the connection is locked within the process, so having a /// `LockedDatabase` is sufficient to ensure a consistent view. pub fn tx(&mut self) -> Result { - Ok(Transaction{ + Ok(Transaction { state: &mut self.state, mods_by_stream: FnvHashMap::default(), tx: self.conn.transaction()?, must_rollback: false, - bypass_reservation_for_testing: false, }) } @@ -1171,7 +1127,7 @@ impl LockedDatabase { (":stream_id", &stream_id), (":start_time_90k", &desired_time.start.0), (":end_time_90k", &desired_time.end.0)])?; - self.list_recordings_inner(stream_id, rows, f) + self.list_recordings_inner(rows, f) } /// Lists the specified recordigs in ascending order by id. @@ -1180,29 +1136,26 @@ impl LockedDatabase { where F: FnMut(ListRecordingsRow) -> Result<(), Error> { let mut stmt = self.conn.prepare_cached(LIST_RECORDINGS_BY_ID_SQL)?; let rows = stmt.query_named(&[ - (":start", &composite_id(stream_id, desired_ids.start)), - (":end", &composite_id(stream_id, desired_ids.end)), + (":start", &CompositeId::new(stream_id, desired_ids.start).0), + (":end", &CompositeId::new(stream_id, desired_ids.end).0), ])?; - self.list_recordings_inner(stream_id, rows, f) + self.list_recordings_inner(rows, f) } - fn list_recordings_inner(&self, stream_id: i32, mut rows: rusqlite::Rows, mut f: F) - -> Result<(), Error> + fn list_recordings_inner(&self, mut rows: rusqlite::Rows, mut f: F) -> Result<(), Error> where F: FnMut(ListRecordingsRow) -> Result<(), Error> { while let Some(row) = rows.next() { let row = row?; - let id = row.get_checked::<_, i64>(0)? as i32; // drop top bits of composite_id. + let id = CompositeId(row.get_checked::<_, i64>(0)?); let vse_id = row.get_checked(8)?; let video_sample_entry = match self.state.video_sample_entries.get(&vse_id) { Some(v) => v, None => { return Err(Error::new(format!( - "recording {}/{} references nonexistent video_sample_entry {}", - stream_id, id, vse_id))); + "recording {} references nonexistent video_sample_entry {}", id, vse_id))); }, }; - let out = ListRecordingsRow{ - stream_id, + let out = ListRecordingsRow { id, run_offset: row.get_checked(1)?, flags: row.get_checked(2)?, @@ -1241,11 +1194,12 @@ impl LockedDatabase { // causing problems.) let mut aggs: BTreeMap = BTreeMap::new(); self.list_recordings_by_time(stream_id, desired_time, |row| { - let run_start_id = row.id - row.run_offset; + let recording_id = row.id.recording(); + let run_start_id = recording_id - row.run_offset; let needs_flush = if let Some(a) = aggs.get(&run_start_id) { let new_dur = a.time.end - a.time.start + recording::Duration(row.duration_90k as i64); - a.ids.end != row.id || row.video_sample_entry.id != a.video_sample_entry.id || + a.ids.end != recording_id || row.video_sample_entry.id != a.video_sample_entry.id || new_dur >= forced_split } else { false @@ -1261,7 +1215,7 @@ impl LockedDatabase { stream_id, a.ids.end - 1, a.time.end, row.id, row.start))); } a.time.end.0 += row.duration_90k as i64; - a.ids.end = row.id + 1; + a.ids.end = recording_id + 1; a.video_samples += row.video_samples as i64; a.video_sync_samples += row.video_sync_samples as i64; a.sample_file_bytes += row.sample_file_bytes as i64; @@ -1272,13 +1226,13 @@ impl LockedDatabase { if need_insert { aggs.insert(run_start_id, ListAggregatedRecordingsRow{ time: row.start .. recording::Time(row.start.0 + row.duration_90k as i64), - ids: row.id .. row.id+1, + ids: recording_id .. recording_id+1, video_samples: row.video_samples as i64, video_sync_samples: row.video_sync_samples as i64, sample_file_bytes: row.sample_file_bytes as i64, video_sample_entry: row.video_sample_entry, stream_id, - run_start_id: row.id - row.run_offset, + run_start_id, flags: row.flags, }); }; @@ -1293,44 +1247,37 @@ impl LockedDatabase { /// Calls `f` with a single `recording_playback` row. /// Note the lock is held for the duration of `f`. /// This uses a LRU cache to reduce the number of retrievals from the database. - pub fn with_recording_playback(&self, stream_id: i32, recording_id: i32, f: F) - -> Result + pub fn with_recording_playback(&self, id: CompositeId, f: F) -> Result where F: FnOnce(&RecordingPlayback) -> Result { - let composite_id = composite_id(stream_id, recording_id); - let mut cache = self.state.playback_cache.borrow_mut(); - if let Some(r) = cache.get_mut(&composite_id) { - trace!("cache hit for recording {}/{}", stream_id, recording_id); - return f(&RecordingPlayback::new(r)); + let mut cache = self.state.video_index_cache.borrow_mut(); + if let Some(video_index) = cache.get_mut(&id.0) { + trace!("cache hit for recording {}", id); + return f(&RecordingPlayback { video_index }); } - trace!("cache miss for recording {}/{}", stream_id, recording_id); + trace!("cache miss for recording {}", id); let mut stmt = self.conn.prepare_cached(GET_RECORDING_PLAYBACK_SQL)?; - let mut rows = stmt.query_named(&[(":composite_id", &composite_id)])?; + let mut rows = stmt.query_named(&[(":composite_id", &id.0)])?; if let Some(row) = rows.next() { let row = row?; - let uuid: FromSqlUuid = row.get_checked(0)?; - let data = { - let mut data: PlaybackData = row.get_checked(1)?; - data.0[0..16].copy_from_slice(uuid.0.as_bytes()); - data.0 - }; - let result = f(&RecordingPlayback::new(&data)); - cache.insert(composite_id, data); + let video_index: VideoIndex = row.get_checked(0)?; + let result = f(&RecordingPlayback { video_index: &video_index.0[..] }); + cache.insert(id.0, video_index.0); return result; } - Err(Error::new(format!("no such recording {}/{}", stream_id, recording_id))) + Err(Error::new(format!("no such recording {}", id))) } - /// Lists all reserved sample files. - pub fn list_reserved_sample_files(&self) -> Result, Error> { - let mut reserved = Vec::new(); - let mut stmt = self.conn.prepare_cached("select uuid from reserved_sample_files;")?; - let mut rows = stmt.query_named(&[])?; + /// Lists all garbage ids. + pub fn list_garbage(&self, dir_id: i32) -> Result, Error> { + let mut garbage = Vec::new(); + let mut stmt = self.conn.prepare_cached( + "select composite_id from garbage where sample_file_dir_id = ?")?; + let mut rows = stmt.query(&[&dir_id])?; while let Some(row) = rows.next() { let row = row?; - let uuid: FromSqlUuid = row.get_checked(0)?; - reserved.push(uuid.0); + garbage.push(CompositeId(row.get_checked(0)?)); } - Ok(reserved) + Ok(garbage) } /// Lists the oldest sample files (to delete to free room). @@ -1339,21 +1286,18 @@ impl LockedDatabase { where F: FnMut(ListOldestSampleFilesRow) -> bool { let mut stmt = self.conn.prepare_cached(LIST_OLDEST_SAMPLE_FILES_SQL)?; let mut rows = stmt.query_named(&[ - (":start", &composite_id(stream_id, 0)), - (":end", &composite_id(stream_id + 1, 0)), + (":start", &CompositeId::new(stream_id, 0).0), + (":end", &CompositeId::new(stream_id + 1, 0).0), ])?; while let Some(row) = rows.next() { let row = row?; - let start = recording::Time(row.get_checked(2)?); - let duration = recording::Duration(row.get_checked(3)?); - let composite_id: i64 = row.get_checked(0)?; - let uuid: FromSqlUuid = row.get_checked(1)?; + let id = CompositeId(row.get_checked(0)?); + let start = recording::Time(row.get_checked(1)?); + let duration = recording::Duration(row.get_checked(2)?); let should_continue = f(ListOldestSampleFilesRow{ - recording_id: composite_id as i32, - stream_id: (composite_id >> 32) as i32, - uuid: uuid.0, + id, time: start .. start + duration, - sample_file_bytes: row.get_checked(4)?, + sample_file_bytes: row.get_checked(3)?, }); if !should_continue { break; @@ -1844,7 +1788,7 @@ impl Database { cameras_by_uuid: BTreeMap::new(), streams_by_id: BTreeMap::new(), video_sample_entries: BTreeMap::new(), - playback_cache: RefCell::new(LruCache::with_hasher(1024, Default::default())), + video_index_cache: RefCell::new(LruCache::with_hasher(1024, Default::default())), list_recordings_by_time_sql: list_recordings_by_time_sql, }, })); @@ -1895,12 +1839,10 @@ impl Database { mod tests { extern crate tempdir; - use core::cmp::Ord; use recording::{self, TIME_UNITS_PER_SEC}; use rusqlite::Connection; use std::collections::BTreeMap; use std::error::Error as E; - use std::fmt::Debug; use testutil; use super::*; use super::adjust_days; // non-public. @@ -1960,13 +1902,13 @@ mod tests { // TODO(slamb): test that the days logic works correctly. let mut rows = 0; - let mut recording_id = -1; + let mut recording_id = None; { let db = db.lock(); let all_time = recording::Time(i64::min_value()) .. recording::Time(i64::max_value()); db.list_recordings_by_time(stream_id, all_time, |row| { rows += 1; - recording_id = row.id; + recording_id = Some(row.id); assert_eq!(r.time, row.start .. row.start + recording::Duration(row.duration_90k as i64)); assert_eq!(r.video_samples, row.video_samples); @@ -1981,8 +1923,7 @@ mod tests { rows = 0; db.lock().list_oldest_sample_files(stream_id, |row| { rows += 1; - assert_eq!(recording_id, row.recording_id); - assert_eq!(r.sample_file_uuid, row.uuid); + assert_eq!(recording_id, Some(row.id)); assert_eq!(r.time, row.time); assert_eq!(r.sample_file_bytes, row.sample_file_bytes); true @@ -1993,13 +1934,6 @@ mod tests { // TODO: with_recording_playback. } - fn assert_unsorted_eq(mut a: Vec, mut b: Vec) - where T: Debug + Ord { - a.sort(); - b.sort(); - assert_eq!(a, b); - } - #[test] fn test_adjust_days() { testutil::init(); @@ -2076,10 +2010,10 @@ mod tests { fn test_version_too_old() { testutil::init(); let c = setup_conn(); - c.execute_batch("delete from version; insert into version values (1, 0, '');").unwrap(); + c.execute_batch("delete from version; insert into version values (2, 0, '');").unwrap(); let e = Database::new(c, false).unwrap_err(); assert!(e.description().starts_with( - "Database schema version 1 is too old (expected 2)"), "got: {:?}", + "Database schema version 2 is too old (expected 3)"), "got: {:?}", e.description()); } @@ -2087,10 +2021,10 @@ mod tests { fn test_version_too_new() { testutil::init(); let c = setup_conn(); - c.execute_batch("delete from version; insert into version values (3, 0, '');").unwrap(); + c.execute_batch("delete from version; insert into version values (4, 0, '');").unwrap(); let e = Database::new(c, false).unwrap_err(); assert!(e.description().starts_with( - "Database schema version 3 is too new (expected 2)"), "got: {:?}", e.description()); + "Database schema version 4 is too new (expected 3)"), "got: {:?}", e.description()); } /// Basic test of running some queries on a fresh database. @@ -2138,30 +2072,19 @@ mod tests { let db = Database::new(conn, true).unwrap(); assert_no_recordings(&db, camera_uuid); - assert_eq!(db.lock().list_reserved_sample_files().unwrap(), &[]); - - let (uuid_to_use, uuid_to_keep); - { - let mut db = db.lock(); - let mut tx = db.tx().unwrap(); - uuid_to_use = tx.reserve_sample_file().unwrap(); - uuid_to_keep = tx.reserve_sample_file().unwrap(); - tx.commit().unwrap(); - } - - assert_unsorted_eq(db.lock().list_reserved_sample_files().unwrap(), - vec![uuid_to_use, uuid_to_keep]); + assert_eq!(db.lock().list_garbage(sample_file_dir_id.unwrap()).unwrap(), &[]); let vse_id = db.lock().insert_video_sample_entry( 1920, 1080, include_bytes!("testdata/avc1").to_vec(), "avc1.4d0029".to_owned()).unwrap(); assert!(vse_id > 0, "vse_id = {}", vse_id); - // Inserting a recording should succeed and remove its uuid from the reserved table. + // Inserting a recording should succeed and advance the next recording id. let start = recording::Time(1430006400 * TIME_UNITS_PER_SEC); let stream_id = camera_id; // TODO + let id = CompositeId::new(stream_id, 1); let recording = RecordingToInsert { - stream_id, + id, sample_file_bytes: 42, run_offset: 0, flags: 0, @@ -2170,7 +2093,6 @@ mod tests { video_samples: 1, video_sync_samples: 1, video_sample_entry_id: vse_id, - sample_file_uuid: uuid_to_use, video_index: [0u8; 100].to_vec(), sample_file_sha1: [0u8; 20], }; @@ -2180,8 +2102,7 @@ mod tests { tx.insert_recording(&recording).unwrap(); tx.commit().unwrap(); } - assert_unsorted_eq(db.lock().list_reserved_sample_files().unwrap(), - vec![uuid_to_keep]); + assert_eq!(db.lock().streams_by_id().get(&stream_id).unwrap().next_recording_id, 2); // Queries should return the correct result (with caches update on insert). assert_single_recording(&db, stream_id, &recording); @@ -2192,7 +2113,7 @@ mod tests { let db = Database::new(conn, true).unwrap(); assert_single_recording(&db, stream_id, &recording); - // Deleting a recording should succeed, update the min/max times, and re-reserve the uuid. + // Deleting a recording should succeed, update the min/max times, and mark it as garbage. { let mut db = db.lock(); let mut v = Vec::new(); @@ -2203,32 +2124,31 @@ mod tests { tx.commit().unwrap(); } assert_no_recordings(&db, camera_uuid); - assert_unsorted_eq(db.lock().list_reserved_sample_files().unwrap(), - vec![uuid_to_use, uuid_to_keep]); + assert_eq!(db.lock().list_garbage(sample_file_dir_id.unwrap()).unwrap(), vec![id]); } #[test] fn test_drop_tx() { testutil::init(); let conn = setup_conn(); + conn.execute("insert into garbage values (1, ?)", &[&CompositeId::new(1, 1).0]).unwrap(); let db = Database::new(conn, true).unwrap(); let mut db = db.lock(); { let mut tx = db.tx().unwrap(); - tx.reserve_sample_file().unwrap(); + tx.mark_sample_files_deleted(&[CompositeId::new(1, 1)]).unwrap(); // drop tx without committing. } // The dropped tx should have done nothing. - assert_eq!(db.list_reserved_sample_files().unwrap(), &[]); + assert_eq!(db.list_garbage(1).unwrap(), &[CompositeId::new(1, 1)]); // Following transactions should succeed. - let uuid; { let mut tx = db.tx().unwrap(); - uuid = tx.reserve_sample_file().unwrap(); + tx.mark_sample_files_deleted(&[CompositeId::new(1, 1)]).unwrap(); tx.commit().unwrap(); } - assert_eq!(db.list_reserved_sample_files().unwrap(), &[uuid]); + assert_eq!(db.list_garbage(1).unwrap(), &[]); } } diff --git a/src/dir.rs b/src/dir.rs index cb4f1af..8495f23 100644 --- a/src/dir.rs +++ b/src/dir.rs @@ -32,8 +32,9 @@ //! //! This includes opening files for serving, rotating away old files, and saving new files. -use db; +use db::{self, CompositeId}; use error::Error; +use fnv::FnvHashMap; use libc::{self, c_char}; use protobuf::{self, Message}; use recording; @@ -44,11 +45,11 @@ use std::ffi; use std::fs; use std::io::{self, Read, Write}; use std::mem; +use std::os::unix::ffi::OsStrExt; use std::os::unix::io::FromRawFd; use std::sync::{Arc, Mutex}; use std::sync::mpsc; use std::thread; -use uuid::Uuid; /// A sample file directory. Typically one per physical disk drive. /// @@ -81,16 +82,18 @@ impl Drop for Fd { impl Fd { /// Opens the given path as a directory. - pub fn open(path: &str, mkdir: bool) -> Result { + pub fn open(fd: Option<&Fd>, path: &str, mkdir: bool) -> Result { + let fd = fd.map(|fd| fd.0).unwrap_or(libc::AT_FDCWD); let cstring = ffi::CString::new(path) .map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))?; - if mkdir && unsafe { libc::mkdir(cstring.as_ptr(), 0o700) } != 0 { + if mkdir && unsafe { libc::mkdirat(fd, cstring.as_ptr(), 0o700) } != 0 { let e = io::Error::last_os_error(); if e.kind() != io::ErrorKind::AlreadyExists { return Err(e.into()); } } - let fd = unsafe { libc::open(cstring.as_ptr(), libc::O_DIRECTORY | libc::O_RDONLY, 0) }; + let fd = unsafe { libc::openat(fd, cstring.as_ptr(), libc::O_DIRECTORY | libc::O_RDONLY, + 0) }; if fd < 0 { return Err(io::Error::last_os_error().into()); } @@ -107,14 +110,6 @@ impl Fd { Ok(fs::File::from_raw_fd(fd)) } - unsafe fn renameat(&self, from: *const c_char, to: *const c_char) -> Result<(), io::Error> { - let result = libc::renameat(self.0, from, self.0, to); - if result < 0 { - return Err(io::Error::last_os_error()) - } - Ok(()) - } - /// Locks the directory with the specified `flock` operation. pub fn lock(&self, operation: libc::c_int) -> Result<(), io::Error> { let ret = unsafe { libc::flock(self.0, operation) }; @@ -135,6 +130,15 @@ impl Fd { } } +pub unsafe fn renameat(from_fd: &Fd, from_path: *const c_char, + to_fd: &Fd, to_path: *const c_char) -> Result<(), io::Error> { + let result = libc::renameat(from_fd.0, from_path, to_fd.0, to_path); + if result < 0 { + return Err(io::Error::last_os_error()) + } + Ok(()) +} + impl SampleFileDir { /// Opens the directory using the given metadata. /// @@ -147,7 +151,7 @@ impl SampleFileDir { s.fd.lock(if read_write { libc::LOCK_EX } else { libc::LOCK_SH } | libc::LOCK_NB)?; let dir_meta = s.read_meta()?; if !SampleFileDir::consistent(db_meta, &dir_meta) { - return Err(Error::new(format!("metadata mismatch. db: {:?} dir: {:?}", + return Err(Error::new(format!("metadata mismatch.\ndb: {:#?}\ndir: {:#?}", db_meta, &dir_meta))); } if db_meta.in_progress_open.is_some() { @@ -193,19 +197,19 @@ impl SampleFileDir { } fn open_self(path: &str, create: bool) -> Result, Error> { - let fd = Fd::open(path, create) + let fd = Fd::open(None, path, create) .map_err(|e| Error::new(format!("unable to open sample file dir {}: {}", path, e)))?; Ok(Arc::new(SampleFileDir { fd, mutable: Mutex::new(SharedMutableState{ - next_uuid: None, + next_id_by_stream: FnvHashMap::default(), }), })) } /// Opens the given sample file for reading. - pub fn open_sample_file(&self, uuid: Uuid) -> Result { - let p = SampleFileDir::get_rel_pathname(uuid); + pub fn open_sample_file(&self, composite_id: CompositeId) -> Result { + let p = SampleFileDir::get_rel_pathname(composite_id); unsafe { self.fd.openat(p.as_ptr(), libc::O_RDONLY, 0) } } @@ -246,7 +250,7 @@ impl SampleFileDir { cause: Some(Box::new(e)), })?; f.sync_all()?; - unsafe { self.fd.renameat(tmp_path.as_ptr(), final_path.as_ptr())? }; + unsafe { renameat(&self.fd, tmp_path.as_ptr(), &self.fd, final_path.as_ptr())? }; self.sync()?; Ok(()) } @@ -258,52 +262,60 @@ impl SampleFileDir { /// /// The new recording will continue from `prev` if specified; this should be as returned from /// a previous `close` call. - pub fn create_writer<'a>(&self, db: &db::Database, channel: &'a SyncerChannel, - prev: Option, camera_id: i32, + pub fn create_writer<'a>(&'a self, db: &db::Database, channel: &'a SyncerChannel, + prev: Option, stream_id: i32, video_sample_entry_id: i32) -> Result, Error> { - // Grab the next uuid. Typically one is cached—a sync has usually completed since the last - // writer was created, and syncs ensure `next_uuid` is filled while performing their - // transaction. But if not, perform an extra database transaction to reserve a new one. - let uuid = match self.mutable.lock().unwrap().next_uuid.take() { - Some(u) => u, - None => { - info!("Committing extra transaction because there's no cached uuid"); + // Grab the next id. The dir itself will typically have an id (possibly one ahead of what's + // stored in the database), but not on the first attempt for a stream. + use std::collections::hash_map::Entry; + let recording_id; + match self.mutable.lock().unwrap().next_id_by_stream.entry(stream_id) { + Entry::Occupied(mut e) => { + let v = e.get_mut(); + recording_id = *v; + *v += 1; + }, + Entry::Vacant(e) => { let mut l = db.lock(); - let mut tx = l.tx()?; - let u = tx.reserve_sample_file()?; - tx.commit()?; - u + recording_id = l.streams_by_id().get(&stream_id).unwrap().next_recording_id; + e.insert(recording_id + 1); }, }; - let p = SampleFileDir::get_rel_pathname(uuid); + let id = CompositeId::new(stream_id, recording_id); + let p = SampleFileDir::get_rel_pathname(id); + let f = match unsafe { self.fd.openat(p.as_ptr(), libc::O_WRONLY | libc::O_EXCL | libc::O_CREAT, 0o600) } { Ok(f) => f, Err(e) => { - self.mutable.lock().unwrap().next_uuid = Some(uuid); + // Put the id back to try again later. + let mut l = self.mutable.lock().unwrap(); + let v = l.next_id_by_stream.get_mut(&stream_id).unwrap(); + assert_eq!(*v, recording_id + 1); + *v -= 1; return Err(e.into()); }, }; - Writer::open(f, uuid, prev, camera_id, video_sample_entry_id, channel) + Writer::open(f, id, prev, video_sample_entry_id, channel) } pub fn statfs(&self) -> Result { self.fd.statfs() } /// Gets a pathname for a sample file suitable for passing to open or unlink. - fn get_rel_pathname(uuid: Uuid) -> [libc::c_char; 37] { - let mut buf = [0u8; 37]; - write!(&mut buf[..36], "{}", uuid.hyphenated()).expect("can't format uuid to pathname buf"); + fn get_rel_pathname(id: CompositeId) -> [libc::c_char; 17] { + let mut buf = [0u8; 17]; + write!(&mut buf[..16], "{:016x}", id.0).expect("can't format id to pathname buf"); // libc::c_char seems to be i8 on some platforms (Linux/arm) and u8 on others (Linux/amd64). - unsafe { mem::transmute::<[u8; 37], [libc::c_char; 37]>(buf) } + unsafe { mem::transmute::<[u8; 17], [libc::c_char; 17]>(buf) } } /// Unlinks the given sample file within this directory. - fn unlink(fd: &Fd, uuid: Uuid) -> Result<(), io::Error> { - let p = SampleFileDir::get_rel_pathname(uuid); + fn unlink(fd: &Fd, id: CompositeId) -> Result<(), io::Error> { + let p = SampleFileDir::get_rel_pathname(id); let res = unsafe { libc::unlinkat(fd.0, p.as_ptr(), 0) }; if res < 0 { return Err(io::Error::last_os_error()) @@ -316,7 +328,7 @@ impl SampleFileDir { let res = unsafe { libc::fsync(self.fd.0) }; if res < 0 { return Err(io::Error::last_os_error()) - }; + } Ok(()) } } @@ -324,13 +336,13 @@ impl SampleFileDir { /// State shared between users of the `SampleFileDirectory` struct and the syncer. #[derive(Debug)] struct SharedMutableState { - next_uuid: Option, + next_id_by_stream: FnvHashMap, } /// A command sent to the syncer. These correspond to methods in the `SyncerChannel` struct. enum SyncerCommand { AsyncSaveRecording(db::RecordingToInsert, fs::File), - AsyncAbandonRecording(Uuid), + AsyncAbandonRecording(CompositeId), #[cfg(test)] Flush(mpsc::SyncSender<()>), @@ -345,8 +357,18 @@ pub struct SyncerChannel(mpsc::Sender); struct Syncer { dir: Arc, db: Arc, - to_unlink: Vec, - to_mark_deleted: Vec, + + /// Files to be unlinked then immediately forgotten about. They are `>= next_recording_id` for + /// their stream, `next_recording_id` won't be advanced without a sync of the directory, and + /// extraneous files `>= next_recording_id` are unlinked on startup, so this should be + /// sufficient. + to_abandon: Vec, + + /// Files to be unlinked then removed from the garbage table. + to_unlink: Vec, + + /// Files to be removed from the garbage table. + to_mark_deleted: Vec, } /// Starts a syncer for the given sample file directory. @@ -360,19 +382,16 @@ struct Syncer { /// Returns a `SyncerChannel` which can be used to send commands (and can be cloned freely) and /// a `JoinHandle` for the syncer thread. At program shutdown, all `SyncerChannel` clones should be /// removed and then the handle joined to allow all recordings to be persisted. -pub fn start_syncer(dir: Arc, db: Arc) +pub fn start_syncer(db: Arc, dir_id: i32) -> Result<(SyncerChannel, thread::JoinHandle<()>), Error> { - let to_unlink = db.lock().list_reserved_sample_files()?; - let (snd, rcv) = mpsc::channel(); - let mut syncer = Syncer { - dir, - db, - to_unlink, - to_mark_deleted: Vec::new(), - }; + let db2 = db.clone(); + let (mut syncer, path) = Syncer::new(&db.lock(), db2, dir_id)?; syncer.initial_rotation()?; + let (snd, rcv) = mpsc::channel(); Ok((SyncerChannel(snd), - thread::Builder::new().name("syncer".into()).spawn(move || syncer.run(rcv)).unwrap())) + thread::Builder::new() + .name(format!("sync-{}", path)) + .spawn(move || syncer.run(rcv)).unwrap())) } pub struct NewLimit { @@ -383,15 +402,10 @@ pub struct NewLimit { /// Deletes recordings if necessary to fit within the given new `retain_bytes` limit. /// Note this doesn't change the limit in the database; it only deletes files. /// Pass a limit of 0 to delete all recordings associated with a camera. -pub fn lower_retention(dir: Arc, db: Arc, limits: &[NewLimit]) +pub fn lower_retention(db: Arc, dir_id: i32, limits: &[NewLimit]) -> Result<(), Error> { - let to_unlink = db.lock().list_reserved_sample_files()?; - let mut syncer = Syncer { - dir, - db, - to_unlink, - to_mark_deleted: Vec::new(), - }; + let db2 = db.clone(); + let (mut syncer, _) = Syncer::new(&db.lock(), db2, dir_id)?; syncer.do_rotation(|db| { let mut to_delete = Vec::new(); for l in limits { @@ -441,8 +455,8 @@ impl SyncerChannel { self.0.send(SyncerCommand::AsyncSaveRecording(recording, f)).unwrap(); } - fn async_abandon_recording(&self, uuid: Uuid) { - self.0.send(SyncerCommand::AsyncAbandonRecording(uuid)).unwrap(); + fn async_abandon_recording(&self, id: CompositeId) { + self.0.send(SyncerCommand::AsyncAbandonRecording(id)).unwrap(); } /// For testing: flushes the syncer, waiting for all currently-queued commands to complete. @@ -455,6 +469,60 @@ impl SyncerChannel { } impl Syncer { + fn new(l: &db::LockedDatabase, db: Arc, dir_id: i32) + -> Result<(Self, String), Error> { + let d = l.sample_file_dirs_by_id() + .get(&dir_id) + .ok_or_else(|| Error::new(format!("no dir {}", dir_id)))?; + let dir = d.get()?; + let to_unlink = l.list_garbage(dir_id)?; + + // Get files to abandon. + // First, get a list of the streams in question. + let streams_to_next: FnvHashMap<_, _> = + l.streams_by_id() + .iter() + .filter_map(|(&k, v)| { + if v.sample_file_dir_id == Some(dir_id) { + Some((k, v.next_recording_id)) + } else { + None + } + }) + .collect(); + let to_abandon = Syncer::list_files_to_abandon(&d.path, streams_to_next)?; + + Ok((Syncer { + dir, + db, + to_abandon, + to_unlink, + to_mark_deleted: Vec::new(), + }, d.path.clone())) + } + + /// Lists files which should be "abandoned" (deleted without ever recording in the database) + /// on opening. + fn list_files_to_abandon(path: &str, streams_to_next: FnvHashMap) + -> Result, Error> { + let mut v = Vec::new(); + for e in ::std::fs::read_dir(path)? { + let e = e?; + let id = match parse_id(e.file_name().as_bytes()) { + Ok(i) => i, + Err(_) => continue, + }; + let next = match streams_to_next.get(&id.stream()) { + Some(n) => *n, + None => continue, // unknown stream. + }; + if id.recording() >= next { + v.push(id); + } + } + Ok(v) + } + fn run(&mut self, cmds: mpsc::Receiver) { loop { match cmds.recv() { @@ -467,7 +535,7 @@ impl Syncer { } } - /// Rotates files for all streams and deletes stale reserved uuids from previous runs. + /// Rotates files for all streams and deletes stale files from previous runs. fn initial_rotation(&mut self) -> Result<(), Error> { self.do_rotation(|db| { let mut to_delete = Vec::new(); @@ -489,7 +557,7 @@ impl Syncer { to_delete }; for row in to_delete { - self.to_unlink.push(row.uuid); + self.to_unlink.push(row.id); } self.try_unlink(); if !self.to_unlink.is_empty() { @@ -512,15 +580,14 @@ impl Syncer { /// so that there can be only one dir sync and database transaction per save. fn save(&mut self, recording: db::RecordingToInsert, f: fs::File) { if let Err(e) = self.save_helper(&recording, f) { - error!("camera {}: will discard recording {} due to error while saving: {}", - recording.stream_id, recording.sample_file_uuid, e); - self.to_unlink.push(recording.sample_file_uuid); + error!("will discard recording {} due to error while saving: {}", recording.id, e); + self.abandon(recording.id); return; } } - fn abandon(&mut self, uuid: Uuid) { - self.to_unlink.push(uuid); + fn abandon(&mut self, id: CompositeId) { + self.to_abandon.push(id); self.try_unlink(); } @@ -532,55 +599,56 @@ impl Syncer { if !self.to_unlink.is_empty() { return Err(Error::new(format!("failed to unlink {} files.", self.to_unlink.len()))); } + + // XXX: if these calls fail, any other writes are likely to fail as well. f.sync_all()?; self.dir.sync()?; let mut to_delete = Vec::new(); - let mut l = self.dir.mutable.lock().unwrap(); let mut db = self.db.lock(); - let mut new_next_uuid = l.next_uuid; { + let stream_id = recording.id.stream(); let stream = - db.streams_by_id().get(&recording.stream_id) - .ok_or_else(|| Error::new(format!("no such stream {}", recording.stream_id)))?; - get_rows_to_delete(&db, recording.stream_id, stream, + db.streams_by_id().get(&stream_id) + .ok_or_else(|| Error::new(format!("no such stream {}", stream_id)))?; + get_rows_to_delete(&db, stream_id, stream, recording.sample_file_bytes as i64, &mut to_delete)?; } let mut tx = db.tx()?; tx.mark_sample_files_deleted(&self.to_mark_deleted)?; tx.delete_recordings(&to_delete)?; - if new_next_uuid.is_none() { - new_next_uuid = Some(tx.reserve_sample_file()?); - } tx.insert_recording(recording)?; tx.commit()?; - l.next_uuid = new_next_uuid; self.to_mark_deleted.clear(); - self.to_unlink.extend(to_delete.iter().map(|row| row.uuid)); + self.to_unlink.extend(to_delete.iter().map(|row| row.id)); + self.to_unlink.extend_from_slice(&self.to_abandon); + self.to_abandon.clear(); Ok(()) } - /// Tries to unlink all the uuids in `self.to_unlink`. Any which can't be unlinked will - /// be retained in the vec. + /// Tries to unlink all the files in `self.to_unlink` and `self.to_abandon`. + /// Any which can't be unlinked will be retained in the vec. fn try_unlink(&mut self) { let to_mark_deleted = &mut self.to_mark_deleted; let fd = &self.dir.fd; - self.to_unlink.retain(|uuid| { - if let Err(e) = SampleFileDir::unlink(fd, *uuid) { - if e.kind() == io::ErrorKind::NotFound { - warn!("dir: Sample file {} already deleted!", uuid.hyphenated()); - to_mark_deleted.push(*uuid); - false - } else { - warn!("dir: Unable to unlink {}: {}", uuid.hyphenated(), e); - true + for &mut (ref mut v, mark_deleted) in &mut [(&mut self.to_unlink, true), + (&mut self.to_abandon, false)] { + v.retain(|&id| { + if let Err(e) = SampleFileDir::unlink(fd, id) { + if e.kind() == io::ErrorKind::NotFound { + warn!("dir: recording {} already deleted!", id); + } else { + warn!("dir: Unable to unlink {}: {}", id, e); + return true; + } + } + if mark_deleted { + to_mark_deleted.push(id); } - } else { - to_mark_deleted.push(*uuid); false - } - }); + }); + } } } @@ -598,7 +666,7 @@ struct InnerWriter<'a> { syncer_channel: &'a SyncerChannel, f: fs::File, index: recording::SampleIndexEncoder, - uuid: Uuid, + id: CompositeId, corrupt: bool, hasher: hash::Hasher, @@ -614,7 +682,6 @@ struct InnerWriter<'a> { adjuster: ClockAdjuster, - stream_id: i32, video_sample_entry_id: i32, run_offset: i32, @@ -691,19 +758,18 @@ pub struct PreviousWriter { impl<'a> Writer<'a> { /// Opens the writer; for use by `SampleFileDir` (which should supply `f`). - fn open(f: fs::File, uuid: Uuid, prev: Option, stream_id: i32, + fn open(f: fs::File, id: CompositeId, prev: Option, video_sample_entry_id: i32, syncer_channel: &'a SyncerChannel) -> Result { - Ok(Writer(Some(InnerWriter{ + Ok(Writer(Some(InnerWriter { syncer_channel, f, index: recording::SampleIndexEncoder::new(), - uuid, + id, corrupt: false, hasher: hash::Hasher::new(hash::MessageDigest::sha1())?, prev_end: prev.map(|p| p.end_time), local_start: recording::Time(i64::max_value()), adjuster: ClockAdjuster::new(prev.map(|p| p.local_time_delta.0)), - stream_id, video_sample_entry_id, run_offset: prev.map(|p| p.run_offset + 1).unwrap_or(0), unflushed_sample: None, @@ -734,7 +800,7 @@ impl<'a> Writer<'a> { // Partially written packet. Truncate if possible. if let Err(e2) = w.f.set_len(w.index.sample_file_bytes as u64) { error!("After write to {} failed with {}, truncate failed with {}; \ - sample file is corrupt.", w.uuid.hyphenated(), e, e2); + sample file is corrupt.", w.id, e, e2); w.corrupt = true; } } @@ -768,8 +834,8 @@ impl<'a> InnerWriter<'a> { fn close(mut self, next_pts: Option) -> Result { if self.corrupt { - self.syncer_channel.async_abandon_recording(self.uuid); - return Err(Error::new(format!("recording {} is corrupt", self.uuid))); + self.syncer_channel.async_abandon_recording(self.id); + return Err(Error::new(format!("recording {} is corrupt", self.id))); } let unflushed = self.unflushed_sample.take().ok_or_else(|| Error::new("no packets!".to_owned()))?; @@ -787,14 +853,13 @@ impl<'a> InnerWriter<'a> { else { 0 }; let local_start_delta = self.local_start - start; let recording = db::RecordingToInsert{ - stream_id: self.stream_id, + id: self.id, sample_file_bytes: self.index.sample_file_bytes, time: start .. end, local_time_delta: local_start_delta, video_samples: self.index.video_samples, video_sync_samples: self.index.video_sync_samples, video_sample_entry_id: self.video_sample_entry_id, - sample_file_uuid: self.uuid, video_index: self.index.video_index, sample_file_sha1: sha1_bytes, run_offset: self.run_offset, @@ -820,6 +885,24 @@ impl<'a> Drop for Writer<'a> { } } +/// Parse a composite id filename. +/// +/// These are exactly 16 bytes, lowercase hex. +fn parse_id(id: &[u8]) -> Result { + if id.len() != 16 { + return Err(()); + } + let mut v: u64 = 0; + for i in 0..16 { + v = (v << 4) | match id[i] { + b @ b'0'...b'9' => b - b'0', + b @ b'a'...b'f' => b - b'a' + 10, + _ => return Err(()), + } as u64; + } + Ok(CompositeId(v as i64)) +} + #[cfg(test)] mod tests { use super::ClockAdjuster; @@ -883,4 +966,15 @@ mod tests { assert!(total == expected || total == expected + 1, "total={} vs expected={}", total, expected); } + + #[test] + fn parse_id() { + use super::parse_id; + assert_eq!(parse_id(b"0000000000000000").unwrap().0, 0); + assert_eq!(parse_id(b"0000000100000002").unwrap().0, 0x0000000100000002); + parse_id(b"").unwrap_err(); + parse_id(b"meta").unwrap_err(); + parse_id(b"0").unwrap_err(); + parse_id(b"000000010000000x").unwrap_err(); + } } diff --git a/src/mp4.rs b/src/mp4.rs index 7babde0..0351dae 100644 --- a/src/mp4.rs +++ b/src/mp4.rs @@ -381,8 +381,7 @@ impl Segment { self.index_once.call_once(|| { let index = unsafe { &mut *self.index.get() }; *index = db.lock() - .with_recording_playback(self.s.stream_id, self.s.recording_id, - |playback| self.build_index(playback)) + .with_recording_playback(self.s.id, |playback| self.build_index(playback)) .map_err(|e| { error!("Unable to build index for segment: {:?}", e); }); }); let index: &'a _ = unsafe { &*self.index.get() }; @@ -629,8 +628,7 @@ impl Slice { } let truns = mp4.0.db.lock() - .with_recording_playback(s.s.stream_id, s.s.recording_id, - |playback| s.truns(playback, pos, len)) + .with_recording_playback(s.s.id, |playback| s.truns(playback, pos, len)) .map_err(|e| { Error::new(format!("Unable to build index for segment: {:?}", e)) })?; let truns = ARefs::new(truns); Ok(truns.map(|t| &t[r.start as usize .. r.end as usize])) @@ -761,8 +759,8 @@ impl FileBuilder { if let Some(prev) = self.segments.last() { if prev.s.have_trailing_zero() { return Err(Error::new(format!( - "unable to append recording {}/{} after recording {}/{} with trailing zero", - row.stream_id, row.id, prev.s.stream_id, prev.s.recording_id))); + "unable to append recording {} after recording {} with trailing zero", + row.id, prev.s.id))); } } let s = Segment::new(db, &row, rel_range_90k, self.next_frame_num)?; @@ -812,8 +810,7 @@ impl FileBuilder { // Update the etag to reflect this segment. let mut data = [0_u8; 24]; let mut cursor = io::Cursor::new(&mut data[..]); - cursor.write_i32::(s.s.stream_id)?; - cursor.write_i32::(s.s.recording_id)?; + cursor.write_i64::(s.s.id.0)?; cursor.write_i64::(s.s.start.0)?; cursor.write_i32::(d.start)?; cursor.write_i32::(d.end)?; @@ -1452,16 +1449,10 @@ impl FileInner { /// happen because nothing should be touching Moonfire NVR's files but itself. fn get_video_sample_data(&self, i: usize, r: Range) -> Result { let s = &self.segments[i]; - let uuid = { - let l = self.db.lock(); - l.with_recording_playback(s.s.stream_id, s.s.recording_id, - |p| Ok(p.sample_file_uuid))? - }; let f = self.dirs_by_stream_id - .get(&s.s.stream_id) - .ok_or_else(|| Error::new(format!("{}/{}: stream not found", - s.s.stream_id, s.s.recording_id)))? - .open_sample_file(uuid)?; + .get(&s.s.id.stream()) + .ok_or_else(|| Error::new(format!("{}: stream not found", s.s.id)))? + .open_sample_file(s.s.id)?; let start = s.s.sample_file_range().start + r.start; let mmap = Box::new(unsafe { memmap::MmapOptions::new() @@ -2271,7 +2262,7 @@ mod bench { let rel_range_90k = 0 .. row.duration_90k; super::Segment::new(&db, &row, rel_range_90k, 1).unwrap() }; - db.with_recording_playback(segment.s.stream_id, segment.s.recording_id, |playback| { + db.with_recording_playback(segment.s.id, |playback| { let v = segment.build_index(playback).unwrap(); // warm. b.bytes = v.len() as u64; // define the benchmark performance in terms of output bytes. b.iter(|| segment.build_index(playback).unwrap()); diff --git a/src/recording.rs b/src/recording.rs index 800170b..a714ffe 100644 --- a/src/recording.rs +++ b/src/recording.rs @@ -354,8 +354,7 @@ impl SampleIndexEncoder { /// Used by the `Mp4FileBuilder` class to splice together recordings into a single virtual .mp4. #[derive(Debug)] pub struct Segment { - pub stream_id: i32, - pub recording_id: i32, + pub id: db::CompositeId, pub start: Time, /// An iterator positioned at the beginning of the segment, or `None`. Most segments are @@ -382,8 +381,7 @@ impl Segment { recording: &db::ListRecordingsRow, desired_range_90k: Range) -> Result { let mut self_ = Segment { - stream_id: recording.stream_id, - recording_id: recording.id, + id: recording.id, start: recording.start, begin: None, file_end: recording.sample_file_bytes, @@ -413,7 +411,7 @@ impl Segment { // Slow path. Need to iterate through the index. trace!("recording::Segment::new slow path, desired_range_90k={:?}, recording={:#?}", self_.desired_range_90k, recording); - db.with_recording_playback(self_.stream_id, self_.recording_id, |playback| { + db.with_recording_playback(self_.id, |playback| { let mut begin = Box::new(SampleIndexIterator::new()); let data = &(&playback).video_index; let mut it = SampleIndexIterator::new(); @@ -480,8 +478,8 @@ impl Segment { /// Must be called without the database lock held; retrieves video index from the cache. pub fn foreach(&self, playback: &db::RecordingPlayback, mut f: F) -> Result<(), Error> where F: FnMut(&SampleIndexIterator) -> Result<(), Error> { - trace!("foreach on recording {}/{}: {} frames, actual_start_90k: {}", - self.stream_id, self.recording_id, self.frames, self.actual_start_90k()); + trace!("foreach on recording {}: {} frames, actual_start_90k: {}", + self.id, self.frames, self.actual_start_90k()); let data = &(&playback).video_index; let mut it = match self.begin { Some(ref b) => **b, @@ -489,28 +487,26 @@ impl Segment { }; if it.uninitialized() { if !it.next(data)? { - return Err(Error::new(format!("recording {}/{}: no frames", - self.stream_id, self.recording_id))); + return Err(Error::new(format!("recording {}: no frames", self.id))); } if !it.is_key() { - return Err(Error::new(format!("recording {}/{}: doesn't start with key frame", - self.stream_id, self.recording_id))); + return Err(Error::new(format!("recording {}: doesn't start with key frame", + self.id))); } } let mut have_frame = true; let mut key_frame = 0; for i in 0 .. self.frames { if !have_frame { - return Err(Error::new(format!("recording {}/{}: expected {} frames, found only {}", - self.stream_id, self.recording_id, self.frames, - i+1))); + return Err(Error::new(format!("recording {}: expected {} frames, found only {}", + self.id, self.frames, i+1))); } if it.is_key() { key_frame += 1; if key_frame > self.key_frames { return Err(Error::new(format!( - "recording {}/{}: more than expected {} key frames", - self.stream_id, self.recording_id, self.key_frames))); + "recording {}: more than expected {} key frames", + self.id, self.key_frames))); } } @@ -521,9 +517,8 @@ impl Segment { have_frame = try!(it.next(data)); } if key_frame < self.key_frames { - return Err(Error::new(format!("recording {}/{}: expected {} key frames, found only {}", - self.stream_id, self.recording_id, self.key_frames, - key_frame))); + return Err(Error::new(format!("recording {}: expected {} key frames, found only {}", + self.id, self.key_frames, key_frame))); } Ok(()) } @@ -656,7 +651,7 @@ mod tests { fn get_frames(db: &db::Database, segment: &Segment, f: F) -> Vec where F: Fn(&SampleIndexIterator) -> T { let mut v = Vec::new(); - db.lock().with_recording_playback(segment.stream_id, segment.recording_id, |playback| { + db.lock().with_recording_playback(segment.id, |playback| { segment.foreach(playback, |it| { v.push(f(it)); Ok(()) }) }).unwrap(); v diff --git a/src/schema.sql b/src/schema.sql index 0350eff..a8c87ea 100644 --- a/src/schema.sql +++ b/src/schema.sql @@ -197,10 +197,6 @@ create table recording_playback ( -- See description on recording table. composite_id integer primary key references recording (composite_id), - -- The binary representation of the sample file's uuid. The canonical text - -- representation of this uuid is the filename within the sample file dir. - sample_file_uuid blob not null check (length(sample_file_uuid) = 16), - -- The sha1 hash of the contents of the sample file. sample_file_sha1 blob not null check (length(sample_file_sha1) = 20), @@ -208,12 +204,21 @@ create table recording_playback ( video_index blob not null check (length(video_index) > 0) ); --- Files in the sample file directory which may be present but should simply be --- discarded on startup. (Recordings which were never completed or have been --- marked for completion.) -create table reserved_sample_files ( - uuid blob primary key check (length(uuid) = 16), - state integer not null -- 0 (writing) or 1 (deleted) +-- Files which are to be deleted (may or may not still exist). +-- Note that besides these files, for each stream, any recordings >= its +-- next_recording_id should be discarded on startup. +create table garbage ( + -- This is _mostly_ redundant with composite_id, which contains the stream + -- id and thus a linkage to the sample file directory. Listing it here + -- explicitly means that streams can be deleted without losing the + -- association of garbage to directory. + sample_file_dir_id integer not null references sample_file_dir (id), + + -- See description on recording table. + composite_id integer not null, + + -- Organize the table first by directory, as that's how it will be queried. + primary key (sample_file_dir_id, composite_id) ) without rowid; -- A concrete box derived from a ISO/IEC 14496-12 section 8.5.2 @@ -238,4 +243,4 @@ create table video_sample_entry ( ); insert into version (id, unix_time, notes) - values (2, cast(strftime('%s', 'now') as int), 'db creation'); + values (3, cast(strftime('%s', 'now') as int), 'db creation'); diff --git a/src/streamer.rs b/src/streamer.rs index a1f38d9..9cf4d79 100644 --- a/src/streamer.rs +++ b/src/streamer.rs @@ -201,7 +201,7 @@ impl<'a, C, S> Streamer<'a, C, S> where C: 'a + Clocks, S: 'a + stream::Stream { #[cfg(test)] mod tests { use clock::{self, Clocks}; - use db; + use db::{self, CompositeId}; use error::Error; use h264; use moonfire_ffmpeg; @@ -314,8 +314,8 @@ mod tests { is_key: bool, } - fn get_frames(db: &db::LockedDatabase, camera_id: i32, recording_id: i32) -> Vec { - db.with_recording_playback(camera_id, recording_id, |rec| { + fn get_frames(db: &db::LockedDatabase, id: CompositeId) -> Vec { + db.with_recording_playback(id, |rec| { let mut it = recording::SampleIndexIterator::new(); let mut frames = Vec::new(); while it.next(&rec.video_index).unwrap() { @@ -371,7 +371,7 @@ mod tests { // 3-second boundaries (such as 2016-04-26 00:00:03), rotation happens somewhat later: // * the first rotation is always skipped // * the second rotation is deferred until a key frame. - assert_eq!(get_frames(&db, testutil::TEST_STREAM_ID, 1), &[ + assert_eq!(get_frames(&db, CompositeId::new(testutil::TEST_STREAM_ID, 1)), &[ Frame{start_90k: 0, duration_90k: 90379, is_key: true}, Frame{start_90k: 90379, duration_90k: 89884, is_key: false}, Frame{start_90k: 180263, duration_90k: 89749, is_key: false}, @@ -381,7 +381,7 @@ mod tests { Frame{start_90k: 540015, duration_90k: 90021, is_key: false}, // pts_time 6.0001... Frame{start_90k: 630036, duration_90k: 89958, is_key: false}, ]); - assert_eq!(get_frames(&db, testutil::TEST_STREAM_ID, 2), &[ + assert_eq!(get_frames(&db, CompositeId::new(testutil::TEST_STREAM_ID, 2)), &[ Frame{start_90k: 0, duration_90k: 90011, is_key: true}, Frame{start_90k: 90011, duration_90k: 0, is_key: false}, ]); @@ -391,10 +391,10 @@ mod tests { Ok(()) }).unwrap(); assert_eq!(2, recordings.len()); - assert_eq!(1, recordings[0].id); + assert_eq!(1, recordings[0].id.recording()); assert_eq!(recording::Time(128700575999999), recordings[0].start); assert_eq!(0, recordings[0].flags); - assert_eq!(2, recordings[1].id); + assert_eq!(2, recordings[1].id.recording()); assert_eq!(recording::Time(128700576719993), recordings[1].start); assert_eq!(db::RecordingFlags::TrailingZero as i32, recordings[1].flags); } diff --git a/src/testutil.rs b/src/testutil.rs index 1673165..2c54ca5 100644 --- a/src/testutil.rs +++ b/src/testutil.rs @@ -112,7 +112,8 @@ impl TestDb { } let mut dirs_by_stream_id = FnvHashMap::default(); dirs_by_stream_id.insert(TEST_STREAM_ID, dir.clone()); - let (syncer_channel, syncer_join) = dir::start_syncer(dir, db.clone()).unwrap(); + let (syncer_channel, syncer_join) = + dir::start_syncer(db.clone(), sample_file_dir_id).unwrap(); TestDb { db, dirs_by_stream_id: Arc::new(dirs_by_stream_id), @@ -128,13 +129,12 @@ impl TestDb { let mut db = self.db.lock(); let video_sample_entry_id = db.insert_video_sample_entry( 1920, 1080, [0u8; 100].to_vec(), "avc1.000000".to_owned()).unwrap(); - let row_id; + let next = db.streams_by_id().get(&TEST_STREAM_ID).unwrap().next_recording_id; { let mut tx = db.tx().unwrap(); - tx.bypass_reservation_for_testing = true; const START_TIME: recording::Time = recording::Time(1430006400i64 * TIME_UNITS_PER_SEC); - row_id = tx.insert_recording(&db::RecordingToInsert{ - stream_id: TEST_STREAM_ID, + tx.insert_recording(&db::RecordingToInsert { + id: db::CompositeId::new(TEST_STREAM_ID, next), sample_file_bytes: encoder.sample_file_bytes, time: START_TIME .. START_TIME + recording::Duration(encoder.total_duration_90k as i64), @@ -142,16 +142,15 @@ impl TestDb { video_samples: encoder.video_samples, video_sync_samples: encoder.video_sync_samples, video_sample_entry_id: video_sample_entry_id, - sample_file_uuid: Uuid::nil(), video_index: encoder.video_index, sample_file_sha1: [0u8; 20], - run_offset: 0, // TODO - flags: 0, // TODO + run_offset: 0, + flags: db::RecordingFlags::TrailingZero as i32, }).unwrap(); tx.commit().unwrap(); } let mut row = None; - db.list_recordings_by_id(TEST_STREAM_ID, row_id .. row_id + 1, + db.list_recordings_by_id(TEST_STREAM_ID, next .. next+1, |r| { row = Some(r); Ok(()) }).unwrap(); row.unwrap() } @@ -167,8 +166,8 @@ pub fn add_dummy_recordings_to_db(db: &db::Database, num: usize) { 1920, 1080, [0u8; 100].to_vec(), "avc1.000000".to_owned()).unwrap(); const START_TIME: recording::Time = recording::Time(1430006400i64 * TIME_UNITS_PER_SEC); const DURATION: recording::Duration = recording::Duration(5399985); - let mut recording = db::RecordingToInsert{ - stream_id: TEST_STREAM_ID, + let mut recording = db::RecordingToInsert { + id: db::CompositeId::new(TEST_STREAM_ID, 1), sample_file_bytes: 30104460, flags: 0, time: START_TIME .. (START_TIME + DURATION), @@ -176,15 +175,14 @@ pub fn add_dummy_recordings_to_db(db: &db::Database, num: usize) { video_samples: 1800, video_sync_samples: 60, video_sample_entry_id: video_sample_entry_id, - sample_file_uuid: Uuid::nil(), video_index: data, sample_file_sha1: [0; 20], run_offset: 0, }; let mut tx = db.tx().unwrap(); - tx.bypass_reservation_for_testing = true; - for _ in 0..num { + for i in 0..num { tx.insert_recording(&recording).unwrap(); + recording.id = db::CompositeId::new(TEST_STREAM_ID, 2 + i as i32); recording.time.start += DURATION; recording.time.end += DURATION; recording.run_offset += 1; diff --git a/src/web.rs b/src/web.rs index f6ff503..076406f 100644 --- a/src/web.rs +++ b/src/web.rs @@ -328,18 +328,20 @@ impl ServiceInner { let mut prev = None; let mut cur_off = 0; db.list_recordings_by_id(stream_id, s.ids.clone(), |r| { + let recording_id = r.id.recording(); + // Check for missing recordings. match prev { - None if r.id == s.ids.start => {}, + None if recording_id == s.ids.start => {}, None => return Err(Error::new(format!("no such recording {}/{}", stream_id, s.ids.start))), - Some(id) if r.id != id + 1 => { + Some(id) if r.id.recording() != id + 1 => { return Err(Error::new(format!("no such recording {}/{}", stream_id, id + 1))); }, _ => {}, }; - prev = Some(r.id); + prev = Some(recording_id); // Add a segment for the relevant part of the recording, if any. let end_time = s.end_time.unwrap_or(i64::max_value()); @@ -348,11 +350,11 @@ impl ServiceInner { let start = cmp::max(0, s.start_time - cur_off); let end = cmp::min(d, end_time - cur_off); let times = start as i32 .. end as i32; - debug!("...appending recording {}/{} with times {:?} \ - (out of dur {})", r.stream_id, r.id, times, d); + debug!("...appending recording {} with times {:?} \ + (out of dur {})", r.id, times, d); builder.append(&db, r, start as i32 .. end as i32)?; } else { - debug!("...skipping recording {}/{} dur {}", r.stream_id, r.id, d); + debug!("...skipping recording {} dur {}", r.id, d); } cur_off += d; Ok(())