// This file is part of Moonfire NVR, a security camera digital video recorder. // Copyright (C) 2016 Scott Lamb // // This program is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. // // In addition, as a special exception, the copyright holders give // permission to link the code of portions of this program with the // OpenSSL library under certain conditions as described in each // individual source file, and distribute linked combinations including // the two. // // You must obey the GNU General Public License in all respects for all // of the code used other than OpenSSL. If you modify file(s) with this // exception, you may extend this exception to your version of the // file(s), but you are not obligated to do so. If you do not wish to do // so, delete this exception statement from your version. If you delete // this exception statement from all source files in the program, then // also delete it here. // // This program is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU General Public License for more details. // // You should have received a copy of the GNU General Public License // along with this program. If not, see . //! Sample file directory management. //! //! This includes opening files for serving, rotating away old files, and saving new files. use db::{self, CompositeId}; use failure::{Error, Fail}; use fnv::FnvHashMap; use libc::{self, c_char}; use parking_lot::Mutex; use protobuf::{self, Message}; use recording; use openssl::hash; use schema; use std::cmp; use std::ffi; use std::fs; use std::io::{self, Read, Write}; use std::mem; use std::os::unix::ffi::OsStrExt; use std::os::unix::io::FromRawFd; use std::sync::Arc; use std::sync::mpsc; use std::thread; /// A sample file directory. Typically one per physical disk drive. /// /// If the directory is used for writing, the `start_syncer` function should be called to start /// a background thread. This thread manages deleting files and writing new files. It synces the /// directory and commits these operations to the database in the correct order to maintain the /// invariants described in `design/schema.md`. #[derive(Debug)] pub struct SampleFileDir { /// The open file descriptor for the directory. The worker uses it to create files and sync the /// directory. Other threads use it to open sample files for reading during video serving. pub(crate) fd: Fd, } /// A file descriptor associated with a directory (not necessarily the sample file dir). #[derive(Debug)] pub struct Fd(libc::c_int); impl Drop for Fd { fn drop(&mut self) { if unsafe { libc::close(self.0) } < 0 { let e = io::Error::last_os_error(); warn!("Unable to close sample file dir: {}", e); } } } impl Fd { /// Opens the given path as a directory. pub fn open(path: &str, mkdir: bool) -> Result { let cstring = ffi::CString::new(path) .map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))?; if mkdir && unsafe { libc::mkdir(cstring.as_ptr(), 0o700) } != 0 { let e = io::Error::last_os_error(); if e.kind() != io::ErrorKind::AlreadyExists { return Err(e.into()); } } let fd = unsafe { libc::open(cstring.as_ptr(), libc::O_DIRECTORY | libc::O_RDONLY, 0) }; if fd < 0 { return Err(io::Error::last_os_error().into()); } Ok(Fd(fd)) } pub(crate) fn sync(&self) -> Result<(), io::Error> { let res = unsafe { libc::fsync(self.0) }; if res < 0 { return Err(io::Error::last_os_error()) } Ok(()) } /// Opens a sample file within this directory with the given flags and (if creating) mode. unsafe fn openat(&self, p: *const c_char, flags: libc::c_int, mode: libc::c_int) -> Result { let fd = libc::openat(self.0, p, flags, mode); if fd < 0 { return Err(io::Error::last_os_error()) } Ok(fs::File::from_raw_fd(fd)) } /// Locks the directory with the specified `flock` operation. pub fn lock(&self, operation: libc::c_int) -> Result<(), io::Error> { let ret = unsafe { libc::flock(self.0, operation) }; if ret < 0 { return Err(io::Error::last_os_error().into()); } Ok(()) } pub fn statfs(&self) -> Result { unsafe { let mut stat: libc::statvfs = mem::zeroed(); if libc::fstatvfs(self.0, &mut stat) < 0 { return Err(io::Error::last_os_error()) } Ok(stat) } } } pub(crate) unsafe fn renameat(from_fd: &Fd, from_path: *const c_char, to_fd: &Fd, to_path: *const c_char) -> Result<(), io::Error> { let result = libc::renameat(from_fd.0, from_path, to_fd.0, to_path); if result < 0 { return Err(io::Error::last_os_error()) } Ok(()) } /// Reads `dir`'s metadata. If none is found, returns an empty proto. pub(crate) fn read_meta(dir: &Fd) -> Result { let mut meta = schema::DirMeta::default(); let p = unsafe { ffi::CStr::from_ptr("meta\0".as_ptr() as *const c_char) }; let mut f = match unsafe { dir.openat(p.as_ptr(), libc::O_RDONLY, 0) } { Err(e) => { if e.kind() == ::std::io::ErrorKind::NotFound { return Ok(meta); } return Err(e.into()); }, Ok(f) => f, }; let mut data = Vec::new(); f.read_to_end(&mut data)?; let mut s = protobuf::CodedInputStream::from_bytes(&data); meta.merge_from(&mut s).map_err(|e| e.context("Unable to parse metadata proto: {}"))?; Ok(meta) } /// Write `dir`'s metadata, clobbering existing data. pub(crate) fn write_meta(dir: &Fd, meta: &schema::DirMeta) -> Result<(), Error> { let (tmp_path, final_path) = unsafe { (ffi::CStr::from_ptr("meta.tmp\0".as_ptr() as *const c_char), ffi::CStr::from_ptr("meta\0".as_ptr() as *const c_char)) }; let mut f = unsafe { dir.openat(tmp_path.as_ptr(), libc::O_CREAT | libc::O_TRUNC | libc::O_WRONLY, 0o600)? }; meta.write_to_writer(&mut f)?; f.sync_all()?; unsafe { renameat(&dir, tmp_path.as_ptr(), &dir, final_path.as_ptr())? }; dir.sync()?; Ok(()) } impl SampleFileDir { /// Opens the directory using the given metadata. /// /// `db_meta.in_progress_open` should be filled if the directory should be opened in read/write /// mode; absent in read-only mode. pub fn open(path: &str, db_meta: &schema::DirMeta) -> Result, Error> { let read_write = db_meta.in_progress_open.is_some(); let s = SampleFileDir::open_self(path, false)?; s.fd.lock(if read_write { libc::LOCK_EX } else { libc::LOCK_SH } | libc::LOCK_NB)?; let dir_meta = read_meta(&s.fd)?; if !SampleFileDir::consistent(db_meta, &dir_meta) { bail!("metadata mismatch.\ndb: {:#?}\ndir: {:#?}", db_meta, &dir_meta); } if db_meta.in_progress_open.is_some() { s.write_meta(db_meta)?; } Ok(s) } /// Returns true if the existing directory and database metadata are consistent; the directory /// is then openable. fn consistent(db_meta: &schema::DirMeta, dir_meta: &schema::DirMeta) -> bool { if dir_meta.db_uuid != db_meta.db_uuid { return false; } if dir_meta.dir_uuid != db_meta.dir_uuid { return false; } if db_meta.last_complete_open.is_some() && (db_meta.last_complete_open != dir_meta.last_complete_open && db_meta.last_complete_open != dir_meta.in_progress_open) { return false; } if db_meta.last_complete_open.is_none() && dir_meta.last_complete_open.is_some() { return false; } true } pub(crate) fn create(path: &str, db_meta: &schema::DirMeta) -> Result, Error> { let s = SampleFileDir::open_self(path, true)?; s.fd.lock(libc::LOCK_EX | libc::LOCK_NB)?; let old_meta = read_meta(&s.fd)?; // Verify metadata. We only care that it hasn't been completely opened. // Partial opening by this or another database is fine; we won't overwrite anything. if old_meta.last_complete_open.is_some() { bail!("Can't create dir at path {}: is already in use:\n{:?}", path, old_meta); } if !SampleFileDir::is_empty(path)? { bail!("Can't create dir at path {} with existing files", path); } s.write_meta(db_meta)?; Ok(s) } /// Determines if the directory is empty, aside form metadata. pub(crate) fn is_empty(path: &str) -> Result { for e in fs::read_dir(path)? { let e = e?; match e.file_name().as_bytes() { b"." | b".." => continue, b"meta" | b"meta-tmp" => continue, // existing metadata is fine. _ => return Ok(false), } } Ok(true) } fn open_self(path: &str, create: bool) -> Result, Error> { let fd = Fd::open(path, create) .map_err(|e| format_err!("unable to open sample file dir {}: {}", path, e))?; Ok(Arc::new(SampleFileDir { fd, })) } /// Opens the given sample file for reading. pub fn open_sample_file(&self, composite_id: CompositeId) -> Result { let p = SampleFileDir::get_rel_pathname(composite_id); unsafe { self.fd.openat(p.as_ptr(), libc::O_RDONLY, 0) } } pub(crate) fn write_meta(&self, meta: &schema::DirMeta) -> Result<(), Error> { write_meta(&self.fd, meta) } pub fn statfs(&self) -> Result { self.fd.statfs() } /// Gets a pathname for a sample file suitable for passing to open or unlink. fn get_rel_pathname(id: CompositeId) -> [libc::c_char; 17] { let mut buf = [0u8; 17]; write!(&mut buf[..16], "{:016x}", id.0).expect("can't format id to pathname buf"); // libc::c_char seems to be i8 on some platforms (Linux/arm) and u8 on others (Linux/amd64). unsafe { mem::transmute::<[u8; 17], [libc::c_char; 17]>(buf) } } /// Unlinks the given sample file within this directory. fn unlink(fd: &Fd, id: CompositeId) -> Result<(), io::Error> { let p = SampleFileDir::get_rel_pathname(id); let res = unsafe { libc::unlinkat(fd.0, p.as_ptr(), 0) }; if res < 0 { return Err(io::Error::last_os_error()) } Ok(()) } /// Syncs the directory itself. fn sync(&self) -> Result<(), io::Error> { self.fd.sync() } } /// A command sent to the syncer. These correspond to methods in the `SyncerChannel` struct. enum SyncerCommand { AsyncSaveRecording(CompositeId, fs::File), DatabaseFlushed, Flush(mpsc::SyncSender<()>), } /// A channel which can be used to send commands to the syncer. /// Can be cloned to allow multiple threads to send commands. #[derive(Clone)] pub struct SyncerChannel(mpsc::Sender); /// State of the worker thread. struct Syncer { dir_id: i32, dir: Arc, db: Arc, } /// Starts a syncer for the given sample file directory. /// /// The lock must not be held on `db` when this is called. /// /// There should be only one syncer per directory, or 0 if operating in read-only mode. /// This function will perform the initial rotation synchronously, so that it is finished before /// file writing starts. Afterward the syncing happens in a background thread. /// /// Returns a `SyncerChannel` which can be used to send commands (and can be cloned freely) and /// a `JoinHandle` for the syncer thread. Commands sent on the channel will be executed or retried /// forever. (TODO: provide some manner of pushback during retry.) At program shutdown, all /// `SyncerChannel` clones should be dropped and then the handle joined to allow all recordings to /// be persisted. /// /// Note that dropping all `SyncerChannel` clones currently includes calling /// `LockedDatabase::clear_on_flush`, as this function installs a hook to watch database flushes. /// TODO: add a join wrapper which arranges for the on flush hook to be removed automatically. pub fn start_syncer(db: Arc, dir_id: i32) -> Result<(SyncerChannel, thread::JoinHandle<()>), Error> { let db2 = db.clone(); let (mut syncer, path) = Syncer::new(&db.lock(), db2, dir_id)?; syncer.initial_rotation()?; let (snd, rcv) = mpsc::channel(); db.lock().on_flush(Box::new({ let snd = snd.clone(); move || if let Err(e) = snd.send(SyncerCommand::DatabaseFlushed) { warn!("Unable to notify syncer for dir {} of flush: {}", dir_id, e); } })); Ok((SyncerChannel(snd), thread::Builder::new() .name(format!("sync-{}", path)) .spawn(move || syncer.run(rcv)).unwrap())) } pub struct NewLimit { pub stream_id: i32, pub limit: i64, } /// Deletes recordings if necessary to fit within the given new `retain_bytes` limit. /// Note this doesn't change the limit in the database; it only deletes files. /// Pass a limit of 0 to delete all recordings associated with a camera. pub fn lower_retention(db: Arc, dir_id: i32, limits: &[NewLimit]) -> Result<(), Error> { let db2 = db.clone(); let (mut syncer, _) = Syncer::new(&db.lock(), db2, dir_id)?; syncer.do_rotation(|db| { for l in limits { let (bytes_before, extra); { let stream = db.streams_by_id().get(&l.stream_id) .ok_or_else(|| format_err!("no such stream {}", l.stream_id))?; bytes_before = stream.sample_file_bytes + stream.bytes_to_add - stream.bytes_to_delete; extra = stream.retain_bytes - l.limit; } if l.limit >= bytes_before { continue } delete_recordings(db, l.stream_id, extra)?; let stream = db.streams_by_id().get(&l.stream_id).unwrap(); info!("stream {}, deleting: {}->{}", l.stream_id, bytes_before, stream.sample_file_bytes + stream.bytes_to_add - stream.bytes_to_delete); } Ok(()) }) } /// Deletes recordings to bring a stream's disk usage within bounds. fn delete_recordings(db: &mut db::LockedDatabase, stream_id: i32, extra_bytes_needed: i64) -> Result<(), Error> { let bytes_needed = { let stream = match db.streams_by_id().get(&stream_id) { None => bail!("no stream {}", stream_id), Some(s) => s, }; stream.sample_file_bytes + stream.bytes_to_add - stream.bytes_to_delete + extra_bytes_needed - stream.retain_bytes }; let mut bytes_to_delete = 0; if bytes_needed <= 0 { debug!("{}: have remaining quota of {}", stream_id, -bytes_needed); return Ok(()); } let mut n = 0; db.delete_oldest_recordings(stream_id, &mut |row| { n += 1; if bytes_needed >= bytes_to_delete { bytes_to_delete += row.sample_file_bytes as i64; n += 1; return true; } false })?; info!("{}: deleting {} bytes in {} recordings ({} bytes needed)", stream_id, bytes_to_delete, n, bytes_needed); Ok(()) } impl SyncerChannel { /// Asynchronously syncs the given writer, closes it, records it into the database, and /// starts rotation. fn async_save_recording(&self, id: CompositeId, f: fs::File) { self.0.send(SyncerCommand::AsyncSaveRecording(id, f)).unwrap(); } /// For testing: flushes the syncer, waiting for all currently-queued commands to complete. pub fn flush(&self) { let (snd, rcv) = mpsc::sync_channel(0); self.0.send(SyncerCommand::Flush(snd)).unwrap(); rcv.recv().unwrap_err(); // syncer should just drop the channel, closing it. } } impl Syncer { fn new(l: &db::LockedDatabase, db: Arc, dir_id: i32) -> Result<(Self, String), Error> { let d = l.sample_file_dirs_by_id() .get(&dir_id) .ok_or_else(|| format_err!("no dir {}", dir_id))?; let dir = d.get()?; // Abandon files. // First, get a list of the streams in question. let streams_to_next: FnvHashMap<_, _> = l.streams_by_id() .iter() .filter_map(|(&k, v)| { if v.sample_file_dir_id == Some(dir_id) { Some((k, v.next_recording_id)) } else { None } }) .collect(); let to_abandon = Syncer::list_files_to_abandon(&d.path, streams_to_next)?; let mut undeletable = 0; for &id in &to_abandon { if let Err(e) = SampleFileDir::unlink(&dir.fd, id) { if e.kind() == io::ErrorKind::NotFound { warn!("dir: abandoned recording {} already deleted!", id); } else { warn!("dir: Unable to unlink abandoned recording {}: {}", id, e); undeletable += 1; } } } if undeletable > 0 { bail!("Unable to delete {} abandoned recordings.", undeletable); } Ok((Syncer { dir_id, dir, db, }, d.path.clone())) } /// Lists files which should be "abandoned" (deleted without ever recording in the database) /// on opening. fn list_files_to_abandon(path: &str, streams_to_next: FnvHashMap) -> Result, Error> { let mut v = Vec::new(); for e in ::std::fs::read_dir(path)? { let e = e?; let id = match parse_id(e.file_name().as_bytes()) { Ok(i) => i, Err(_) => continue, }; let next = match streams_to_next.get(&id.stream()) { Some(n) => *n, None => continue, // unknown stream. }; if id.recording() >= next { v.push(id); } } Ok(v) } fn run(&mut self, cmds: mpsc::Receiver) { loop { match cmds.recv() { Err(_) => return, // all senders have closed the channel; shutdown Ok(SyncerCommand::AsyncSaveRecording(id, f)) => self.save(id, f), Ok(SyncerCommand::DatabaseFlushed) => { retry_forever(&mut || self.collect_garbage(true)) }, Ok(SyncerCommand::Flush(_)) => {}, // just drop the supplied sender, closing it. }; } } /// Rotates files for all streams and deletes stale files from previous runs. /// Called from main thread. fn initial_rotation(&mut self) -> Result<(), Error> { self.do_rotation(|db| { let streams: Vec = db.streams_by_id().keys().map(|&id| id).collect(); for &stream_id in &streams { delete_recordings(db, stream_id, 0)?; } Ok(()) }) } /// Helper to do initial or retention-lowering rotation. Called from main thread. fn do_rotation(&mut self, delete_recordings: F) -> Result<(), Error> where F: FnOnce(&mut db::LockedDatabase) -> Result<(), Error> { { let mut db = self.db.lock(); delete_recordings(&mut *db)?; db.flush("synchronous deletion")?; } self.collect_garbage(false)?; self.db.lock().flush("synchronous garbage collection") } /// Helper for collecting garbage; called from main or worker threads. fn collect_garbage(&mut self, warn_on_missing: bool) -> Result<(), Error> { let mut garbage: Vec<_> = { let l = self.db.lock(); let d = l.sample_file_dirs_by_id().get(&self.dir_id).unwrap(); d.garbage.iter().map(|id| *id).collect() }; let len_before = garbage.len(); garbage.retain(|&id| { if let Err(e) = SampleFileDir::unlink(&self.dir.fd, id) { if e.kind() == io::ErrorKind::NotFound { if warn_on_missing { warn!("dir: recording {} already deleted!", id); } } else { warn!("dir: Unable to unlink {}: {}", id, e); return false; } } true }); let res = if len_before > garbage.len() { Err(format_err!("Unable to unlink {} files (see earlier warning messages for details)", len_before - garbage.len())) } else { Ok(()) }; if garbage.is_empty() { // No progress. return res; } if let Err(e) = self.dir.sync() { error!("unable to sync dir: {}", e); return res.and(Err(e.into())); } if let Err(e) = self.db.lock().delete_garbage(self.dir_id, &mut garbage) { error!("unable to delete garbage ({} files) for dir {}: {}", self.dir_id, garbage.len(), e); return res.and(Err(e.into())); } res } /// Saves the given recording and causes rotation to happen. Called from worker thread. /// /// Note that part of rotation is deferred for the next cycle (saved writing or program startup) /// so that there can be only one dir sync and database transaction per save. /// Internal helper for `save`. This is separated out so that the question-mark operator /// can be used in the many error paths. fn save(&mut self, id: CompositeId, f: fs::File) { let stream_id = id.stream(); // Free up a like number of bytes. retry_forever(&mut || delete_recordings(&mut self.db.lock(), stream_id, 0)); retry_forever(&mut || f.sync_all()); retry_forever(&mut || self.dir.sync()); let mut db = self.db.lock(); db.mark_synced(id).unwrap(); let reason = { let s = db.streams_by_id().get(&stream_id).unwrap(); let c = db.cameras_by_id().get(&s.camera_id).unwrap(); let unflushed = s.unflushed(); if unflushed < s.flush_if { debug!("{}-{}: unflushed={} < if={}, not flushing", c.short_name, s.type_.as_str(), unflushed, s.flush_if); return; } format!("{}-{}: unflushed={} >= if={}", c.short_name, s.type_.as_str(), unflushed, s.flush_if) }; if let Err(e) = db.flush(&reason) { // Don't retry the commit now in case it causes extra flash write cycles. // It's not necessary for correctness to flush before proceeding. // Just wait until the next flush would happen naturally. warn!("flush failure on save for reason {}; leaving unflushed for now: {:?}", reason, e); } } } fn retry_forever>(f: &mut FnMut() -> Result) -> T { let sleep_time = ::std::time::Duration::new(1, 0); loop { let e = match f() { Ok(t) => return t, Err(e) => e.into(), }; warn!("sleeping for {:?} after error: {:?}", sleep_time, e); thread::sleep(sleep_time); } } /// Struct for writing a single run (of potentially several recordings) to disk and committing its /// metadata to the database. `Writer` hands off each recording's state to the syncer when done. It /// saves the recording to the database (if I/O errors do not prevent this), retries forever, /// or panics (if further writing on this stream is impossible). pub struct Writer<'a> { dir: &'a SampleFileDir, db: &'a db::Database, channel: &'a SyncerChannel, stream_id: i32, video_sample_entry_id: i32, state: WriterState, } enum WriterState { Unopened, Open(InnerWriter), Closed(PreviousWriter), } /// State for writing a single recording, used within `Writer`. /// /// Note that the recording created by every `InnerWriter` must be written to the `SyncerChannel` /// with at least one sample. The sample may have zero duration. struct InnerWriter { f: fs::File, r: Arc>, index: recording::SampleIndexEncoder, id: CompositeId, hasher: hash::Hasher, /// The end time of the previous segment in this run, if any. prev_end: Option, /// The start time of this segment, based solely on examining the local clock after frames in /// this segment were received. Frames can suffer from various kinds of delay (initial /// buffering, encoding, and network transmission), so this time is set to far in the future on /// construction, given a real value on the first packet, and decreased as less-delayed packets /// are discovered. See design/time.md for details. local_start: recording::Time, adjuster: ClockAdjuster, run_offset: i32, /// A sample which has been written to disk but not added to `index`. Index writes are one /// sample behind disk writes because the duration of a sample is the difference between its /// pts and the next sample's pts. A sample is flushed when the next sample is written, when /// the writer is closed cleanly (the caller supplies the next pts), or when the writer is /// closed uncleanly (with a zero duration, which the `.mp4` format allows only at the end). /// /// Invariant: this should always be `Some` (briefly violated during `write` call only). unflushed_sample: Option, } /// Adjusts durations given by the camera to correct its clock frequency error. #[derive(Copy, Clone, Debug)] struct ClockAdjuster { /// Every `every_minus_1 + 1` units, add `-ndir`. /// Note i32::max_value() disables adjustment. every_minus_1: i32, /// Should be 1 or -1 (unless disabled). ndir: i32, /// Keeps accumulated difference from previous values. cur: i32, } impl ClockAdjuster { fn new(local_time_delta: Option) -> Self { // Pick an adjustment rate to correct local_time_delta over the next minute (the // desired duration of a single recording). Cap the rate at 500 ppm (which corrects // 2,700/90,000ths of a second over a minute) to prevent noticeably speeding up or slowing // down playback. let (every_minus_1, ndir) = match local_time_delta { Some(d) if d <= -2700 => (1999, 1), Some(d) if d >= 2700 => (1999, -1), Some(d) if d < -60 => ((60 * 90000) / -(d as i32) - 1, 1), Some(d) if d > 60 => ((60 * 90000) / (d as i32) - 1, -1), _ => (i32::max_value(), 0), }; ClockAdjuster{ every_minus_1, ndir, cur: 0, } } fn adjust(&mut self, mut val: i32) -> i32 { self.cur += val; // The "val > self.ndir" here is so that if decreasing durations (ndir == 1), we don't // cause a duration of 1 to become a duration of 0. It has no effect when increasing // durations. (There's no danger of a duration of 0 becoming a duration of 1; cur wouldn't // be newly > self.every_minus_1.) while self.cur > self.every_minus_1 && val > self.ndir { val -= self.ndir; self.cur -= self.every_minus_1 + 1; } val } } #[derive(Copy, Clone)] struct UnflushedSample { local_time: recording::Time, pts_90k: i64, len: i32, is_key: bool, } /// State associated with a run's previous recording; used within `Writer`. #[derive(Copy, Clone)] struct PreviousWriter { end_time: recording::Time, local_time_delta: recording::Duration, run_offset: i32, } impl<'a> Writer<'a> { pub fn new(dir: &'a SampleFileDir, db: &'a db::Database, channel: &'a SyncerChannel, stream_id: i32, video_sample_entry_id: i32) -> Self { Writer { dir, db, channel, stream_id, video_sample_entry_id, state: WriterState::Unopened, } } /// Opens a new writer. /// This returns a writer that violates the invariant that `unflushed_sample` is `Some`. /// The caller (`write`) is responsible for correcting this. fn open(&mut self) -> Result<&mut InnerWriter, Error> { let prev = match self.state { WriterState::Unopened => None, WriterState::Open(ref mut w) => return Ok(w), WriterState::Closed(prev) => Some(prev), }; let (id, r) = self.db.lock().add_recording(self.stream_id)?; let p = SampleFileDir::get_rel_pathname(id); let f = retry_forever(&mut || unsafe { self.dir.fd.openat(p.as_ptr(), libc::O_WRONLY | libc::O_EXCL | libc::O_CREAT, 0o600) }); self.state = WriterState::Open(InnerWriter { f, r, index: recording::SampleIndexEncoder::new(), id, hasher: hash::Hasher::new(hash::MessageDigest::sha1())?, prev_end: prev.map(|p| p.end_time), local_start: recording::Time(i64::max_value()), adjuster: ClockAdjuster::new(prev.map(|p| p.local_time_delta.0)), run_offset: prev.map(|p| p.run_offset + 1).unwrap_or(0), unflushed_sample: None, }); match self.state { WriterState::Open(ref mut w) => Ok(w), _ => unreachable!(), } } pub fn previously_opened(&self) -> Result { Ok(match self.state { WriterState::Unopened => false, WriterState::Closed(_) => true, WriterState::Open(_) => bail!("open!"), }) } /// Writes a new frame to this segment. /// `local_time` should be the local clock's time as of when this packet was received. pub fn write(&mut self, pkt: &[u8], local_time: recording::Time, pts_90k: i64, is_key: bool) -> Result<(), Error> { let w = self.open()?; // Note w's invariant that `unflushed_sample` is `None` may currently be violated. // We must restore it on all success or error paths. if let Some(unflushed) = w.unflushed_sample.take() { let duration = (pts_90k - unflushed.pts_90k) as i32; if duration <= 0 { // Restore invariant. w.unflushed_sample = Some(unflushed); bail!("pts not monotonically increasing; got {} then {}", unflushed.pts_90k, pts_90k); } let duration = w.adjuster.adjust(duration); w.index.add_sample(duration, unflushed.len, unflushed.is_key); w.extend_local_start(unflushed.local_time); } let mut remaining = pkt; while !remaining.is_empty() { let written = retry_forever(&mut || w.f.write(remaining)); remaining = &remaining[written..]; } w.unflushed_sample = Some(UnflushedSample { local_time, pts_90k, len: pkt.len() as i32, is_key, }); w.hasher.update(pkt).unwrap(); Ok(()) } /// Cleanly closes the writer, using a supplied pts of the next sample for the last sample's /// duration (if known). If `close` is not called, the `Drop` trait impl will close the trait, /// swallowing errors and using a zero duration for the last sample. pub fn close(&mut self, next_pts: Option) { self.state = match mem::replace(&mut self.state, WriterState::Unopened) { WriterState::Open(w) => { let prev = w.close(self.channel, self.video_sample_entry_id, next_pts); WriterState::Closed(prev) }, s => s, }; } } impl InnerWriter { fn extend_local_start(&mut self, pkt_local_time: recording::Time) { let new = pkt_local_time - recording::Duration(self.index.total_duration_90k as i64); self.local_start = cmp::min(self.local_start, new); } fn close(mut self, channel: &SyncerChannel, video_sample_entry_id: i32, next_pts: Option) -> PreviousWriter { let unflushed = self.unflushed_sample.take().expect("should always be an unflushed sample"); let duration = self.adjuster.adjust(match next_pts { None => 0, Some(p) => (p - unflushed.pts_90k) as i32, }); self.index.add_sample(duration, unflushed.len, unflushed.is_key); self.extend_local_start(unflushed.local_time); let mut sha1_bytes = [0u8; 20]; sha1_bytes.copy_from_slice(&self.hasher.finish().unwrap()[..]); let start = self.prev_end.unwrap_or(self.local_start); let end = start + recording::Duration(self.index.total_duration_90k as i64); let flags = if self.index.has_trailing_zero() { db::RecordingFlags::TrailingZero as i32 } else { 0 }; let local_start_delta = self.local_start - start; let recording = db::RecordingToInsert { sample_file_bytes: self.index.sample_file_bytes, time: start .. end, local_time_delta: local_start_delta, video_samples: self.index.video_samples, video_sync_samples: self.index.video_sync_samples, video_sample_entry_id, video_index: self.index.video_index, sample_file_sha1: sha1_bytes, run_offset: self.run_offset, flags: flags, }; self.r.lock().recording = Some(recording); channel.async_save_recording(self.id, self.f); PreviousWriter { end_time: end, local_time_delta: local_start_delta, run_offset: self.run_offset, } } } impl<'a> Drop for Writer<'a> { fn drop(&mut self) { if let WriterState::Open(w) = mem::replace(&mut self.state, WriterState::Unopened) { // Swallow any error. The caller should only drop the Writer without calling close() // if there's already been an error. The caller should report that. No point in // complaining again. let _ = w.close(self.channel, self.video_sample_entry_id, None); } } } /// Parse a composite id filename. /// /// These are exactly 16 bytes, lowercase hex. pub(crate) fn parse_id(id: &[u8]) -> Result { if id.len() != 16 { return Err(()); } let mut v: u64 = 0; for i in 0..16 { v = (v << 4) | match id[i] { b @ b'0'...b'9' => b - b'0', b @ b'a'...b'f' => b - b'a' + 10, _ => return Err(()), } as u64; } Ok(CompositeId(v as i64)) } #[cfg(test)] mod tests { use super::ClockAdjuster; use testutil; #[test] fn adjust() { testutil::init(); // no-ops. for v in &[None, Some(0), Some(-10), Some(10)] { let mut a = ClockAdjuster::new(*v); for _ in 0..1800 { assert_eq!(3000, a.adjust(3000), "v={:?}", *v); } } // typical, 100 ppm adjustment. let mut a = ClockAdjuster::new(Some(-540)); let mut total = 0; for _ in 0..1800 { let new = a.adjust(3000); assert!(new == 2999 || new == 3000); total += new; } let expected = 1800*3000 - 540; assert!(total == expected || total == expected + 1, "total={} vs expected={}", total, expected); a = ClockAdjuster::new(Some(540)); let mut total = 0; for _ in 0..1800 { let new = a.adjust(3000); assert!(new == 3000 || new == 3001); total += new; } let expected = 1800*3000 + 540; assert!(total == expected || total == expected + 1, "total={} vs expected={}", total, expected); // capped at 500 ppm (change of 2,700/90,000ths over 1 minute). a = ClockAdjuster::new(Some(-1_000_000)); total = 0; for _ in 0..1800 { let new = a.adjust(3000); assert!(new == 2998 || new == 2999, "new={}", new); total += new; } let expected = 1800*3000 - 2700; assert!(total == expected || total == expected + 1, "total={} vs expected={}", total, expected); a = ClockAdjuster::new(Some(1_000_000)); total = 0; for _ in 0..1800 { let new = a.adjust(3000); assert!(new == 3001 || new == 3002, "new={}", new); total += new; } let expected = 1800*3000 + 2700; assert!(total == expected || total == expected + 1, "total={} vs expected={}", total, expected); } #[test] fn parse_id() { use super::parse_id; assert_eq!(parse_id(b"0000000000000000").unwrap().0, 0); assert_eq!(parse_id(b"0000000100000002").unwrap().0, 0x0000000100000002); parse_id(b"").unwrap_err(); parse_id(b"meta").unwrap_err(); parse_id(b"0").unwrap_err(); parse_id(b"000000010000000x").unwrap_err(); } }