reorganize the sample file directory

The filenames now represent composite ids (stream id + recording id) rather
than a separate uuid system with its own reservation for a few benefits:

  * This provides more information when there are inconsistencies.

  * This avoids the need for managing the reservations during recording. I
    expect this to simplify delaying flushing of newly written sample files.
    Now the directory has to be scanned at startup for files that never got
    written to the database, but that's acceptably fast even with millions of
    files.

  * Less information to keep in memory and in the recording_playback table.

I'd considered using one directory per stream, which might help if the
filesystem has trouble coping with huge directories. But that would mean each
dir has to be fsync()ed separately (more latency and/or more multithreading).
So I'll stick with this until I see concrete evidence of a problem that would
solve.

Test coverage of the error conditions is poor. I plan to do some restructuring
of the db/dir code, hopefully making steps toward testability along the way.
This commit is contained in:
Scott Lamb 2018-02-20 10:11:10 -08:00
parent e7f5733f29
commit 253f3de399
16 changed files with 618 additions and 429 deletions

View File

@ -192,17 +192,28 @@ dropped from 605 to 39.
The general upgrade procedure applies to this upgrade. The general upgrade procedure applies to this upgrade.
### Version 1 to version 2 ### Version 1 to version 2 to version 3
Version 2 adds: This upgrade affects the sample file directory as well as the database. Thus,
the restore procedure written above of simply copying back the databae is
insufficient. To do a full restore, you would need to back up and restore the
sample file directory as well. This directory is considerably larger, so
consider an alternate procedure of crossing your fingers, and being prepared
to start over from scratch if there's a problem.
Version 2 represents a half-finished upgrade from version 1 to version 3; it
is never used.
Version 3 adds over version 1:
* recording of sub streams (splits a new `stream` table out of `camera`) * recording of sub streams (splits a new `stream` table out of `camera`)
* support for multiple sample file directories, to take advantage of * support for multiple sample file directories, to take advantage of
multiple hard drives (or multiple RAID volumes). multiple hard drives (or multiple RAID volumes).
* interlock between database and sample file directories to avoid various * an interlock between database and sample file directories to avoid various
mixups that could cause data integrity problems. mixups that could cause data integrity problems.
* records the RFC-6381 codec associated with a video sample entry, so that * recording the RFC-6381 codec associated with a video sample entry, so that
logic for determining this is no longer needed as part of the database logic for determining this is no longer needed as part of the database
layer. layer.
* a simpler sample file directory layout in which files are represented by
The general upgrade procedure applies to this upgrade. the same sequentially increasing id as in the database, rather than a
separate uuid which has to be reserved in advance.

View File

@ -209,9 +209,8 @@ fn lower_retention(db: &Arc<db::Database>, zero_limits: BTreeMap<i32, Vec<dir::N
-> Result<(), Error> { -> Result<(), Error> {
let dirs_to_open: Vec<_> = zero_limits.keys().map(|id| *id).collect(); let dirs_to_open: Vec<_> = zero_limits.keys().map(|id| *id).collect();
db.lock().open_sample_file_dirs(&dirs_to_open[..])?; db.lock().open_sample_file_dirs(&dirs_to_open[..])?;
for (dir_id, l) in &zero_limits { for (&dir_id, l) in &zero_limits {
let dir = db.lock().sample_file_dirs_by_id().get(dir_id).unwrap().get()?; dir::lower_retention(db.clone(), dir_id, &l)?;
dir::lower_retention(dir.clone(), db.clone(), &l)?;
} }
Ok(()) Ok(())
} }

View File

@ -144,12 +144,11 @@ fn actually_delete(model: &RefCell<Model>, siv: &mut Cursive) {
.collect(); .collect();
siv.pop_layer(); // deletion confirmation siv.pop_layer(); // deletion confirmation
siv.pop_layer(); // retention dialog siv.pop_layer(); // retention dialog
let dir = { {
let mut l = model.db.lock(); let mut l = model.db.lock();
l.open_sample_file_dirs(&[model.dir_id]).unwrap(); // TODO: don't unwrap. l.open_sample_file_dirs(&[model.dir_id]).unwrap(); // TODO: don't unwrap.
l.sample_file_dirs_by_id().get(&model.dir_id).unwrap().get().unwrap() }
}; if let Err(e) = dir::lower_retention(model.db.clone(), model.dir_id, &new_limits[..]) {
if let Err(e) = dir::lower_retention(dir, model.db.clone(), &new_limits[..]) {
siv.add_layer(views::Dialog::text(format!("Unable to delete excess video: {}", e)) siv.add_layer(views::Dialog::text(format!("Unable to delete excess video: {}", e))
.title("Error") .title("Error")
.dismiss_button("Abort")); .dismiss_button("Abort"));

View File

@ -75,7 +75,7 @@ enum OpenMode {
/// Locks and opens the database. /// Locks and opens the database.
/// The returned `dir::Fd` holds the lock and should be kept open as long as the `Connection` is. /// The returned `dir::Fd` holds the lock and should be kept open as long as the `Connection` is.
fn open_conn(db_dir: &str, mode: OpenMode) -> Result<(dir::Fd, rusqlite::Connection), Error> { fn open_conn(db_dir: &str, mode: OpenMode) -> Result<(dir::Fd, rusqlite::Connection), Error> {
let dir = dir::Fd::open(db_dir, mode == OpenMode::Create)?; let dir = dir::Fd::open(None, db_dir, mode == OpenMode::Create)?;
let ro = mode == OpenMode::ReadOnly; let ro = mode == OpenMode::ReadOnly;
dir.lock(if ro { libc::LOCK_SH } else { libc::LOCK_EX } | libc::LOCK_NB) dir.lock(if ro { libc::LOCK_SH } else { libc::LOCK_EX } | libc::LOCK_NB)
.map_err(|e| Error{description: format!("db dir {:?} already in use; can't get {} lock", .map_err(|e| Error{description: format!("db dir {:?} already in use; can't get {} lock",

View File

@ -143,7 +143,7 @@ pub fn run() -> Result<(), Error> {
drop(l); drop(l);
let mut syncers = FnvHashMap::with_capacity_and_hasher(dirs.len(), Default::default()); let mut syncers = FnvHashMap::with_capacity_and_hasher(dirs.len(), Default::default());
for (id, dir) in dirs.drain() { for (id, dir) in dirs.drain() {
let (channel, join) = dir::start_syncer(dir.clone(), db.clone())?; let (channel, join) = dir::start_syncer(db.clone(), id)?;
syncers.insert(id, Syncer { syncers.insert(id, Syncer {
dir, dir,
channel, channel,

View File

@ -38,6 +38,7 @@ use rusqlite;
mod v0_to_v1; mod v0_to_v1;
mod v1_to_v2; mod v1_to_v2;
mod v2_to_v3;
const USAGE: &'static str = r#" const USAGE: &'static str = r#"
Upgrade to the latest database schema. Upgrade to the latest database schema.
@ -92,6 +93,7 @@ pub fn run() -> Result<(), Error> {
let upgraders = [ let upgraders = [
v0_to_v1::new, v0_to_v1::new,
v1_to_v2::new, v1_to_v2::new,
v2_to_v3::new,
]; ];
{ {

View File

@ -1,5 +1,5 @@
// This file is part of Moonfire NVR, a security camera digital video recorder. // This file is part of Moonfire NVR, a security camera digital video recorder.
// Copyright (C) 2016 Scott Lamb <slamb@slamb.org> // Copyright (C) 2018 Scott Lamb <slamb@slamb.org>
// //
// This program is free software: you can redistribute it and/or modify // This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by // it under the terms of the GNU General Public License as published by
@ -135,6 +135,7 @@ impl<'a> super::Upgrader for U<'a> {
self.dir_meta = Some(meta); self.dir_meta = Some(meta);
tx.execute_batch(r#" tx.execute_batch(r#"
drop table reserved_sample_files;
alter table camera rename to old_camera; alter table camera rename to old_camera;
alter table recording rename to old_recording; alter table recording rename to old_recording;
alter table video_sample_entry rename to old_video_sample_entry; alter table video_sample_entry rename to old_video_sample_entry;
@ -199,6 +200,12 @@ impl<'a> super::Upgrader for U<'a> {
data blob not null check (length(data) > 86) data blob not null check (length(data) > 86)
); );
create table garbage (
sample_file_dir_id integer references sample_file_dir (id),
composite_id integer,
primary key (sample_file_dir_id, composite_id)
) without rowid;
insert into camera insert into camera
select select
id, id,

View File

@ -0,0 +1,166 @@
// This file is part of Moonfire NVR, a security camera digital video recorder.
// Copyright (C) 2018 Scott Lamb <slamb@slamb.org>
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// In addition, as a special exception, the copyright holders give
// permission to link the code of portions of this program with the
// OpenSSL library under certain conditions as described in each
// individual source file, and distribute linked combinations including
// the two.
//
// You must obey the GNU General Public License in all respects for all
// of the code used other than OpenSSL. If you modify file(s) with this
// exception, you may extend this exception to your version of the
// file(s), but you are not obligated to do so. If you do not wish to do
// so, delete this exception statement from your version. If you delete
// this exception statement from all source files in the program, then
// also delete it here.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
/// Upgrades a version 2 schema to a version 3 schema.
use db::{self, FromSqlUuid};
use dir;
use error::Error;
use libc;
use std::io::{self, Write};
use std::mem;
//use std::rc::Rc;
use rusqlite;
use uuid::Uuid;
pub struct U;
pub fn new<'a>(_args: &'a super::Args) -> Result<Box<super::Upgrader + 'a>, Error> {
Ok(Box::new(U))
}
/*fn build_stream_to_dir(dir: &dir::Fd, tx: &rusqlite::Transaction)
-> Result<Vec<Option<Rc<dir::Fd>>>, Error> {
let mut v = Vec::new();
let max_id: u32 = tx.query_row("select max(id) from stream", &[], |r| r.get_checked(0))??;
v.resize((max_id + 1) as usize, None);
let mut stmt = tx.prepare(r#"
select
stream.id,
camera.uuid,
stream.type
from
camera join stream on (camera.id = stream.camera_id)
"#)?;
let mut rows = stmt.query(&[])?;
while let Some(row) = rows.next() {
let row = row?;
let id: i32 = row.get_checked(0)?;
let uuid: FromSqlUuid = row.get_checked(1)?;
let type_: String = row.get_checked(2)?;
v[id as usize] =
Some(Rc::new(dir::Fd::open(Some(dir), &format!("{}-{}", uuid.0, type_), true)?));
}
Ok(v)
}*/
/// Gets a pathname for a sample file suitable for passing to open or unlink.
fn get_uuid_pathname(uuid: Uuid) -> [libc::c_char; 37] {
let mut buf = [0u8; 37];
write!(&mut buf[..36], "{}", uuid.hyphenated()).expect("can't format uuid to pathname buf");
// libc::c_char seems to be i8 on some platforms (Linux/arm) and u8 on others (Linux/amd64).
unsafe { mem::transmute::<[u8; 37], [libc::c_char; 37]>(buf) }
}
fn get_id_pathname(id: db::CompositeId) -> [libc::c_char; 17] {
let mut buf = [0u8; 17];
write!(&mut buf[..16], "{:016x}", id.0).expect("can't format id to pathname buf");
unsafe { mem::transmute::<[u8; 17], [libc::c_char; 17]>(buf) }
}
impl super::Upgrader for U {
fn in_tx(&mut self, tx: &rusqlite::Transaction) -> Result<(), Error> {
/*let (meta, path) = tx.query_row(r#"
select
meta.uuid,
dir.path,
dir.uuid,
dir.last_complete_open_id,
open.uuid
from
meta cross join sample_file_dir dir
join open on (dir.last_complete_open_id = open.id)
"#, |row| -> Result<_, Error> {
let mut meta = DirMeta::new();
let db_uuid: FromSqlUuid = row.get_checked(0)?;
let path: String = row.get_checked(1)?;
let dir_uuid: FromSqlUuid = row.get_checked(2)?;
let open_uuid: FromSqlUuid = row.get_checked(4)?;
meta.db_uuid.extend_from_slice(&db_uuid.0.as_bytes()[..]);
meta.dir_uuid.extend_from_slice(&dir_uuid.0.as_bytes()[..]);
let open = meta.mut_last_complete_open();
open.id = row.get_checked(3)?;
open.uuid.extend_from_slice(&open_uuid.0.as_bytes()[..]);
Ok((meta, path))
})??;*/
let path: String = tx.query_row(r#"
select path from sample_file_dir
"#, &[], |row| { row.get_checked(0) })??;
// Build map of stream -> dirname.
let d = dir::Fd::open(None, &path, false)?;
//let stream_to_dir = build_stream_to_dir(&d, tx)?;
let mut stmt = tx.prepare(r#"
select
composite_id,
sample_file_uuid
from
recording_playback
"#)?;
let mut rows = stmt.query(&[])?;
while let Some(row) = rows.next() {
let row = row?;
let id = db::CompositeId(row.get_checked(0)?);
let sample_file_uuid: FromSqlUuid = row.get_checked(1)?;
let from_path = get_uuid_pathname(sample_file_uuid.0);
let to_path = get_id_pathname(id);
//let to_dir: &dir::Fd = stream_to_dir[stream_id as usize].as_ref().unwrap();
let r = unsafe { dir::renameat(&d, from_path.as_ptr(), &d, to_path.as_ptr()) };
if let Err(e) = r {
if e.kind() == io::ErrorKind::NotFound {
continue; // assume it was already moved.
}
Err(e)?;
}
}
// These create statements match the schema.sql when version 3 was the latest.
tx.execute_batch(r#"
alter table recording_playback rename to old_recording_playback;
create table recording_playback (
composite_id integer primary key references recording (composite_id),
sample_file_sha1 blob not null check (length(sample_file_sha1) = 20),
video_index blob not null check (length(video_index) > 0)
);
insert into recording_playback
select
composite_id,
sample_file_sha1,
video_index
from
old_recording_playback;
drop table old_recording_playback;
"#)?;
Ok(())
}
}

372
src/db.rs
View File

@ -75,11 +75,10 @@ use time;
use uuid::Uuid; use uuid::Uuid;
/// Expected schema version. See `guide/schema.md` for more information. /// Expected schema version. See `guide/schema.md` for more information.
pub const EXPECTED_VERSION: i32 = 2; pub const EXPECTED_VERSION: i32 = 3;
const GET_RECORDING_PLAYBACK_SQL: &'static str = r#" const GET_RECORDING_PLAYBACK_SQL: &'static str = r#"
select select
sample_file_uuid,
video_index video_index
from from
recording_playback recording_playback
@ -87,24 +86,12 @@ const GET_RECORDING_PLAYBACK_SQL: &'static str = r#"
composite_id = :composite_id composite_id = :composite_id
"#; "#;
const DELETE_RESERVATION_SQL: &'static str = const DELETE_GARBAGE_SQL: &'static str =
"delete from reserved_sample_files where uuid = :uuid"; "delete from garbage where composite_id = :composite_id";
const INSERT_RESERVATION_SQL: &'static str = r#" const INSERT_GARBAGE_SQL: &'static str =
insert into reserved_sample_files (uuid, state) "insert into garbage (sample_file_dir_id, composite_id)
values (:uuid, :state) values (:sample_file_dir_id, :composite_id)";
"#;
/// Valid values for the `state` column in the `reserved_sample_files` table.
enum ReservationState {
/// This uuid has not yet been added to the `recording` table. The file may be unwritten,
/// partially written, or fully written.
Writing = 0,
/// This uuid was previously in the `recording` table. The file may be fully written or
/// unlinked.
Deleting = 1,
}
const INSERT_VIDEO_SAMPLE_ENTRY_SQL: &'static str = r#" const INSERT_VIDEO_SAMPLE_ENTRY_SQL: &'static str = r#"
insert into video_sample_entry (sha1, width, height, rfc6381_codec, data) insert into video_sample_entry (sha1, width, height, rfc6381_codec, data)
@ -121,9 +108,8 @@ const INSERT_RECORDING_SQL: &'static str = r#"
"#; "#;
const INSERT_RECORDING_PLAYBACK_SQL: &'static str = r#" const INSERT_RECORDING_PLAYBACK_SQL: &'static str = r#"
insert into recording_playback (composite_id, sample_file_uuid, sample_file_sha1, video_index) insert into recording_playback (composite_id, sample_file_sha1, video_index)
values (:composite_id, :sample_file_uuid, :sample_file_sha1, values (:composite_id, :sample_file_sha1, :video_index)
:video_index)
"#; "#;
const UPDATE_NEXT_RECORDING_ID_SQL: &'static str = const UPDATE_NEXT_RECORDING_ID_SQL: &'static str =
@ -131,19 +117,17 @@ const UPDATE_NEXT_RECORDING_ID_SQL: &'static str =
const LIST_OLDEST_SAMPLE_FILES_SQL: &'static str = r#" const LIST_OLDEST_SAMPLE_FILES_SQL: &'static str = r#"
select select
recording.composite_id, composite_id,
recording_playback.sample_file_uuid, start_time_90k,
recording.start_time_90k, duration_90k,
recording.duration_90k, sample_file_bytes
recording.sample_file_bytes
from from
recording recording
join recording_playback on (recording.composite_id = recording_playback.composite_id)
where where
:start <= recording.composite_id and :start <= composite_id and
recording.composite_id < :end composite_id < :end
order by order by
recording.composite_id composite_id
"#; "#;
const DELETE_RECORDING_SQL: &'static str = r#" const DELETE_RECORDING_SQL: &'static str = r#"
@ -205,18 +189,16 @@ impl rusqlite::types::FromSql for FromSqlUuid {
} }
} }
/// A box with space for the uuid (initially uninitialized) and the video index. struct VideoIndex(Box<[u8]>);
/// The caller must fill the uuid bytes.
struct PlaybackData(Box<[u8]>);
impl rusqlite::types::FromSql for PlaybackData { impl rusqlite::types::FromSql for VideoIndex {
fn column_result(value: rusqlite::types::ValueRef) -> rusqlite::types::FromSqlResult<Self> { fn column_result(value: rusqlite::types::ValueRef) -> rusqlite::types::FromSqlResult<Self> {
let blob = value.as_blob()?; let blob = value.as_blob()?;
let len = 16 + blob.len(); let len = blob.len();
let mut v = Vec::with_capacity(len); let mut v = Vec::with_capacity(len);
unsafe { v.set_len(len) }; unsafe { v.set_len(len) };
v[16..].copy_from_slice(blob); v.copy_from_slice(blob);
Ok(PlaybackData(v.into_boxed_slice())) Ok(VideoIndex(v.into_boxed_slice()))
} }
} }
@ -238,8 +220,7 @@ pub struct ListRecordingsRow {
pub start: recording::Time, pub start: recording::Time,
pub video_sample_entry: Arc<VideoSampleEntry>, pub video_sample_entry: Arc<VideoSampleEntry>,
pub stream_id: i32, pub id: CompositeId,
pub id: i32,
/// This is a recording::Duration, but a single recording's duration fits into an i32. /// This is a recording::Duration, but a single recording's duration fits into an i32.
pub duration_90k: i32, pub duration_90k: i32,
@ -267,19 +248,9 @@ pub struct ListAggregatedRecordingsRow {
/// Select fields from the `recordings_playback` table. Retrieve with `with_recording_playback`. /// Select fields from the `recordings_playback` table. Retrieve with `with_recording_playback`.
#[derive(Debug)] #[derive(Debug)]
pub struct RecordingPlayback<'a> { pub struct RecordingPlayback<'a> {
pub sample_file_uuid: Uuid,
pub video_index: &'a [u8], pub video_index: &'a [u8],
} }
impl<'a> RecordingPlayback<'a> {
fn new(data: &'a [u8]) -> Self {
RecordingPlayback {
sample_file_uuid: Uuid::from_bytes(&data[..16]).unwrap(),
video_index: &data[16..],
}
}
}
/// Bitmask in the `flags` field in the `recordings` table; see `schema.sql`. /// Bitmask in the `flags` field in the `recordings` table; see `schema.sql`.
pub enum RecordingFlags { pub enum RecordingFlags {
TrailingZero = 1, TrailingZero = 1,
@ -288,7 +259,7 @@ pub enum RecordingFlags {
/// A recording to pass to `insert_recording`. /// A recording to pass to `insert_recording`.
#[derive(Debug)] #[derive(Debug)]
pub struct RecordingToInsert { pub struct RecordingToInsert {
pub stream_id: i32, pub id: CompositeId,
pub run_offset: i32, pub run_offset: i32,
pub flags: i32, pub flags: i32,
pub sample_file_bytes: i32, pub sample_file_bytes: i32,
@ -297,7 +268,6 @@ pub struct RecordingToInsert {
pub video_samples: i32, pub video_samples: i32,
pub video_sync_samples: i32, pub video_sync_samples: i32,
pub video_sample_entry_id: i32, pub video_sample_entry_id: i32,
pub sample_file_uuid: Uuid,
pub video_index: Vec<u8>, pub video_index: Vec<u8>,
pub sample_file_sha1: [u8; 20], pub sample_file_sha1: [u8; 20],
} }
@ -305,9 +275,7 @@ pub struct RecordingToInsert {
/// A row used in `list_oldest_sample_files`. /// A row used in `list_oldest_sample_files`.
#[derive(Debug)] #[derive(Debug)]
pub struct ListOldestSampleFilesRow { pub struct ListOldestSampleFilesRow {
pub uuid: Uuid, pub id: CompositeId,
pub stream_id: i32,
pub recording_id: i32,
pub time: Range<recording::Time>, pub time: Range<recording::Time>,
pub sample_file_bytes: i32, pub sample_file_bytes: i32,
} }
@ -446,7 +414,7 @@ pub struct Stream {
/// Mapping of calendar day (in the server's time zone) to a summary of recordings on that day. /// Mapping of calendar day (in the server's time zone) to a summary of recordings on that day.
pub days: BTreeMap<StreamDayKey, StreamDayValue>, pub days: BTreeMap<StreamDayKey, StreamDayValue>,
pub record: bool, pub record: bool,
next_recording_id: i32, pub next_recording_id: i32,
} }
#[derive(Debug, Default)] #[derive(Debug, Default)]
@ -611,7 +579,7 @@ struct State {
cameras_by_uuid: BTreeMap<Uuid, i32>, cameras_by_uuid: BTreeMap<Uuid, i32>,
video_sample_entries: BTreeMap<i32, Arc<VideoSampleEntry>>, video_sample_entries: BTreeMap<i32, Arc<VideoSampleEntry>>,
list_recordings_by_time_sql: String, list_recordings_by_time_sql: String,
playback_cache: RefCell<LruCache<i64, Box<[u8]>, fnv::FnvBuildHasher>>, video_index_cache: RefCell<LruCache<i64, Box<[u8]>, fnv::FnvBuildHasher>>,
} }
#[derive(Copy, Clone, Debug)] #[derive(Copy, Clone, Debug)]
@ -632,11 +600,6 @@ pub struct Transaction<'a> {
/// well. We could use savepoints (nested transactions) for this, but for simplicity we just /// well. We could use savepoints (nested transactions) for this, but for simplicity we just
/// require the entire transaction be rolled back. /// require the entire transaction be rolled back.
must_rollback: bool, must_rollback: bool,
/// Normally sample file uuids must be reserved prior to a recording being inserted.
/// It's convenient in benchmarks though to allow the same segment to be inserted into the
/// database many times, so this safety check can be disabled.
pub bypass_reservation_for_testing: bool,
} }
/// A modification to be done to a `Stream` after a `Transaction` is committed. /// A modification to be done to a `Stream` after a `Transaction` is committed.
@ -666,53 +629,56 @@ struct StreamModification {
new_record: Option<bool>, new_record: Option<bool>,
} }
fn composite_id(stream_id: i32, recording_id: i32) -> i64 { #[derive(Copy, Clone, Debug, Eq, PartialEq)]
(stream_id as i64) << 32 | recording_id as i64 pub struct CompositeId(pub i64);
impl CompositeId {
pub fn new(stream_id: i32, recording_id: i32) -> Self {
CompositeId((stream_id as i64) << 32 | recording_id as i64)
}
pub fn stream(self) -> i32 { (self.0 >> 32) as i32 }
pub fn recording(self) -> i32 { self.0 as i32 }
}
impl ::std::fmt::Display for CompositeId {
fn fmt(&self, f: &mut ::std::fmt::Formatter) -> Result<(), ::std::fmt::Error> {
write!(f, "{}/{}", self.stream(), self.recording())
}
} }
impl<'a> Transaction<'a> { impl<'a> Transaction<'a> {
/// Reserves a new, randomly generated UUID to be used as a sample file.
pub fn reserve_sample_file(&mut self) -> Result<Uuid, Error> {
let mut stmt = self.tx.prepare_cached(INSERT_RESERVATION_SQL)?;
let uuid = Uuid::new_v4();
let uuid_bytes = &uuid.as_bytes()[..];
stmt.execute_named(&[
(":uuid", &uuid_bytes),
(":state", &(ReservationState::Writing as i64))
])?;
info!("reserved {}", uuid);
Ok(uuid)
}
/// Deletes the given recordings from the `recording` and `recording_playback` tables. /// Deletes the given recordings from the `recording` and `recording_playback` tables.
/// Note they are not fully removed from the database; the uuids are transferred to the /// Note they are not fully removed from the database; the uuids are transferred to the
/// `reserved_sample_files` table. The caller should `unlink` the files, then remove the /// `garbage` table. The caller should `unlink` the files, then remove the `garbage` row.
/// reservation.
pub fn delete_recordings(&mut self, rows: &[ListOldestSampleFilesRow]) -> Result<(), Error> { pub fn delete_recordings(&mut self, rows: &[ListOldestSampleFilesRow]) -> Result<(), Error> {
let mut del1 = self.tx.prepare_cached(DELETE_RECORDING_PLAYBACK_SQL)?; let mut del1 = self.tx.prepare_cached(DELETE_RECORDING_PLAYBACK_SQL)?;
let mut del2 = self.tx.prepare_cached(DELETE_RECORDING_SQL)?; let mut del2 = self.tx.prepare_cached(DELETE_RECORDING_SQL)?;
let mut insert = self.tx.prepare_cached(INSERT_RESERVATION_SQL)?; let mut insert = self.tx.prepare_cached(INSERT_GARBAGE_SQL)?;
self.check_must_rollback()?; self.check_must_rollback()?;
self.must_rollback = true; self.must_rollback = true;
for row in rows { for row in rows {
let composite_id = &composite_id(row.stream_id, row.recording_id); let changes = del1.execute_named(&[(":composite_id", &row.id.0)])?;
let changes = del1.execute_named(&[(":composite_id", composite_id)])?;
if changes != 1 { if changes != 1 {
return Err(Error::new(format!("no such recording {}/{} (uuid {})", return Err(Error::new(format!("no such recording {}", row.id)));
row.stream_id, row.recording_id, row.uuid)));
} }
let changes = del2.execute_named(&[(":composite_id", composite_id)])?; let changes = del2.execute_named(&[(":composite_id", &row.id.0)])?;
if changes != 1 { if changes != 1 {
return Err(Error::new(format!("no such recording_playback {}/{} (uuid {})", return Err(Error::new(format!("no such recording_playback {}", row.id)));
row.stream_id, row.recording_id, row.uuid)));
} }
let uuid = &row.uuid.as_bytes()[..]; let sid = row.id.stream();
let did = self.state
.streams_by_id
.get(&sid)
.ok_or_else(|| Error::new(format!("no such stream {}", sid)))?
.sample_file_dir_id
.ok_or_else(|| Error::new(format!("stream {} has no dir", sid)))?;
insert.execute_named(&[ insert.execute_named(&[
(":uuid", &uuid), (":sample_file_dir_id", &did),
(":state", &(ReservationState::Deleting as i64)) (":composite_id", &row.id.0)],
])?; )?;
let m = Transaction::get_mods_by_stream(&mut self.mods_by_stream, row.stream_id); let m = Transaction::get_mods_by_stream(&mut self.mods_by_stream, row.id.stream());
m.duration -= row.time.end - row.time.start; m.duration -= row.time.end - row.time.start;
m.sample_file_bytes -= row.sample_file_bytes as i64; m.sample_file_bytes -= row.sample_file_bytes as i64;
adjust_days(row.time.clone(), -1, &mut m.days); adjust_days(row.time.clone(), -1, &mut m.days);
@ -721,59 +687,46 @@ impl<'a> Transaction<'a> {
Ok(()) Ok(())
} }
/// Marks the given sample file uuid as deleted. Accepts uuids in either `ReservationState`. /// Marks the given sample files as deleted. This shouldn't be called until the files have
/// This shouldn't be called until the files have been `unlink()`ed and the parent directory /// been `unlink()`ed and the parent directory `fsync()`ed.
/// `fsync()`ed. pub fn mark_sample_files_deleted(&mut self, ids: &[CompositeId]) -> Result<(), Error> {
pub fn mark_sample_files_deleted(&mut self, uuids: &[Uuid]) -> Result<(), Error> { if ids.is_empty() { return Ok(()); }
if uuids.is_empty() { return Ok(()); } let mut stmt = self.tx.prepare_cached(DELETE_GARBAGE_SQL)?;
let mut stmt = for &id in ids {
self.tx.prepare_cached("delete from reserved_sample_files where uuid = :uuid;")?; let changes = stmt.execute_named(&[(":composite_id", &id.0)])?;
for uuid in uuids {
let uuid_bytes = &uuid.as_bytes()[..];
let changes = stmt.execute_named(&[(":uuid", &uuid_bytes)])?;
if changes != 1 { if changes != 1 {
return Err(Error::new(format!("no reservation for {}", uuid.hyphenated()))); return Err(Error::new(format!("no garbage row for {}", id)));
} }
} }
Ok(()) Ok(())
} }
/// Inserts the specified recording. /// Inserts the specified recording.
/// The sample file uuid must have been previously reserved. (Although this can be bypassed pub fn insert_recording(&mut self, r: &RecordingToInsert) -> Result<(), Error> {
/// for testing; see the `bypass_reservation_for_testing` field.)
pub fn insert_recording(&mut self, r: &RecordingToInsert) -> Result<i32, Error> {
self.check_must_rollback()?; self.check_must_rollback()?;
// Sanity checking.
if r.time.end < r.time.start { if r.time.end < r.time.start {
return Err(Error::new(format!("end time {} must be >= start time {}", return Err(Error::new(format!("end time {} must be >= start time {}",
r.time.end, r.time.start))); r.time.end, r.time.start)));
} }
// Unreserve the sample file uuid and insert the recording row. // Check that the recording id is acceptable and do the insertion.
// TODO: var used? let stream = match self.state.streams_by_id.get(&r.id.stream()) {
let stream = match self.state.streams_by_id.get_mut(&r.stream_id) { None => return Err(Error::new(format!("no such stream id {}", r.id.stream()))),
None => return Err(Error::new(format!("no such stream id {}", r.stream_id))),
Some(s) => s, Some(s) => s,
}; };
let uuid = &r.sample_file_uuid.as_bytes()[..];
{
let mut stmt = self.tx.prepare_cached(DELETE_RESERVATION_SQL)?;
let changes = stmt.execute_named(&[(":uuid", &uuid)])?;
if changes != 1 && !self.bypass_reservation_for_testing {
return Err(Error::new(format!("uuid {} is not reserved", r.sample_file_uuid)));
}
}
self.must_rollback = true; self.must_rollback = true;
let m = Transaction::get_mods_by_stream(&mut self.mods_by_stream, r.stream_id); let m = Transaction::get_mods_by_stream(&mut self.mods_by_stream, r.id.stream());
let recording_id;
{ {
recording_id = m.new_next_recording_id.unwrap_or(stream.next_recording_id); let next = m.new_next_recording_id.unwrap_or(stream.next_recording_id);
let composite_id = composite_id(r.stream_id, recording_id); if r.id.recording() < next {
return Err(Error::new(format!("recording {} out of order; next id is {}!",
r.id, next)));
}
let mut stmt = self.tx.prepare_cached(INSERT_RECORDING_SQL)?; let mut stmt = self.tx.prepare_cached(INSERT_RECORDING_SQL)?;
stmt.execute_named(&[ stmt.execute_named(&[
(":composite_id", &composite_id), (":composite_id", &r.id.0),
(":stream_id", &(r.stream_id as i64)), (":stream_id", &(r.id.stream() as i64)),
(":run_offset", &r.run_offset), (":run_offset", &r.run_offset),
(":flags", &r.flags), (":flags", &r.flags),
(":sample_file_bytes", &r.sample_file_bytes), (":sample_file_bytes", &r.sample_file_bytes),
@ -784,18 +737,17 @@ impl<'a> Transaction<'a> {
(":video_sync_samples", &r.video_sync_samples), (":video_sync_samples", &r.video_sync_samples),
(":video_sample_entry_id", &r.video_sample_entry_id), (":video_sample_entry_id", &r.video_sample_entry_id),
])?; ])?;
m.new_next_recording_id = Some(recording_id + 1); m.new_next_recording_id = Some(r.id.recording() + 1);
let mut stmt = self.tx.prepare_cached(INSERT_RECORDING_PLAYBACK_SQL)?; let mut stmt = self.tx.prepare_cached(INSERT_RECORDING_PLAYBACK_SQL)?;
let sha1 = &r.sample_file_sha1[..]; let sha1 = &r.sample_file_sha1[..];
stmt.execute_named(&[ stmt.execute_named(&[
(":composite_id", &composite_id), (":composite_id", &r.id.0),
(":sample_file_uuid", &uuid),
(":sample_file_sha1", &sha1), (":sample_file_sha1", &sha1),
(":video_index", &r.video_index), (":video_index", &r.video_index),
])?; ])?;
let mut stmt = self.tx.prepare_cached(UPDATE_NEXT_RECORDING_ID_SQL)?; let mut stmt = self.tx.prepare_cached(UPDATE_NEXT_RECORDING_ID_SQL)?;
stmt.execute_named(&[ stmt.execute_named(&[
(":stream_id", &(r.stream_id as i64)), (":stream_id", &(r.id.stream() as i64)),
(":next_recording_id", &m.new_next_recording_id), (":next_recording_id", &m.new_next_recording_id),
])?; ])?;
} }
@ -803,7 +755,7 @@ impl<'a> Transaction<'a> {
m.duration += r.time.end - r.time.start; m.duration += r.time.end - r.time.start;
m.sample_file_bytes += r.sample_file_bytes as i64; m.sample_file_bytes += r.sample_file_bytes as i64;
adjust_days(r.time.clone(), 1, &mut m.days); adjust_days(r.time.clone(), 1, &mut m.days);
Ok(recording_id) Ok(())
} }
/// Updates the `record` and `retain_bytes` for the given stream. /// Updates the `record` and `retain_bytes` for the given stream.
@ -1090,6 +1042,11 @@ impl LockedDatabase {
let mut meta = schema::DirMeta::default(); let mut meta = schema::DirMeta::default();
meta.db_uuid.extend_from_slice(&self.state.uuid.as_bytes()[..]); meta.db_uuid.extend_from_slice(&self.state.uuid.as_bytes()[..]);
meta.dir_uuid.extend_from_slice(&dir.uuid.as_bytes()[..]); meta.dir_uuid.extend_from_slice(&dir.uuid.as_bytes()[..]);
if let Some(o) = dir.last_complete_open {
let open = meta.mut_last_complete_open();
open.id = o.id;
open.uuid.extend_from_slice(&o.uuid.as_bytes()[..]);
}
if let Some(o) = o { if let Some(o) = o {
let open = meta.mut_in_progress_open(); let open = meta.mut_in_progress_open();
open.id = o.id; open.id = o.id;
@ -1149,7 +1106,6 @@ impl LockedDatabase {
mods_by_stream: FnvHashMap::default(), mods_by_stream: FnvHashMap::default(),
tx: self.conn.transaction()?, tx: self.conn.transaction()?,
must_rollback: false, must_rollback: false,
bypass_reservation_for_testing: false,
}) })
} }
@ -1171,7 +1127,7 @@ impl LockedDatabase {
(":stream_id", &stream_id), (":stream_id", &stream_id),
(":start_time_90k", &desired_time.start.0), (":start_time_90k", &desired_time.start.0),
(":end_time_90k", &desired_time.end.0)])?; (":end_time_90k", &desired_time.end.0)])?;
self.list_recordings_inner(stream_id, rows, f) self.list_recordings_inner(rows, f)
} }
/// Lists the specified recordigs in ascending order by id. /// Lists the specified recordigs in ascending order by id.
@ -1180,29 +1136,26 @@ impl LockedDatabase {
where F: FnMut(ListRecordingsRow) -> Result<(), Error> { where F: FnMut(ListRecordingsRow) -> Result<(), Error> {
let mut stmt = self.conn.prepare_cached(LIST_RECORDINGS_BY_ID_SQL)?; let mut stmt = self.conn.prepare_cached(LIST_RECORDINGS_BY_ID_SQL)?;
let rows = stmt.query_named(&[ let rows = stmt.query_named(&[
(":start", &composite_id(stream_id, desired_ids.start)), (":start", &CompositeId::new(stream_id, desired_ids.start).0),
(":end", &composite_id(stream_id, desired_ids.end)), (":end", &CompositeId::new(stream_id, desired_ids.end).0),
])?; ])?;
self.list_recordings_inner(stream_id, rows, f) self.list_recordings_inner(rows, f)
} }
fn list_recordings_inner<F>(&self, stream_id: i32, mut rows: rusqlite::Rows, mut f: F) fn list_recordings_inner<F>(&self, mut rows: rusqlite::Rows, mut f: F) -> Result<(), Error>
-> Result<(), Error>
where F: FnMut(ListRecordingsRow) -> Result<(), Error> { where F: FnMut(ListRecordingsRow) -> Result<(), Error> {
while let Some(row) = rows.next() { while let Some(row) = rows.next() {
let row = row?; let row = row?;
let id = row.get_checked::<_, i64>(0)? as i32; // drop top bits of composite_id. let id = CompositeId(row.get_checked::<_, i64>(0)?);
let vse_id = row.get_checked(8)?; let vse_id = row.get_checked(8)?;
let video_sample_entry = match self.state.video_sample_entries.get(&vse_id) { let video_sample_entry = match self.state.video_sample_entries.get(&vse_id) {
Some(v) => v, Some(v) => v,
None => { None => {
return Err(Error::new(format!( return Err(Error::new(format!(
"recording {}/{} references nonexistent video_sample_entry {}", "recording {} references nonexistent video_sample_entry {}", id, vse_id)));
stream_id, id, vse_id)));
}, },
}; };
let out = ListRecordingsRow { let out = ListRecordingsRow {
stream_id,
id, id,
run_offset: row.get_checked(1)?, run_offset: row.get_checked(1)?,
flags: row.get_checked(2)?, flags: row.get_checked(2)?,
@ -1241,11 +1194,12 @@ impl LockedDatabase {
// causing problems.) // causing problems.)
let mut aggs: BTreeMap<i32, ListAggregatedRecordingsRow> = BTreeMap::new(); let mut aggs: BTreeMap<i32, ListAggregatedRecordingsRow> = BTreeMap::new();
self.list_recordings_by_time(stream_id, desired_time, |row| { self.list_recordings_by_time(stream_id, desired_time, |row| {
let run_start_id = row.id - row.run_offset; let recording_id = row.id.recording();
let run_start_id = recording_id - row.run_offset;
let needs_flush = if let Some(a) = aggs.get(&run_start_id) { let needs_flush = if let Some(a) = aggs.get(&run_start_id) {
let new_dur = a.time.end - a.time.start + let new_dur = a.time.end - a.time.start +
recording::Duration(row.duration_90k as i64); recording::Duration(row.duration_90k as i64);
a.ids.end != row.id || row.video_sample_entry.id != a.video_sample_entry.id || a.ids.end != recording_id || row.video_sample_entry.id != a.video_sample_entry.id ||
new_dur >= forced_split new_dur >= forced_split
} else { } else {
false false
@ -1261,7 +1215,7 @@ impl LockedDatabase {
stream_id, a.ids.end - 1, a.time.end, row.id, row.start))); stream_id, a.ids.end - 1, a.time.end, row.id, row.start)));
} }
a.time.end.0 += row.duration_90k as i64; a.time.end.0 += row.duration_90k as i64;
a.ids.end = row.id + 1; a.ids.end = recording_id + 1;
a.video_samples += row.video_samples as i64; a.video_samples += row.video_samples as i64;
a.video_sync_samples += row.video_sync_samples as i64; a.video_sync_samples += row.video_sync_samples as i64;
a.sample_file_bytes += row.sample_file_bytes as i64; a.sample_file_bytes += row.sample_file_bytes as i64;
@ -1272,13 +1226,13 @@ impl LockedDatabase {
if need_insert { if need_insert {
aggs.insert(run_start_id, ListAggregatedRecordingsRow{ aggs.insert(run_start_id, ListAggregatedRecordingsRow{
time: row.start .. recording::Time(row.start.0 + row.duration_90k as i64), time: row.start .. recording::Time(row.start.0 + row.duration_90k as i64),
ids: row.id .. row.id+1, ids: recording_id .. recording_id+1,
video_samples: row.video_samples as i64, video_samples: row.video_samples as i64,
video_sync_samples: row.video_sync_samples as i64, video_sync_samples: row.video_sync_samples as i64,
sample_file_bytes: row.sample_file_bytes as i64, sample_file_bytes: row.sample_file_bytes as i64,
video_sample_entry: row.video_sample_entry, video_sample_entry: row.video_sample_entry,
stream_id, stream_id,
run_start_id: row.id - row.run_offset, run_start_id,
flags: row.flags, flags: row.flags,
}); });
}; };
@ -1293,44 +1247,37 @@ impl LockedDatabase {
/// Calls `f` with a single `recording_playback` row. /// Calls `f` with a single `recording_playback` row.
/// Note the lock is held for the duration of `f`. /// Note the lock is held for the duration of `f`.
/// This uses a LRU cache to reduce the number of retrievals from the database. /// This uses a LRU cache to reduce the number of retrievals from the database.
pub fn with_recording_playback<F, R>(&self, stream_id: i32, recording_id: i32, f: F) pub fn with_recording_playback<F, R>(&self, id: CompositeId, f: F) -> Result<R, Error>
-> Result<R, Error>
where F: FnOnce(&RecordingPlayback) -> Result<R, Error> { where F: FnOnce(&RecordingPlayback) -> Result<R, Error> {
let composite_id = composite_id(stream_id, recording_id); let mut cache = self.state.video_index_cache.borrow_mut();
let mut cache = self.state.playback_cache.borrow_mut(); if let Some(video_index) = cache.get_mut(&id.0) {
if let Some(r) = cache.get_mut(&composite_id) { trace!("cache hit for recording {}", id);
trace!("cache hit for recording {}/{}", stream_id, recording_id); return f(&RecordingPlayback { video_index });
return f(&RecordingPlayback::new(r));
} }
trace!("cache miss for recording {}/{}", stream_id, recording_id); trace!("cache miss for recording {}", id);
let mut stmt = self.conn.prepare_cached(GET_RECORDING_PLAYBACK_SQL)?; let mut stmt = self.conn.prepare_cached(GET_RECORDING_PLAYBACK_SQL)?;
let mut rows = stmt.query_named(&[(":composite_id", &composite_id)])?; let mut rows = stmt.query_named(&[(":composite_id", &id.0)])?;
if let Some(row) = rows.next() { if let Some(row) = rows.next() {
let row = row?; let row = row?;
let uuid: FromSqlUuid = row.get_checked(0)?; let video_index: VideoIndex = row.get_checked(0)?;
let data = { let result = f(&RecordingPlayback { video_index: &video_index.0[..] });
let mut data: PlaybackData = row.get_checked(1)?; cache.insert(id.0, video_index.0);
data.0[0..16].copy_from_slice(uuid.0.as_bytes());
data.0
};
let result = f(&RecordingPlayback::new(&data));
cache.insert(composite_id, data);
return result; return result;
} }
Err(Error::new(format!("no such recording {}/{}", stream_id, recording_id))) Err(Error::new(format!("no such recording {}", id)))
} }
/// Lists all reserved sample files. /// Lists all garbage ids.
pub fn list_reserved_sample_files(&self) -> Result<Vec<Uuid>, Error> { pub fn list_garbage(&self, dir_id: i32) -> Result<Vec<CompositeId>, Error> {
let mut reserved = Vec::new(); let mut garbage = Vec::new();
let mut stmt = self.conn.prepare_cached("select uuid from reserved_sample_files;")?; let mut stmt = self.conn.prepare_cached(
let mut rows = stmt.query_named(&[])?; "select composite_id from garbage where sample_file_dir_id = ?")?;
let mut rows = stmt.query(&[&dir_id])?;
while let Some(row) = rows.next() { while let Some(row) = rows.next() {
let row = row?; let row = row?;
let uuid: FromSqlUuid = row.get_checked(0)?; garbage.push(CompositeId(row.get_checked(0)?));
reserved.push(uuid.0);
} }
Ok(reserved) Ok(garbage)
} }
/// Lists the oldest sample files (to delete to free room). /// Lists the oldest sample files (to delete to free room).
@ -1339,21 +1286,18 @@ impl LockedDatabase {
where F: FnMut(ListOldestSampleFilesRow) -> bool { where F: FnMut(ListOldestSampleFilesRow) -> bool {
let mut stmt = self.conn.prepare_cached(LIST_OLDEST_SAMPLE_FILES_SQL)?; let mut stmt = self.conn.prepare_cached(LIST_OLDEST_SAMPLE_FILES_SQL)?;
let mut rows = stmt.query_named(&[ let mut rows = stmt.query_named(&[
(":start", &composite_id(stream_id, 0)), (":start", &CompositeId::new(stream_id, 0).0),
(":end", &composite_id(stream_id + 1, 0)), (":end", &CompositeId::new(stream_id + 1, 0).0),
])?; ])?;
while let Some(row) = rows.next() { while let Some(row) = rows.next() {
let row = row?; let row = row?;
let start = recording::Time(row.get_checked(2)?); let id = CompositeId(row.get_checked(0)?);
let duration = recording::Duration(row.get_checked(3)?); let start = recording::Time(row.get_checked(1)?);
let composite_id: i64 = row.get_checked(0)?; let duration = recording::Duration(row.get_checked(2)?);
let uuid: FromSqlUuid = row.get_checked(1)?;
let should_continue = f(ListOldestSampleFilesRow{ let should_continue = f(ListOldestSampleFilesRow{
recording_id: composite_id as i32, id,
stream_id: (composite_id >> 32) as i32,
uuid: uuid.0,
time: start .. start + duration, time: start .. start + duration,
sample_file_bytes: row.get_checked(4)?, sample_file_bytes: row.get_checked(3)?,
}); });
if !should_continue { if !should_continue {
break; break;
@ -1844,7 +1788,7 @@ impl Database {
cameras_by_uuid: BTreeMap::new(), cameras_by_uuid: BTreeMap::new(),
streams_by_id: BTreeMap::new(), streams_by_id: BTreeMap::new(),
video_sample_entries: BTreeMap::new(), video_sample_entries: BTreeMap::new(),
playback_cache: RefCell::new(LruCache::with_hasher(1024, Default::default())), video_index_cache: RefCell::new(LruCache::with_hasher(1024, Default::default())),
list_recordings_by_time_sql: list_recordings_by_time_sql, list_recordings_by_time_sql: list_recordings_by_time_sql,
}, },
})); }));
@ -1895,12 +1839,10 @@ impl Database {
mod tests { mod tests {
extern crate tempdir; extern crate tempdir;
use core::cmp::Ord;
use recording::{self, TIME_UNITS_PER_SEC}; use recording::{self, TIME_UNITS_PER_SEC};
use rusqlite::Connection; use rusqlite::Connection;
use std::collections::BTreeMap; use std::collections::BTreeMap;
use std::error::Error as E; use std::error::Error as E;
use std::fmt::Debug;
use testutil; use testutil;
use super::*; use super::*;
use super::adjust_days; // non-public. use super::adjust_days; // non-public.
@ -1960,13 +1902,13 @@ mod tests {
// TODO(slamb): test that the days logic works correctly. // TODO(slamb): test that the days logic works correctly.
let mut rows = 0; let mut rows = 0;
let mut recording_id = -1; let mut recording_id = None;
{ {
let db = db.lock(); let db = db.lock();
let all_time = recording::Time(i64::min_value()) .. recording::Time(i64::max_value()); let all_time = recording::Time(i64::min_value()) .. recording::Time(i64::max_value());
db.list_recordings_by_time(stream_id, all_time, |row| { db.list_recordings_by_time(stream_id, all_time, |row| {
rows += 1; rows += 1;
recording_id = row.id; recording_id = Some(row.id);
assert_eq!(r.time, assert_eq!(r.time,
row.start .. row.start + recording::Duration(row.duration_90k as i64)); row.start .. row.start + recording::Duration(row.duration_90k as i64));
assert_eq!(r.video_samples, row.video_samples); assert_eq!(r.video_samples, row.video_samples);
@ -1981,8 +1923,7 @@ mod tests {
rows = 0; rows = 0;
db.lock().list_oldest_sample_files(stream_id, |row| { db.lock().list_oldest_sample_files(stream_id, |row| {
rows += 1; rows += 1;
assert_eq!(recording_id, row.recording_id); assert_eq!(recording_id, Some(row.id));
assert_eq!(r.sample_file_uuid, row.uuid);
assert_eq!(r.time, row.time); assert_eq!(r.time, row.time);
assert_eq!(r.sample_file_bytes, row.sample_file_bytes); assert_eq!(r.sample_file_bytes, row.sample_file_bytes);
true true
@ -1993,13 +1934,6 @@ mod tests {
// TODO: with_recording_playback. // TODO: with_recording_playback.
} }
fn assert_unsorted_eq<T>(mut a: Vec<T>, mut b: Vec<T>)
where T: Debug + Ord {
a.sort();
b.sort();
assert_eq!(a, b);
}
#[test] #[test]
fn test_adjust_days() { fn test_adjust_days() {
testutil::init(); testutil::init();
@ -2076,10 +2010,10 @@ mod tests {
fn test_version_too_old() { fn test_version_too_old() {
testutil::init(); testutil::init();
let c = setup_conn(); let c = setup_conn();
c.execute_batch("delete from version; insert into version values (1, 0, '');").unwrap(); c.execute_batch("delete from version; insert into version values (2, 0, '');").unwrap();
let e = Database::new(c, false).unwrap_err(); let e = Database::new(c, false).unwrap_err();
assert!(e.description().starts_with( assert!(e.description().starts_with(
"Database schema version 1 is too old (expected 2)"), "got: {:?}", "Database schema version 2 is too old (expected 3)"), "got: {:?}",
e.description()); e.description());
} }
@ -2087,10 +2021,10 @@ mod tests {
fn test_version_too_new() { fn test_version_too_new() {
testutil::init(); testutil::init();
let c = setup_conn(); let c = setup_conn();
c.execute_batch("delete from version; insert into version values (3, 0, '');").unwrap(); c.execute_batch("delete from version; insert into version values (4, 0, '');").unwrap();
let e = Database::new(c, false).unwrap_err(); let e = Database::new(c, false).unwrap_err();
assert!(e.description().starts_with( assert!(e.description().starts_with(
"Database schema version 3 is too new (expected 2)"), "got: {:?}", e.description()); "Database schema version 4 is too new (expected 3)"), "got: {:?}", e.description());
} }
/// Basic test of running some queries on a fresh database. /// Basic test of running some queries on a fresh database.
@ -2138,30 +2072,19 @@ mod tests {
let db = Database::new(conn, true).unwrap(); let db = Database::new(conn, true).unwrap();
assert_no_recordings(&db, camera_uuid); assert_no_recordings(&db, camera_uuid);
assert_eq!(db.lock().list_reserved_sample_files().unwrap(), &[]); assert_eq!(db.lock().list_garbage(sample_file_dir_id.unwrap()).unwrap(), &[]);
let (uuid_to_use, uuid_to_keep);
{
let mut db = db.lock();
let mut tx = db.tx().unwrap();
uuid_to_use = tx.reserve_sample_file().unwrap();
uuid_to_keep = tx.reserve_sample_file().unwrap();
tx.commit().unwrap();
}
assert_unsorted_eq(db.lock().list_reserved_sample_files().unwrap(),
vec![uuid_to_use, uuid_to_keep]);
let vse_id = db.lock().insert_video_sample_entry( let vse_id = db.lock().insert_video_sample_entry(
1920, 1080, include_bytes!("testdata/avc1").to_vec(), 1920, 1080, include_bytes!("testdata/avc1").to_vec(),
"avc1.4d0029".to_owned()).unwrap(); "avc1.4d0029".to_owned()).unwrap();
assert!(vse_id > 0, "vse_id = {}", vse_id); assert!(vse_id > 0, "vse_id = {}", vse_id);
// Inserting a recording should succeed and remove its uuid from the reserved table. // Inserting a recording should succeed and advance the next recording id.
let start = recording::Time(1430006400 * TIME_UNITS_PER_SEC); let start = recording::Time(1430006400 * TIME_UNITS_PER_SEC);
let stream_id = camera_id; // TODO let stream_id = camera_id; // TODO
let id = CompositeId::new(stream_id, 1);
let recording = RecordingToInsert { let recording = RecordingToInsert {
stream_id, id,
sample_file_bytes: 42, sample_file_bytes: 42,
run_offset: 0, run_offset: 0,
flags: 0, flags: 0,
@ -2170,7 +2093,6 @@ mod tests {
video_samples: 1, video_samples: 1,
video_sync_samples: 1, video_sync_samples: 1,
video_sample_entry_id: vse_id, video_sample_entry_id: vse_id,
sample_file_uuid: uuid_to_use,
video_index: [0u8; 100].to_vec(), video_index: [0u8; 100].to_vec(),
sample_file_sha1: [0u8; 20], sample_file_sha1: [0u8; 20],
}; };
@ -2180,8 +2102,7 @@ mod tests {
tx.insert_recording(&recording).unwrap(); tx.insert_recording(&recording).unwrap();
tx.commit().unwrap(); tx.commit().unwrap();
} }
assert_unsorted_eq(db.lock().list_reserved_sample_files().unwrap(), assert_eq!(db.lock().streams_by_id().get(&stream_id).unwrap().next_recording_id, 2);
vec![uuid_to_keep]);
// Queries should return the correct result (with caches update on insert). // Queries should return the correct result (with caches update on insert).
assert_single_recording(&db, stream_id, &recording); assert_single_recording(&db, stream_id, &recording);
@ -2192,7 +2113,7 @@ mod tests {
let db = Database::new(conn, true).unwrap(); let db = Database::new(conn, true).unwrap();
assert_single_recording(&db, stream_id, &recording); assert_single_recording(&db, stream_id, &recording);
// Deleting a recording should succeed, update the min/max times, and re-reserve the uuid. // Deleting a recording should succeed, update the min/max times, and mark it as garbage.
{ {
let mut db = db.lock(); let mut db = db.lock();
let mut v = Vec::new(); let mut v = Vec::new();
@ -2203,32 +2124,31 @@ mod tests {
tx.commit().unwrap(); tx.commit().unwrap();
} }
assert_no_recordings(&db, camera_uuid); assert_no_recordings(&db, camera_uuid);
assert_unsorted_eq(db.lock().list_reserved_sample_files().unwrap(), assert_eq!(db.lock().list_garbage(sample_file_dir_id.unwrap()).unwrap(), vec![id]);
vec![uuid_to_use, uuid_to_keep]);
} }
#[test] #[test]
fn test_drop_tx() { fn test_drop_tx() {
testutil::init(); testutil::init();
let conn = setup_conn(); let conn = setup_conn();
conn.execute("insert into garbage values (1, ?)", &[&CompositeId::new(1, 1).0]).unwrap();
let db = Database::new(conn, true).unwrap(); let db = Database::new(conn, true).unwrap();
let mut db = db.lock(); let mut db = db.lock();
{ {
let mut tx = db.tx().unwrap(); let mut tx = db.tx().unwrap();
tx.reserve_sample_file().unwrap(); tx.mark_sample_files_deleted(&[CompositeId::new(1, 1)]).unwrap();
// drop tx without committing. // drop tx without committing.
} }
// The dropped tx should have done nothing. // The dropped tx should have done nothing.
assert_eq!(db.list_reserved_sample_files().unwrap(), &[]); assert_eq!(db.list_garbage(1).unwrap(), &[CompositeId::new(1, 1)]);
// Following transactions should succeed. // Following transactions should succeed.
let uuid;
{ {
let mut tx = db.tx().unwrap(); let mut tx = db.tx().unwrap();
uuid = tx.reserve_sample_file().unwrap(); tx.mark_sample_files_deleted(&[CompositeId::new(1, 1)]).unwrap();
tx.commit().unwrap(); tx.commit().unwrap();
} }
assert_eq!(db.list_reserved_sample_files().unwrap(), &[uuid]); assert_eq!(db.list_garbage(1).unwrap(), &[]);
} }
} }

View File

@ -32,8 +32,9 @@
//! //!
//! This includes opening files for serving, rotating away old files, and saving new files. //! This includes opening files for serving, rotating away old files, and saving new files.
use db; use db::{self, CompositeId};
use error::Error; use error::Error;
use fnv::FnvHashMap;
use libc::{self, c_char}; use libc::{self, c_char};
use protobuf::{self, Message}; use protobuf::{self, Message};
use recording; use recording;
@ -44,11 +45,11 @@ use std::ffi;
use std::fs; use std::fs;
use std::io::{self, Read, Write}; use std::io::{self, Read, Write};
use std::mem; use std::mem;
use std::os::unix::ffi::OsStrExt;
use std::os::unix::io::FromRawFd; use std::os::unix::io::FromRawFd;
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use std::sync::mpsc; use std::sync::mpsc;
use std::thread; use std::thread;
use uuid::Uuid;
/// A sample file directory. Typically one per physical disk drive. /// A sample file directory. Typically one per physical disk drive.
/// ///
@ -81,16 +82,18 @@ impl Drop for Fd {
impl Fd { impl Fd {
/// Opens the given path as a directory. /// Opens the given path as a directory.
pub fn open(path: &str, mkdir: bool) -> Result<Fd, io::Error> { pub fn open(fd: Option<&Fd>, path: &str, mkdir: bool) -> Result<Fd, io::Error> {
let fd = fd.map(|fd| fd.0).unwrap_or(libc::AT_FDCWD);
let cstring = ffi::CString::new(path) let cstring = ffi::CString::new(path)
.map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))?; .map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))?;
if mkdir && unsafe { libc::mkdir(cstring.as_ptr(), 0o700) } != 0 { if mkdir && unsafe { libc::mkdirat(fd, cstring.as_ptr(), 0o700) } != 0 {
let e = io::Error::last_os_error(); let e = io::Error::last_os_error();
if e.kind() != io::ErrorKind::AlreadyExists { if e.kind() != io::ErrorKind::AlreadyExists {
return Err(e.into()); return Err(e.into());
} }
} }
let fd = unsafe { libc::open(cstring.as_ptr(), libc::O_DIRECTORY | libc::O_RDONLY, 0) }; let fd = unsafe { libc::openat(fd, cstring.as_ptr(), libc::O_DIRECTORY | libc::O_RDONLY,
0) };
if fd < 0 { if fd < 0 {
return Err(io::Error::last_os_error().into()); return Err(io::Error::last_os_error().into());
} }
@ -107,14 +110,6 @@ impl Fd {
Ok(fs::File::from_raw_fd(fd)) Ok(fs::File::from_raw_fd(fd))
} }
unsafe fn renameat(&self, from: *const c_char, to: *const c_char) -> Result<(), io::Error> {
let result = libc::renameat(self.0, from, self.0, to);
if result < 0 {
return Err(io::Error::last_os_error())
}
Ok(())
}
/// Locks the directory with the specified `flock` operation. /// Locks the directory with the specified `flock` operation.
pub fn lock(&self, operation: libc::c_int) -> Result<(), io::Error> { pub fn lock(&self, operation: libc::c_int) -> Result<(), io::Error> {
let ret = unsafe { libc::flock(self.0, operation) }; let ret = unsafe { libc::flock(self.0, operation) };
@ -135,6 +130,15 @@ impl Fd {
} }
} }
pub unsafe fn renameat(from_fd: &Fd, from_path: *const c_char,
to_fd: &Fd, to_path: *const c_char) -> Result<(), io::Error> {
let result = libc::renameat(from_fd.0, from_path, to_fd.0, to_path);
if result < 0 {
return Err(io::Error::last_os_error())
}
Ok(())
}
impl SampleFileDir { impl SampleFileDir {
/// Opens the directory using the given metadata. /// Opens the directory using the given metadata.
/// ///
@ -147,7 +151,7 @@ impl SampleFileDir {
s.fd.lock(if read_write { libc::LOCK_EX } else { libc::LOCK_SH } | libc::LOCK_NB)?; s.fd.lock(if read_write { libc::LOCK_EX } else { libc::LOCK_SH } | libc::LOCK_NB)?;
let dir_meta = s.read_meta()?; let dir_meta = s.read_meta()?;
if !SampleFileDir::consistent(db_meta, &dir_meta) { if !SampleFileDir::consistent(db_meta, &dir_meta) {
return Err(Error::new(format!("metadata mismatch. db: {:?} dir: {:?}", return Err(Error::new(format!("metadata mismatch.\ndb: {:#?}\ndir: {:#?}",
db_meta, &dir_meta))); db_meta, &dir_meta)));
} }
if db_meta.in_progress_open.is_some() { if db_meta.in_progress_open.is_some() {
@ -193,19 +197,19 @@ impl SampleFileDir {
} }
fn open_self(path: &str, create: bool) -> Result<Arc<SampleFileDir>, Error> { fn open_self(path: &str, create: bool) -> Result<Arc<SampleFileDir>, Error> {
let fd = Fd::open(path, create) let fd = Fd::open(None, path, create)
.map_err(|e| Error::new(format!("unable to open sample file dir {}: {}", path, e)))?; .map_err(|e| Error::new(format!("unable to open sample file dir {}: {}", path, e)))?;
Ok(Arc::new(SampleFileDir { Ok(Arc::new(SampleFileDir {
fd, fd,
mutable: Mutex::new(SharedMutableState{ mutable: Mutex::new(SharedMutableState{
next_uuid: None, next_id_by_stream: FnvHashMap::default(),
}), }),
})) }))
} }
/// Opens the given sample file for reading. /// Opens the given sample file for reading.
pub fn open_sample_file(&self, uuid: Uuid) -> Result<fs::File, io::Error> { pub fn open_sample_file(&self, composite_id: CompositeId) -> Result<fs::File, io::Error> {
let p = SampleFileDir::get_rel_pathname(uuid); let p = SampleFileDir::get_rel_pathname(composite_id);
unsafe { self.fd.openat(p.as_ptr(), libc::O_RDONLY, 0) } unsafe { self.fd.openat(p.as_ptr(), libc::O_RDONLY, 0) }
} }
@ -246,7 +250,7 @@ impl SampleFileDir {
cause: Some(Box::new(e)), cause: Some(Box::new(e)),
})?; })?;
f.sync_all()?; f.sync_all()?;
unsafe { self.fd.renameat(tmp_path.as_ptr(), final_path.as_ptr())? }; unsafe { renameat(&self.fd, tmp_path.as_ptr(), &self.fd, final_path.as_ptr())? };
self.sync()?; self.sync()?;
Ok(()) Ok(())
} }
@ -258,52 +262,60 @@ impl SampleFileDir {
/// ///
/// The new recording will continue from `prev` if specified; this should be as returned from /// The new recording will continue from `prev` if specified; this should be as returned from
/// a previous `close` call. /// a previous `close` call.
pub fn create_writer<'a>(&self, db: &db::Database, channel: &'a SyncerChannel, pub fn create_writer<'a>(&'a self, db: &db::Database, channel: &'a SyncerChannel,
prev: Option<PreviousWriter>, camera_id: i32, prev: Option<PreviousWriter>, stream_id: i32,
video_sample_entry_id: i32) video_sample_entry_id: i32)
-> Result<Writer<'a>, Error> { -> Result<Writer<'a>, Error> {
// Grab the next uuid. Typically one is cached—a sync has usually completed since the last // Grab the next id. The dir itself will typically have an id (possibly one ahead of what's
// writer was created, and syncs ensure `next_uuid` is filled while performing their // stored in the database), but not on the first attempt for a stream.
// transaction. But if not, perform an extra database transaction to reserve a new one. use std::collections::hash_map::Entry;
let uuid = match self.mutable.lock().unwrap().next_uuid.take() { let recording_id;
Some(u) => u, match self.mutable.lock().unwrap().next_id_by_stream.entry(stream_id) {
None => { Entry::Occupied(mut e) => {
info!("Committing extra transaction because there's no cached uuid"); let v = e.get_mut();
recording_id = *v;
*v += 1;
},
Entry::Vacant(e) => {
let mut l = db.lock(); let mut l = db.lock();
let mut tx = l.tx()?; recording_id = l.streams_by_id().get(&stream_id).unwrap().next_recording_id;
let u = tx.reserve_sample_file()?; e.insert(recording_id + 1);
tx.commit()?;
u
}, },
}; };
let p = SampleFileDir::get_rel_pathname(uuid); let id = CompositeId::new(stream_id, recording_id);
let p = SampleFileDir::get_rel_pathname(id);
let f = match unsafe { self.fd.openat(p.as_ptr(), let f = match unsafe { self.fd.openat(p.as_ptr(),
libc::O_WRONLY | libc::O_EXCL | libc::O_CREAT, libc::O_WRONLY | libc::O_EXCL | libc::O_CREAT,
0o600) } { 0o600) } {
Ok(f) => f, Ok(f) => f,
Err(e) => { Err(e) => {
self.mutable.lock().unwrap().next_uuid = Some(uuid); // Put the id back to try again later.
let mut l = self.mutable.lock().unwrap();
let v = l.next_id_by_stream.get_mut(&stream_id).unwrap();
assert_eq!(*v, recording_id + 1);
*v -= 1;
return Err(e.into()); return Err(e.into());
}, },
}; };
Writer::open(f, uuid, prev, camera_id, video_sample_entry_id, channel) Writer::open(f, id, prev, video_sample_entry_id, channel)
} }
pub fn statfs(&self) -> Result<libc::statvfs, io::Error> { self.fd.statfs() } pub fn statfs(&self) -> Result<libc::statvfs, io::Error> { self.fd.statfs() }
/// Gets a pathname for a sample file suitable for passing to open or unlink. /// Gets a pathname for a sample file suitable for passing to open or unlink.
fn get_rel_pathname(uuid: Uuid) -> [libc::c_char; 37] { fn get_rel_pathname(id: CompositeId) -> [libc::c_char; 17] {
let mut buf = [0u8; 37]; let mut buf = [0u8; 17];
write!(&mut buf[..36], "{}", uuid.hyphenated()).expect("can't format uuid to pathname buf"); write!(&mut buf[..16], "{:016x}", id.0).expect("can't format id to pathname buf");
// libc::c_char seems to be i8 on some platforms (Linux/arm) and u8 on others (Linux/amd64). // libc::c_char seems to be i8 on some platforms (Linux/arm) and u8 on others (Linux/amd64).
unsafe { mem::transmute::<[u8; 37], [libc::c_char; 37]>(buf) } unsafe { mem::transmute::<[u8; 17], [libc::c_char; 17]>(buf) }
} }
/// Unlinks the given sample file within this directory. /// Unlinks the given sample file within this directory.
fn unlink(fd: &Fd, uuid: Uuid) -> Result<(), io::Error> { fn unlink(fd: &Fd, id: CompositeId) -> Result<(), io::Error> {
let p = SampleFileDir::get_rel_pathname(uuid); let p = SampleFileDir::get_rel_pathname(id);
let res = unsafe { libc::unlinkat(fd.0, p.as_ptr(), 0) }; let res = unsafe { libc::unlinkat(fd.0, p.as_ptr(), 0) };
if res < 0 { if res < 0 {
return Err(io::Error::last_os_error()) return Err(io::Error::last_os_error())
@ -316,7 +328,7 @@ impl SampleFileDir {
let res = unsafe { libc::fsync(self.fd.0) }; let res = unsafe { libc::fsync(self.fd.0) };
if res < 0 { if res < 0 {
return Err(io::Error::last_os_error()) return Err(io::Error::last_os_error())
}; }
Ok(()) Ok(())
} }
} }
@ -324,13 +336,13 @@ impl SampleFileDir {
/// State shared between users of the `SampleFileDirectory` struct and the syncer. /// State shared between users of the `SampleFileDirectory` struct and the syncer.
#[derive(Debug)] #[derive(Debug)]
struct SharedMutableState { struct SharedMutableState {
next_uuid: Option<Uuid>, next_id_by_stream: FnvHashMap<i32, i32>,
} }
/// A command sent to the syncer. These correspond to methods in the `SyncerChannel` struct. /// A command sent to the syncer. These correspond to methods in the `SyncerChannel` struct.
enum SyncerCommand { enum SyncerCommand {
AsyncSaveRecording(db::RecordingToInsert, fs::File), AsyncSaveRecording(db::RecordingToInsert, fs::File),
AsyncAbandonRecording(Uuid), AsyncAbandonRecording(CompositeId),
#[cfg(test)] #[cfg(test)]
Flush(mpsc::SyncSender<()>), Flush(mpsc::SyncSender<()>),
@ -345,8 +357,18 @@ pub struct SyncerChannel(mpsc::Sender<SyncerCommand>);
struct Syncer { struct Syncer {
dir: Arc<SampleFileDir>, dir: Arc<SampleFileDir>,
db: Arc<db::Database>, db: Arc<db::Database>,
to_unlink: Vec<Uuid>,
to_mark_deleted: Vec<Uuid>, /// Files to be unlinked then immediately forgotten about. They are `>= next_recording_id` for
/// their stream, `next_recording_id` won't be advanced without a sync of the directory, and
/// extraneous files `>= next_recording_id` are unlinked on startup, so this should be
/// sufficient.
to_abandon: Vec<CompositeId>,
/// Files to be unlinked then removed from the garbage table.
to_unlink: Vec<CompositeId>,
/// Files to be removed from the garbage table.
to_mark_deleted: Vec<CompositeId>,
} }
/// Starts a syncer for the given sample file directory. /// Starts a syncer for the given sample file directory.
@ -360,19 +382,16 @@ struct Syncer {
/// Returns a `SyncerChannel` which can be used to send commands (and can be cloned freely) and /// Returns a `SyncerChannel` which can be used to send commands (and can be cloned freely) and
/// a `JoinHandle` for the syncer thread. At program shutdown, all `SyncerChannel` clones should be /// a `JoinHandle` for the syncer thread. At program shutdown, all `SyncerChannel` clones should be
/// removed and then the handle joined to allow all recordings to be persisted. /// removed and then the handle joined to allow all recordings to be persisted.
pub fn start_syncer(dir: Arc<SampleFileDir>, db: Arc<db::Database>) pub fn start_syncer(db: Arc<db::Database>, dir_id: i32)
-> Result<(SyncerChannel, thread::JoinHandle<()>), Error> { -> Result<(SyncerChannel, thread::JoinHandle<()>), Error> {
let to_unlink = db.lock().list_reserved_sample_files()?; let db2 = db.clone();
let (snd, rcv) = mpsc::channel(); let (mut syncer, path) = Syncer::new(&db.lock(), db2, dir_id)?;
let mut syncer = Syncer {
dir,
db,
to_unlink,
to_mark_deleted: Vec::new(),
};
syncer.initial_rotation()?; syncer.initial_rotation()?;
let (snd, rcv) = mpsc::channel();
Ok((SyncerChannel(snd), Ok((SyncerChannel(snd),
thread::Builder::new().name("syncer".into()).spawn(move || syncer.run(rcv)).unwrap())) thread::Builder::new()
.name(format!("sync-{}", path))
.spawn(move || syncer.run(rcv)).unwrap()))
} }
pub struct NewLimit { pub struct NewLimit {
@ -383,15 +402,10 @@ pub struct NewLimit {
/// Deletes recordings if necessary to fit within the given new `retain_bytes` limit. /// Deletes recordings if necessary to fit within the given new `retain_bytes` limit.
/// Note this doesn't change the limit in the database; it only deletes files. /// Note this doesn't change the limit in the database; it only deletes files.
/// Pass a limit of 0 to delete all recordings associated with a camera. /// Pass a limit of 0 to delete all recordings associated with a camera.
pub fn lower_retention(dir: Arc<SampleFileDir>, db: Arc<db::Database>, limits: &[NewLimit]) pub fn lower_retention(db: Arc<db::Database>, dir_id: i32, limits: &[NewLimit])
-> Result<(), Error> { -> Result<(), Error> {
let to_unlink = db.lock().list_reserved_sample_files()?; let db2 = db.clone();
let mut syncer = Syncer { let (mut syncer, _) = Syncer::new(&db.lock(), db2, dir_id)?;
dir,
db,
to_unlink,
to_mark_deleted: Vec::new(),
};
syncer.do_rotation(|db| { syncer.do_rotation(|db| {
let mut to_delete = Vec::new(); let mut to_delete = Vec::new();
for l in limits { for l in limits {
@ -441,8 +455,8 @@ impl SyncerChannel {
self.0.send(SyncerCommand::AsyncSaveRecording(recording, f)).unwrap(); self.0.send(SyncerCommand::AsyncSaveRecording(recording, f)).unwrap();
} }
fn async_abandon_recording(&self, uuid: Uuid) { fn async_abandon_recording(&self, id: CompositeId) {
self.0.send(SyncerCommand::AsyncAbandonRecording(uuid)).unwrap(); self.0.send(SyncerCommand::AsyncAbandonRecording(id)).unwrap();
} }
/// For testing: flushes the syncer, waiting for all currently-queued commands to complete. /// For testing: flushes the syncer, waiting for all currently-queued commands to complete.
@ -455,6 +469,60 @@ impl SyncerChannel {
} }
impl Syncer { impl Syncer {
fn new(l: &db::LockedDatabase, db: Arc<db::Database>, dir_id: i32)
-> Result<(Self, String), Error> {
let d = l.sample_file_dirs_by_id()
.get(&dir_id)
.ok_or_else(|| Error::new(format!("no dir {}", dir_id)))?;
let dir = d.get()?;
let to_unlink = l.list_garbage(dir_id)?;
// Get files to abandon.
// First, get a list of the streams in question.
let streams_to_next: FnvHashMap<_, _> =
l.streams_by_id()
.iter()
.filter_map(|(&k, v)| {
if v.sample_file_dir_id == Some(dir_id) {
Some((k, v.next_recording_id))
} else {
None
}
})
.collect();
let to_abandon = Syncer::list_files_to_abandon(&d.path, streams_to_next)?;
Ok((Syncer {
dir,
db,
to_abandon,
to_unlink,
to_mark_deleted: Vec::new(),
}, d.path.clone()))
}
/// Lists files which should be "abandoned" (deleted without ever recording in the database)
/// on opening.
fn list_files_to_abandon(path: &str, streams_to_next: FnvHashMap<i32, i32>)
-> Result<Vec<CompositeId>, Error> {
let mut v = Vec::new();
for e in ::std::fs::read_dir(path)? {
let e = e?;
let id = match parse_id(e.file_name().as_bytes()) {
Ok(i) => i,
Err(_) => continue,
};
let next = match streams_to_next.get(&id.stream()) {
Some(n) => *n,
None => continue, // unknown stream.
};
if id.recording() >= next {
v.push(id);
}
}
Ok(v)
}
fn run(&mut self, cmds: mpsc::Receiver<SyncerCommand>) { fn run(&mut self, cmds: mpsc::Receiver<SyncerCommand>) {
loop { loop {
match cmds.recv() { match cmds.recv() {
@ -467,7 +535,7 @@ impl Syncer {
} }
} }
/// Rotates files for all streams and deletes stale reserved uuids from previous runs. /// Rotates files for all streams and deletes stale files from previous runs.
fn initial_rotation(&mut self) -> Result<(), Error> { fn initial_rotation(&mut self) -> Result<(), Error> {
self.do_rotation(|db| { self.do_rotation(|db| {
let mut to_delete = Vec::new(); let mut to_delete = Vec::new();
@ -489,7 +557,7 @@ impl Syncer {
to_delete to_delete
}; };
for row in to_delete { for row in to_delete {
self.to_unlink.push(row.uuid); self.to_unlink.push(row.id);
} }
self.try_unlink(); self.try_unlink();
if !self.to_unlink.is_empty() { if !self.to_unlink.is_empty() {
@ -512,15 +580,14 @@ impl Syncer {
/// so that there can be only one dir sync and database transaction per save. /// so that there can be only one dir sync and database transaction per save.
fn save(&mut self, recording: db::RecordingToInsert, f: fs::File) { fn save(&mut self, recording: db::RecordingToInsert, f: fs::File) {
if let Err(e) = self.save_helper(&recording, f) { if let Err(e) = self.save_helper(&recording, f) {
error!("camera {}: will discard recording {} due to error while saving: {}", error!("will discard recording {} due to error while saving: {}", recording.id, e);
recording.stream_id, recording.sample_file_uuid, e); self.abandon(recording.id);
self.to_unlink.push(recording.sample_file_uuid);
return; return;
} }
} }
fn abandon(&mut self, uuid: Uuid) { fn abandon(&mut self, id: CompositeId) {
self.to_unlink.push(uuid); self.to_abandon.push(id);
self.try_unlink(); self.try_unlink();
} }
@ -532,57 +599,58 @@ impl Syncer {
if !self.to_unlink.is_empty() { if !self.to_unlink.is_empty() {
return Err(Error::new(format!("failed to unlink {} files.", self.to_unlink.len()))); return Err(Error::new(format!("failed to unlink {} files.", self.to_unlink.len())));
} }
// XXX: if these calls fail, any other writes are likely to fail as well.
f.sync_all()?; f.sync_all()?;
self.dir.sync()?; self.dir.sync()?;
let mut to_delete = Vec::new(); let mut to_delete = Vec::new();
let mut l = self.dir.mutable.lock().unwrap();
let mut db = self.db.lock(); let mut db = self.db.lock();
let mut new_next_uuid = l.next_uuid;
{ {
let stream_id = recording.id.stream();
let stream = let stream =
db.streams_by_id().get(&recording.stream_id) db.streams_by_id().get(&stream_id)
.ok_or_else(|| Error::new(format!("no such stream {}", recording.stream_id)))?; .ok_or_else(|| Error::new(format!("no such stream {}", stream_id)))?;
get_rows_to_delete(&db, recording.stream_id, stream, get_rows_to_delete(&db, stream_id, stream,
recording.sample_file_bytes as i64, &mut to_delete)?; recording.sample_file_bytes as i64, &mut to_delete)?;
} }
let mut tx = db.tx()?; let mut tx = db.tx()?;
tx.mark_sample_files_deleted(&self.to_mark_deleted)?; tx.mark_sample_files_deleted(&self.to_mark_deleted)?;
tx.delete_recordings(&to_delete)?; tx.delete_recordings(&to_delete)?;
if new_next_uuid.is_none() {
new_next_uuid = Some(tx.reserve_sample_file()?);
}
tx.insert_recording(recording)?; tx.insert_recording(recording)?;
tx.commit()?; tx.commit()?;
l.next_uuid = new_next_uuid;
self.to_mark_deleted.clear(); self.to_mark_deleted.clear();
self.to_unlink.extend(to_delete.iter().map(|row| row.uuid)); self.to_unlink.extend(to_delete.iter().map(|row| row.id));
self.to_unlink.extend_from_slice(&self.to_abandon);
self.to_abandon.clear();
Ok(()) Ok(())
} }
/// Tries to unlink all the uuids in `self.to_unlink`. Any which can't be unlinked will /// Tries to unlink all the files in `self.to_unlink` and `self.to_abandon`.
/// be retained in the vec. /// Any which can't be unlinked will be retained in the vec.
fn try_unlink(&mut self) { fn try_unlink(&mut self) {
let to_mark_deleted = &mut self.to_mark_deleted; let to_mark_deleted = &mut self.to_mark_deleted;
let fd = &self.dir.fd; let fd = &self.dir.fd;
self.to_unlink.retain(|uuid| { for &mut (ref mut v, mark_deleted) in &mut [(&mut self.to_unlink, true),
if let Err(e) = SampleFileDir::unlink(fd, *uuid) { (&mut self.to_abandon, false)] {
v.retain(|&id| {
if let Err(e) = SampleFileDir::unlink(fd, id) {
if e.kind() == io::ErrorKind::NotFound { if e.kind() == io::ErrorKind::NotFound {
warn!("dir: Sample file {} already deleted!", uuid.hyphenated()); warn!("dir: recording {} already deleted!", id);
to_mark_deleted.push(*uuid);
false
} else { } else {
warn!("dir: Unable to unlink {}: {}", uuid.hyphenated(), e); warn!("dir: Unable to unlink {}: {}", id, e);
true return true;
}
}
if mark_deleted {
to_mark_deleted.push(id);
} }
} else {
to_mark_deleted.push(*uuid);
false false
}
}); });
} }
} }
}
/// Single-use struct to write a single recording to disk and commit its metadata to the database. /// Single-use struct to write a single recording to disk and commit its metadata to the database.
/// Use `SampleFileDir::create_writer` to create a new writer. `Writer` hands off its state to the /// Use `SampleFileDir::create_writer` to create a new writer. `Writer` hands off its state to the
@ -598,7 +666,7 @@ struct InnerWriter<'a> {
syncer_channel: &'a SyncerChannel, syncer_channel: &'a SyncerChannel,
f: fs::File, f: fs::File,
index: recording::SampleIndexEncoder, index: recording::SampleIndexEncoder,
uuid: Uuid, id: CompositeId,
corrupt: bool, corrupt: bool,
hasher: hash::Hasher, hasher: hash::Hasher,
@ -614,7 +682,6 @@ struct InnerWriter<'a> {
adjuster: ClockAdjuster, adjuster: ClockAdjuster,
stream_id: i32,
video_sample_entry_id: i32, video_sample_entry_id: i32,
run_offset: i32, run_offset: i32,
@ -691,19 +758,18 @@ pub struct PreviousWriter {
impl<'a> Writer<'a> { impl<'a> Writer<'a> {
/// Opens the writer; for use by `SampleFileDir` (which should supply `f`). /// Opens the writer; for use by `SampleFileDir` (which should supply `f`).
fn open(f: fs::File, uuid: Uuid, prev: Option<PreviousWriter>, stream_id: i32, fn open(f: fs::File, id: CompositeId, prev: Option<PreviousWriter>,
video_sample_entry_id: i32, syncer_channel: &'a SyncerChannel) -> Result<Self, Error> { video_sample_entry_id: i32, syncer_channel: &'a SyncerChannel) -> Result<Self, Error> {
Ok(Writer(Some(InnerWriter { Ok(Writer(Some(InnerWriter {
syncer_channel, syncer_channel,
f, f,
index: recording::SampleIndexEncoder::new(), index: recording::SampleIndexEncoder::new(),
uuid, id,
corrupt: false, corrupt: false,
hasher: hash::Hasher::new(hash::MessageDigest::sha1())?, hasher: hash::Hasher::new(hash::MessageDigest::sha1())?,
prev_end: prev.map(|p| p.end_time), prev_end: prev.map(|p| p.end_time),
local_start: recording::Time(i64::max_value()), local_start: recording::Time(i64::max_value()),
adjuster: ClockAdjuster::new(prev.map(|p| p.local_time_delta.0)), adjuster: ClockAdjuster::new(prev.map(|p| p.local_time_delta.0)),
stream_id,
video_sample_entry_id, video_sample_entry_id,
run_offset: prev.map(|p| p.run_offset + 1).unwrap_or(0), run_offset: prev.map(|p| p.run_offset + 1).unwrap_or(0),
unflushed_sample: None, unflushed_sample: None,
@ -734,7 +800,7 @@ impl<'a> Writer<'a> {
// Partially written packet. Truncate if possible. // Partially written packet. Truncate if possible.
if let Err(e2) = w.f.set_len(w.index.sample_file_bytes as u64) { if let Err(e2) = w.f.set_len(w.index.sample_file_bytes as u64) {
error!("After write to {} failed with {}, truncate failed with {}; \ error!("After write to {} failed with {}, truncate failed with {}; \
sample file is corrupt.", w.uuid.hyphenated(), e, e2); sample file is corrupt.", w.id, e, e2);
w.corrupt = true; w.corrupt = true;
} }
} }
@ -768,8 +834,8 @@ impl<'a> InnerWriter<'a> {
fn close(mut self, next_pts: Option<i64>) -> Result<PreviousWriter, Error> { fn close(mut self, next_pts: Option<i64>) -> Result<PreviousWriter, Error> {
if self.corrupt { if self.corrupt {
self.syncer_channel.async_abandon_recording(self.uuid); self.syncer_channel.async_abandon_recording(self.id);
return Err(Error::new(format!("recording {} is corrupt", self.uuid))); return Err(Error::new(format!("recording {} is corrupt", self.id)));
} }
let unflushed = let unflushed =
self.unflushed_sample.take().ok_or_else(|| Error::new("no packets!".to_owned()))?; self.unflushed_sample.take().ok_or_else(|| Error::new("no packets!".to_owned()))?;
@ -787,14 +853,13 @@ impl<'a> InnerWriter<'a> {
else { 0 }; else { 0 };
let local_start_delta = self.local_start - start; let local_start_delta = self.local_start - start;
let recording = db::RecordingToInsert{ let recording = db::RecordingToInsert{
stream_id: self.stream_id, id: self.id,
sample_file_bytes: self.index.sample_file_bytes, sample_file_bytes: self.index.sample_file_bytes,
time: start .. end, time: start .. end,
local_time_delta: local_start_delta, local_time_delta: local_start_delta,
video_samples: self.index.video_samples, video_samples: self.index.video_samples,
video_sync_samples: self.index.video_sync_samples, video_sync_samples: self.index.video_sync_samples,
video_sample_entry_id: self.video_sample_entry_id, video_sample_entry_id: self.video_sample_entry_id,
sample_file_uuid: self.uuid,
video_index: self.index.video_index, video_index: self.index.video_index,
sample_file_sha1: sha1_bytes, sample_file_sha1: sha1_bytes,
run_offset: self.run_offset, run_offset: self.run_offset,
@ -820,6 +885,24 @@ impl<'a> Drop for Writer<'a> {
} }
} }
/// Parse a composite id filename.
///
/// These are exactly 16 bytes, lowercase hex.
fn parse_id(id: &[u8]) -> Result<CompositeId, ()> {
if id.len() != 16 {
return Err(());
}
let mut v: u64 = 0;
for i in 0..16 {
v = (v << 4) | match id[i] {
b @ b'0'...b'9' => b - b'0',
b @ b'a'...b'f' => b - b'a' + 10,
_ => return Err(()),
} as u64;
}
Ok(CompositeId(v as i64))
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::ClockAdjuster; use super::ClockAdjuster;
@ -883,4 +966,15 @@ mod tests {
assert!(total == expected || total == expected + 1, "total={} vs expected={}", assert!(total == expected || total == expected + 1, "total={} vs expected={}",
total, expected); total, expected);
} }
#[test]
fn parse_id() {
use super::parse_id;
assert_eq!(parse_id(b"0000000000000000").unwrap().0, 0);
assert_eq!(parse_id(b"0000000100000002").unwrap().0, 0x0000000100000002);
parse_id(b"").unwrap_err();
parse_id(b"meta").unwrap_err();
parse_id(b"0").unwrap_err();
parse_id(b"000000010000000x").unwrap_err();
}
} }

View File

@ -381,8 +381,7 @@ impl Segment {
self.index_once.call_once(|| { self.index_once.call_once(|| {
let index = unsafe { &mut *self.index.get() }; let index = unsafe { &mut *self.index.get() };
*index = db.lock() *index = db.lock()
.with_recording_playback(self.s.stream_id, self.s.recording_id, .with_recording_playback(self.s.id, |playback| self.build_index(playback))
|playback| self.build_index(playback))
.map_err(|e| { error!("Unable to build index for segment: {:?}", e); }); .map_err(|e| { error!("Unable to build index for segment: {:?}", e); });
}); });
let index: &'a _ = unsafe { &*self.index.get() }; let index: &'a _ = unsafe { &*self.index.get() };
@ -629,8 +628,7 @@ impl Slice {
} }
let truns = let truns =
mp4.0.db.lock() mp4.0.db.lock()
.with_recording_playback(s.s.stream_id, s.s.recording_id, .with_recording_playback(s.s.id, |playback| s.truns(playback, pos, len))
|playback| s.truns(playback, pos, len))
.map_err(|e| { Error::new(format!("Unable to build index for segment: {:?}", e)) })?; .map_err(|e| { Error::new(format!("Unable to build index for segment: {:?}", e)) })?;
let truns = ARefs::new(truns); let truns = ARefs::new(truns);
Ok(truns.map(|t| &t[r.start as usize .. r.end as usize])) Ok(truns.map(|t| &t[r.start as usize .. r.end as usize]))
@ -761,8 +759,8 @@ impl FileBuilder {
if let Some(prev) = self.segments.last() { if let Some(prev) = self.segments.last() {
if prev.s.have_trailing_zero() { if prev.s.have_trailing_zero() {
return Err(Error::new(format!( return Err(Error::new(format!(
"unable to append recording {}/{} after recording {}/{} with trailing zero", "unable to append recording {} after recording {} with trailing zero",
row.stream_id, row.id, prev.s.stream_id, prev.s.recording_id))); row.id, prev.s.id)));
} }
} }
let s = Segment::new(db, &row, rel_range_90k, self.next_frame_num)?; let s = Segment::new(db, &row, rel_range_90k, self.next_frame_num)?;
@ -812,8 +810,7 @@ impl FileBuilder {
// Update the etag to reflect this segment. // Update the etag to reflect this segment.
let mut data = [0_u8; 24]; let mut data = [0_u8; 24];
let mut cursor = io::Cursor::new(&mut data[..]); let mut cursor = io::Cursor::new(&mut data[..]);
cursor.write_i32::<BigEndian>(s.s.stream_id)?; cursor.write_i64::<BigEndian>(s.s.id.0)?;
cursor.write_i32::<BigEndian>(s.s.recording_id)?;
cursor.write_i64::<BigEndian>(s.s.start.0)?; cursor.write_i64::<BigEndian>(s.s.start.0)?;
cursor.write_i32::<BigEndian>(d.start)?; cursor.write_i32::<BigEndian>(d.start)?;
cursor.write_i32::<BigEndian>(d.end)?; cursor.write_i32::<BigEndian>(d.end)?;
@ -1452,16 +1449,10 @@ impl FileInner {
/// happen because nothing should be touching Moonfire NVR's files but itself. /// happen because nothing should be touching Moonfire NVR's files but itself.
fn get_video_sample_data(&self, i: usize, r: Range<u64>) -> Result<Chunk, Error> { fn get_video_sample_data(&self, i: usize, r: Range<u64>) -> Result<Chunk, Error> {
let s = &self.segments[i]; let s = &self.segments[i];
let uuid = {
let l = self.db.lock();
l.with_recording_playback(s.s.stream_id, s.s.recording_id,
|p| Ok(p.sample_file_uuid))?
};
let f = self.dirs_by_stream_id let f = self.dirs_by_stream_id
.get(&s.s.stream_id) .get(&s.s.id.stream())
.ok_or_else(|| Error::new(format!("{}/{}: stream not found", .ok_or_else(|| Error::new(format!("{}: stream not found", s.s.id)))?
s.s.stream_id, s.s.recording_id)))? .open_sample_file(s.s.id)?;
.open_sample_file(uuid)?;
let start = s.s.sample_file_range().start + r.start; let start = s.s.sample_file_range().start + r.start;
let mmap = Box::new(unsafe { let mmap = Box::new(unsafe {
memmap::MmapOptions::new() memmap::MmapOptions::new()
@ -2271,7 +2262,7 @@ mod bench {
let rel_range_90k = 0 .. row.duration_90k; let rel_range_90k = 0 .. row.duration_90k;
super::Segment::new(&db, &row, rel_range_90k, 1).unwrap() super::Segment::new(&db, &row, rel_range_90k, 1).unwrap()
}; };
db.with_recording_playback(segment.s.stream_id, segment.s.recording_id, |playback| { db.with_recording_playback(segment.s.id, |playback| {
let v = segment.build_index(playback).unwrap(); // warm. let v = segment.build_index(playback).unwrap(); // warm.
b.bytes = v.len() as u64; // define the benchmark performance in terms of output bytes. b.bytes = v.len() as u64; // define the benchmark performance in terms of output bytes.
b.iter(|| segment.build_index(playback).unwrap()); b.iter(|| segment.build_index(playback).unwrap());

View File

@ -354,8 +354,7 @@ impl SampleIndexEncoder {
/// Used by the `Mp4FileBuilder` class to splice together recordings into a single virtual .mp4. /// Used by the `Mp4FileBuilder` class to splice together recordings into a single virtual .mp4.
#[derive(Debug)] #[derive(Debug)]
pub struct Segment { pub struct Segment {
pub stream_id: i32, pub id: db::CompositeId,
pub recording_id: i32,
pub start: Time, pub start: Time,
/// An iterator positioned at the beginning of the segment, or `None`. Most segments are /// An iterator positioned at the beginning of the segment, or `None`. Most segments are
@ -382,8 +381,7 @@ impl Segment {
recording: &db::ListRecordingsRow, recording: &db::ListRecordingsRow,
desired_range_90k: Range<i32>) -> Result<Segment, Error> { desired_range_90k: Range<i32>) -> Result<Segment, Error> {
let mut self_ = Segment { let mut self_ = Segment {
stream_id: recording.stream_id, id: recording.id,
recording_id: recording.id,
start: recording.start, start: recording.start,
begin: None, begin: None,
file_end: recording.sample_file_bytes, file_end: recording.sample_file_bytes,
@ -413,7 +411,7 @@ impl Segment {
// Slow path. Need to iterate through the index. // Slow path. Need to iterate through the index.
trace!("recording::Segment::new slow path, desired_range_90k={:?}, recording={:#?}", trace!("recording::Segment::new slow path, desired_range_90k={:?}, recording={:#?}",
self_.desired_range_90k, recording); self_.desired_range_90k, recording);
db.with_recording_playback(self_.stream_id, self_.recording_id, |playback| { db.with_recording_playback(self_.id, |playback| {
let mut begin = Box::new(SampleIndexIterator::new()); let mut begin = Box::new(SampleIndexIterator::new());
let data = &(&playback).video_index; let data = &(&playback).video_index;
let mut it = SampleIndexIterator::new(); let mut it = SampleIndexIterator::new();
@ -480,8 +478,8 @@ impl Segment {
/// Must be called without the database lock held; retrieves video index from the cache. /// Must be called without the database lock held; retrieves video index from the cache.
pub fn foreach<F>(&self, playback: &db::RecordingPlayback, mut f: F) -> Result<(), Error> pub fn foreach<F>(&self, playback: &db::RecordingPlayback, mut f: F) -> Result<(), Error>
where F: FnMut(&SampleIndexIterator) -> Result<(), Error> { where F: FnMut(&SampleIndexIterator) -> Result<(), Error> {
trace!("foreach on recording {}/{}: {} frames, actual_start_90k: {}", trace!("foreach on recording {}: {} frames, actual_start_90k: {}",
self.stream_id, self.recording_id, self.frames, self.actual_start_90k()); self.id, self.frames, self.actual_start_90k());
let data = &(&playback).video_index; let data = &(&playback).video_index;
let mut it = match self.begin { let mut it = match self.begin {
Some(ref b) => **b, Some(ref b) => **b,
@ -489,28 +487,26 @@ impl Segment {
}; };
if it.uninitialized() { if it.uninitialized() {
if !it.next(data)? { if !it.next(data)? {
return Err(Error::new(format!("recording {}/{}: no frames", return Err(Error::new(format!("recording {}: no frames", self.id)));
self.stream_id, self.recording_id)));
} }
if !it.is_key() { if !it.is_key() {
return Err(Error::new(format!("recording {}/{}: doesn't start with key frame", return Err(Error::new(format!("recording {}: doesn't start with key frame",
self.stream_id, self.recording_id))); self.id)));
} }
} }
let mut have_frame = true; let mut have_frame = true;
let mut key_frame = 0; let mut key_frame = 0;
for i in 0 .. self.frames { for i in 0 .. self.frames {
if !have_frame { if !have_frame {
return Err(Error::new(format!("recording {}/{}: expected {} frames, found only {}", return Err(Error::new(format!("recording {}: expected {} frames, found only {}",
self.stream_id, self.recording_id, self.frames, self.id, self.frames, i+1)));
i+1)));
} }
if it.is_key() { if it.is_key() {
key_frame += 1; key_frame += 1;
if key_frame > self.key_frames { if key_frame > self.key_frames {
return Err(Error::new(format!( return Err(Error::new(format!(
"recording {}/{}: more than expected {} key frames", "recording {}: more than expected {} key frames",
self.stream_id, self.recording_id, self.key_frames))); self.id, self.key_frames)));
} }
} }
@ -521,9 +517,8 @@ impl Segment {
have_frame = try!(it.next(data)); have_frame = try!(it.next(data));
} }
if key_frame < self.key_frames { if key_frame < self.key_frames {
return Err(Error::new(format!("recording {}/{}: expected {} key frames, found only {}", return Err(Error::new(format!("recording {}: expected {} key frames, found only {}",
self.stream_id, self.recording_id, self.key_frames, self.id, self.key_frames, key_frame)));
key_frame)));
} }
Ok(()) Ok(())
} }
@ -656,7 +651,7 @@ mod tests {
fn get_frames<F, T>(db: &db::Database, segment: &Segment, f: F) -> Vec<T> fn get_frames<F, T>(db: &db::Database, segment: &Segment, f: F) -> Vec<T>
where F: Fn(&SampleIndexIterator) -> T { where F: Fn(&SampleIndexIterator) -> T {
let mut v = Vec::new(); let mut v = Vec::new();
db.lock().with_recording_playback(segment.stream_id, segment.recording_id, |playback| { db.lock().with_recording_playback(segment.id, |playback| {
segment.foreach(playback, |it| { v.push(f(it)); Ok(()) }) segment.foreach(playback, |it| { v.push(f(it)); Ok(()) })
}).unwrap(); }).unwrap();
v v

View File

@ -197,10 +197,6 @@ create table recording_playback (
-- See description on recording table. -- See description on recording table.
composite_id integer primary key references recording (composite_id), composite_id integer primary key references recording (composite_id),
-- The binary representation of the sample file's uuid. The canonical text
-- representation of this uuid is the filename within the sample file dir.
sample_file_uuid blob not null check (length(sample_file_uuid) = 16),
-- The sha1 hash of the contents of the sample file. -- The sha1 hash of the contents of the sample file.
sample_file_sha1 blob not null check (length(sample_file_sha1) = 20), sample_file_sha1 blob not null check (length(sample_file_sha1) = 20),
@ -208,12 +204,21 @@ create table recording_playback (
video_index blob not null check (length(video_index) > 0) video_index blob not null check (length(video_index) > 0)
); );
-- Files in the sample file directory which may be present but should simply be -- Files which are to be deleted (may or may not still exist).
-- discarded on startup. (Recordings which were never completed or have been -- Note that besides these files, for each stream, any recordings >= its
-- marked for completion.) -- next_recording_id should be discarded on startup.
create table reserved_sample_files ( create table garbage (
uuid blob primary key check (length(uuid) = 16), -- This is _mostly_ redundant with composite_id, which contains the stream
state integer not null -- 0 (writing) or 1 (deleted) -- id and thus a linkage to the sample file directory. Listing it here
-- explicitly means that streams can be deleted without losing the
-- association of garbage to directory.
sample_file_dir_id integer not null references sample_file_dir (id),
-- See description on recording table.
composite_id integer not null,
-- Organize the table first by directory, as that's how it will be queried.
primary key (sample_file_dir_id, composite_id)
) without rowid; ) without rowid;
-- A concrete box derived from a ISO/IEC 14496-12 section 8.5.2 -- A concrete box derived from a ISO/IEC 14496-12 section 8.5.2
@ -238,4 +243,4 @@ create table video_sample_entry (
); );
insert into version (id, unix_time, notes) insert into version (id, unix_time, notes)
values (2, cast(strftime('%s', 'now') as int), 'db creation'); values (3, cast(strftime('%s', 'now') as int), 'db creation');

View File

@ -201,7 +201,7 @@ impl<'a, C, S> Streamer<'a, C, S> where C: 'a + Clocks, S: 'a + stream::Stream {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use clock::{self, Clocks}; use clock::{self, Clocks};
use db; use db::{self, CompositeId};
use error::Error; use error::Error;
use h264; use h264;
use moonfire_ffmpeg; use moonfire_ffmpeg;
@ -314,8 +314,8 @@ mod tests {
is_key: bool, is_key: bool,
} }
fn get_frames(db: &db::LockedDatabase, camera_id: i32, recording_id: i32) -> Vec<Frame> { fn get_frames(db: &db::LockedDatabase, id: CompositeId) -> Vec<Frame> {
db.with_recording_playback(camera_id, recording_id, |rec| { db.with_recording_playback(id, |rec| {
let mut it = recording::SampleIndexIterator::new(); let mut it = recording::SampleIndexIterator::new();
let mut frames = Vec::new(); let mut frames = Vec::new();
while it.next(&rec.video_index).unwrap() { while it.next(&rec.video_index).unwrap() {
@ -371,7 +371,7 @@ mod tests {
// 3-second boundaries (such as 2016-04-26 00:00:03), rotation happens somewhat later: // 3-second boundaries (such as 2016-04-26 00:00:03), rotation happens somewhat later:
// * the first rotation is always skipped // * the first rotation is always skipped
// * the second rotation is deferred until a key frame. // * the second rotation is deferred until a key frame.
assert_eq!(get_frames(&db, testutil::TEST_STREAM_ID, 1), &[ assert_eq!(get_frames(&db, CompositeId::new(testutil::TEST_STREAM_ID, 1)), &[
Frame{start_90k: 0, duration_90k: 90379, is_key: true}, Frame{start_90k: 0, duration_90k: 90379, is_key: true},
Frame{start_90k: 90379, duration_90k: 89884, is_key: false}, Frame{start_90k: 90379, duration_90k: 89884, is_key: false},
Frame{start_90k: 180263, duration_90k: 89749, is_key: false}, Frame{start_90k: 180263, duration_90k: 89749, is_key: false},
@ -381,7 +381,7 @@ mod tests {
Frame{start_90k: 540015, duration_90k: 90021, is_key: false}, // pts_time 6.0001... Frame{start_90k: 540015, duration_90k: 90021, is_key: false}, // pts_time 6.0001...
Frame{start_90k: 630036, duration_90k: 89958, is_key: false}, Frame{start_90k: 630036, duration_90k: 89958, is_key: false},
]); ]);
assert_eq!(get_frames(&db, testutil::TEST_STREAM_ID, 2), &[ assert_eq!(get_frames(&db, CompositeId::new(testutil::TEST_STREAM_ID, 2)), &[
Frame{start_90k: 0, duration_90k: 90011, is_key: true}, Frame{start_90k: 0, duration_90k: 90011, is_key: true},
Frame{start_90k: 90011, duration_90k: 0, is_key: false}, Frame{start_90k: 90011, duration_90k: 0, is_key: false},
]); ]);
@ -391,10 +391,10 @@ mod tests {
Ok(()) Ok(())
}).unwrap(); }).unwrap();
assert_eq!(2, recordings.len()); assert_eq!(2, recordings.len());
assert_eq!(1, recordings[0].id); assert_eq!(1, recordings[0].id.recording());
assert_eq!(recording::Time(128700575999999), recordings[0].start); assert_eq!(recording::Time(128700575999999), recordings[0].start);
assert_eq!(0, recordings[0].flags); assert_eq!(0, recordings[0].flags);
assert_eq!(2, recordings[1].id); assert_eq!(2, recordings[1].id.recording());
assert_eq!(recording::Time(128700576719993), recordings[1].start); assert_eq!(recording::Time(128700576719993), recordings[1].start);
assert_eq!(db::RecordingFlags::TrailingZero as i32, recordings[1].flags); assert_eq!(db::RecordingFlags::TrailingZero as i32, recordings[1].flags);
} }

View File

@ -112,7 +112,8 @@ impl TestDb {
} }
let mut dirs_by_stream_id = FnvHashMap::default(); let mut dirs_by_stream_id = FnvHashMap::default();
dirs_by_stream_id.insert(TEST_STREAM_ID, dir.clone()); dirs_by_stream_id.insert(TEST_STREAM_ID, dir.clone());
let (syncer_channel, syncer_join) = dir::start_syncer(dir, db.clone()).unwrap(); let (syncer_channel, syncer_join) =
dir::start_syncer(db.clone(), sample_file_dir_id).unwrap();
TestDb { TestDb {
db, db,
dirs_by_stream_id: Arc::new(dirs_by_stream_id), dirs_by_stream_id: Arc::new(dirs_by_stream_id),
@ -128,13 +129,12 @@ impl TestDb {
let mut db = self.db.lock(); let mut db = self.db.lock();
let video_sample_entry_id = db.insert_video_sample_entry( let video_sample_entry_id = db.insert_video_sample_entry(
1920, 1080, [0u8; 100].to_vec(), "avc1.000000".to_owned()).unwrap(); 1920, 1080, [0u8; 100].to_vec(), "avc1.000000".to_owned()).unwrap();
let row_id; let next = db.streams_by_id().get(&TEST_STREAM_ID).unwrap().next_recording_id;
{ {
let mut tx = db.tx().unwrap(); let mut tx = db.tx().unwrap();
tx.bypass_reservation_for_testing = true;
const START_TIME: recording::Time = recording::Time(1430006400i64 * TIME_UNITS_PER_SEC); const START_TIME: recording::Time = recording::Time(1430006400i64 * TIME_UNITS_PER_SEC);
row_id = tx.insert_recording(&db::RecordingToInsert{ tx.insert_recording(&db::RecordingToInsert {
stream_id: TEST_STREAM_ID, id: db::CompositeId::new(TEST_STREAM_ID, next),
sample_file_bytes: encoder.sample_file_bytes, sample_file_bytes: encoder.sample_file_bytes,
time: START_TIME .. time: START_TIME ..
START_TIME + recording::Duration(encoder.total_duration_90k as i64), START_TIME + recording::Duration(encoder.total_duration_90k as i64),
@ -142,16 +142,15 @@ impl TestDb {
video_samples: encoder.video_samples, video_samples: encoder.video_samples,
video_sync_samples: encoder.video_sync_samples, video_sync_samples: encoder.video_sync_samples,
video_sample_entry_id: video_sample_entry_id, video_sample_entry_id: video_sample_entry_id,
sample_file_uuid: Uuid::nil(),
video_index: encoder.video_index, video_index: encoder.video_index,
sample_file_sha1: [0u8; 20], sample_file_sha1: [0u8; 20],
run_offset: 0, // TODO run_offset: 0,
flags: 0, // TODO flags: db::RecordingFlags::TrailingZero as i32,
}).unwrap(); }).unwrap();
tx.commit().unwrap(); tx.commit().unwrap();
} }
let mut row = None; let mut row = None;
db.list_recordings_by_id(TEST_STREAM_ID, row_id .. row_id + 1, db.list_recordings_by_id(TEST_STREAM_ID, next .. next+1,
|r| { row = Some(r); Ok(()) }).unwrap(); |r| { row = Some(r); Ok(()) }).unwrap();
row.unwrap() row.unwrap()
} }
@ -168,7 +167,7 @@ pub fn add_dummy_recordings_to_db(db: &db::Database, num: usize) {
const START_TIME: recording::Time = recording::Time(1430006400i64 * TIME_UNITS_PER_SEC); const START_TIME: recording::Time = recording::Time(1430006400i64 * TIME_UNITS_PER_SEC);
const DURATION: recording::Duration = recording::Duration(5399985); const DURATION: recording::Duration = recording::Duration(5399985);
let mut recording = db::RecordingToInsert { let mut recording = db::RecordingToInsert {
stream_id: TEST_STREAM_ID, id: db::CompositeId::new(TEST_STREAM_ID, 1),
sample_file_bytes: 30104460, sample_file_bytes: 30104460,
flags: 0, flags: 0,
time: START_TIME .. (START_TIME + DURATION), time: START_TIME .. (START_TIME + DURATION),
@ -176,15 +175,14 @@ pub fn add_dummy_recordings_to_db(db: &db::Database, num: usize) {
video_samples: 1800, video_samples: 1800,
video_sync_samples: 60, video_sync_samples: 60,
video_sample_entry_id: video_sample_entry_id, video_sample_entry_id: video_sample_entry_id,
sample_file_uuid: Uuid::nil(),
video_index: data, video_index: data,
sample_file_sha1: [0; 20], sample_file_sha1: [0; 20],
run_offset: 0, run_offset: 0,
}; };
let mut tx = db.tx().unwrap(); let mut tx = db.tx().unwrap();
tx.bypass_reservation_for_testing = true; for i in 0..num {
for _ in 0..num {
tx.insert_recording(&recording).unwrap(); tx.insert_recording(&recording).unwrap();
recording.id = db::CompositeId::new(TEST_STREAM_ID, 2 + i as i32);
recording.time.start += DURATION; recording.time.start += DURATION;
recording.time.end += DURATION; recording.time.end += DURATION;
recording.run_offset += 1; recording.run_offset += 1;

View File

@ -328,18 +328,20 @@ impl ServiceInner {
let mut prev = None; let mut prev = None;
let mut cur_off = 0; let mut cur_off = 0;
db.list_recordings_by_id(stream_id, s.ids.clone(), |r| { db.list_recordings_by_id(stream_id, s.ids.clone(), |r| {
let recording_id = r.id.recording();
// Check for missing recordings. // Check for missing recordings.
match prev { match prev {
None if r.id == s.ids.start => {}, None if recording_id == s.ids.start => {},
None => return Err(Error::new(format!("no such recording {}/{}", None => return Err(Error::new(format!("no such recording {}/{}",
stream_id, s.ids.start))), stream_id, s.ids.start))),
Some(id) if r.id != id + 1 => { Some(id) if r.id.recording() != id + 1 => {
return Err(Error::new(format!("no such recording {}/{}", return Err(Error::new(format!("no such recording {}/{}",
stream_id, id + 1))); stream_id, id + 1)));
}, },
_ => {}, _ => {},
}; };
prev = Some(r.id); prev = Some(recording_id);
// Add a segment for the relevant part of the recording, if any. // Add a segment for the relevant part of the recording, if any.
let end_time = s.end_time.unwrap_or(i64::max_value()); let end_time = s.end_time.unwrap_or(i64::max_value());
@ -348,11 +350,11 @@ impl ServiceInner {
let start = cmp::max(0, s.start_time - cur_off); let start = cmp::max(0, s.start_time - cur_off);
let end = cmp::min(d, end_time - cur_off); let end = cmp::min(d, end_time - cur_off);
let times = start as i32 .. end as i32; let times = start as i32 .. end as i32;
debug!("...appending recording {}/{} with times {:?} \ debug!("...appending recording {} with times {:?} \
(out of dur {})", r.stream_id, r.id, times, d); (out of dur {})", r.id, times, d);
builder.append(&db, r, start as i32 .. end as i32)?; builder.append(&db, r, start as i32 .. end as i32)?;
} else { } else {
debug!("...skipping recording {}/{} dur {}", r.stream_id, r.id, d); debug!("...skipping recording {} dur {}", r.id, d);
} }
cur_off += d; cur_off += d;
Ok(()) Ok(())