mirror of
https://github.com/scottlamb/moonfire-nvr.git
synced 2025-11-28 21:18:11 -05:00
backend support for live stream (#59)
This is so far completely untested, for use by a new UI prototype. It creates a new URL endpoint which sends one video/mp4 media segment per key frame, with the dependent frames included. This means there will be about one key frame interval of latency (typically about a second). This seems hard to avoid, as mentioned in issue #59.
This commit is contained in:
@@ -23,6 +23,7 @@ libpasta = "0.1.0-rc2"
|
||||
log = "0.4"
|
||||
lru-cache = "0.1"
|
||||
mylog = { git = "https://github.com/scottlamb/mylog" }
|
||||
odds = { version = "0.3.1", features = ["std-vec"] }
|
||||
openssl = "0.10"
|
||||
parking_lot = { version = "0.7", features = [] }
|
||||
protobuf = "2.0"
|
||||
|
||||
126
db/db.rs
126
db/db.rs
@@ -395,7 +395,6 @@ impl ::std::fmt::Display for StreamType {
|
||||
|
||||
pub const ALL_STREAM_TYPES: [StreamType; 2] = [StreamType::MAIN, StreamType::SUB];
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct Stream {
|
||||
pub id: i32,
|
||||
pub camera_id: i32,
|
||||
@@ -445,6 +444,20 @@ pub struct Stream {
|
||||
|
||||
/// The number of recordings in `uncommitted` which are synced and ready to commit.
|
||||
synced_recordings: usize,
|
||||
|
||||
on_live_segment: Vec<Box<FnMut(LiveSegment) -> bool + Send>>,
|
||||
}
|
||||
|
||||
/// Bounds of a single keyframe and the frames dependent on it.
|
||||
/// This is used for live stream recordings. The stream id should already be known to the
|
||||
/// subscriber.
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct LiveSegment {
|
||||
pub recording: i32,
|
||||
|
||||
/// The pts, relative to the start of the recording, of the start and end of this live segment,
|
||||
/// in 90kHz units.
|
||||
pub off_90k: Range<i32>,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Default)]
|
||||
@@ -592,7 +605,7 @@ pub struct LockedDatabase {
|
||||
flush_count: usize,
|
||||
|
||||
/// If the database is open in read-write mode, the information about the current Open row.
|
||||
open: Option<Open>,
|
||||
pub open: Option<Open>,
|
||||
|
||||
/// The monotonic time when the database was opened (whether in read-write mode or read-only
|
||||
/// mode).
|
||||
@@ -611,8 +624,8 @@ pub struct LockedDatabase {
|
||||
|
||||
/// Represents a row of the `open` database table.
|
||||
#[derive(Copy, Clone, Debug)]
|
||||
pub(crate) struct Open {
|
||||
pub(crate) id: u32,
|
||||
pub struct Open {
|
||||
pub id: u32,
|
||||
pub(crate) uuid: Uuid,
|
||||
}
|
||||
|
||||
@@ -638,7 +651,7 @@ impl ::std::fmt::Display for CompositeId {
|
||||
/// structs.
|
||||
struct StreamStateChanger {
|
||||
sids: [Option<i32>; 2],
|
||||
streams: Vec<(i32, Option<Stream>)>,
|
||||
streams: Vec<(i32, Option<(i32, StreamType, StreamChange)>)>,
|
||||
}
|
||||
|
||||
impl StreamStateChanger {
|
||||
@@ -651,6 +664,7 @@ impl StreamStateChanger {
|
||||
let mut streams = Vec::with_capacity(2);
|
||||
let existing_streams = existing.map(|e| e.streams).unwrap_or_default();
|
||||
for (i, ref mut sc) in change.streams.iter_mut().enumerate() {
|
||||
let type_ = StreamType::from_index(i).unwrap();
|
||||
let mut have_data = false;
|
||||
if let Some(sid) = existing_streams[i] {
|
||||
let s = streams_by_id.get(&sid).unwrap();
|
||||
@@ -694,14 +708,8 @@ impl StreamStateChanger {
|
||||
bail!("missing stream {}", sid);
|
||||
}
|
||||
sids[i] = Some(sid);
|
||||
let s = (*s).clone();
|
||||
streams.push((sid, Some(Stream {
|
||||
sample_file_dir_id: sc.sample_file_dir_id,
|
||||
rtsp_path: mem::replace(&mut sc.rtsp_path, String::new()),
|
||||
record: sc.record,
|
||||
flush_if_sec: sc.flush_if_sec,
|
||||
..s
|
||||
})));
|
||||
let sc = mem::replace(*sc, StreamChange::default());
|
||||
streams.push((sid, Some((camera_id, type_, sc))));
|
||||
}
|
||||
} else {
|
||||
if sc.rtsp_path.is_empty() && sc.sample_file_dir_id.is_none() && !sc.record {
|
||||
@@ -715,7 +723,6 @@ impl StreamStateChanger {
|
||||
values (:camera_id, :sample_file_dir_id, :type, :rtsp_path, :record,
|
||||
0, :flush_if_sec, 1)
|
||||
"#)?;
|
||||
let type_ = StreamType::from_index(i).unwrap();
|
||||
stmt.execute_named(&[
|
||||
(":camera_id", &camera_id),
|
||||
(":sample_file_dir_id", &sc.sample_file_dir_id),
|
||||
@@ -726,26 +733,8 @@ impl StreamStateChanger {
|
||||
])?;
|
||||
let id = tx.last_insert_rowid() as i32;
|
||||
sids[i] = Some(id);
|
||||
streams.push((id, Some(Stream {
|
||||
id,
|
||||
type_,
|
||||
camera_id,
|
||||
sample_file_dir_id: sc.sample_file_dir_id,
|
||||
rtsp_path: mem::replace(&mut sc.rtsp_path, String::new()),
|
||||
retain_bytes: 0,
|
||||
flush_if_sec: sc.flush_if_sec,
|
||||
range: None,
|
||||
sample_file_bytes: 0,
|
||||
to_delete: Vec::new(),
|
||||
bytes_to_delete: 0,
|
||||
bytes_to_add: 0,
|
||||
duration: recording::Duration(0),
|
||||
days: BTreeMap::new(),
|
||||
record: sc.record,
|
||||
next_recording_id: 1,
|
||||
uncommitted: VecDeque::new(),
|
||||
synced_recordings: 0,
|
||||
})));
|
||||
let sc = mem::replace(*sc, StreamChange::default());
|
||||
streams.push((id, Some((camera_id, type_, sc))));
|
||||
}
|
||||
}
|
||||
Ok(StreamStateChanger {
|
||||
@@ -760,9 +749,37 @@ impl StreamStateChanger {
|
||||
for (id, stream) in self.streams.drain(..) {
|
||||
use ::std::collections::btree_map::Entry;
|
||||
match (streams_by_id.entry(id), stream) {
|
||||
(Entry::Vacant(e), Some(new)) => { e.insert(new); },
|
||||
(Entry::Vacant(e), Some((camera_id, type_, mut sc))) => {
|
||||
e.insert(Stream {
|
||||
id,
|
||||
type_,
|
||||
camera_id,
|
||||
sample_file_dir_id: sc.sample_file_dir_id,
|
||||
rtsp_path: mem::replace(&mut sc.rtsp_path, String::new()),
|
||||
retain_bytes: 0,
|
||||
flush_if_sec: sc.flush_if_sec,
|
||||
range: None,
|
||||
sample_file_bytes: 0,
|
||||
to_delete: Vec::new(),
|
||||
bytes_to_delete: 0,
|
||||
bytes_to_add: 0,
|
||||
duration: recording::Duration(0),
|
||||
days: BTreeMap::new(),
|
||||
record: sc.record,
|
||||
next_recording_id: 1,
|
||||
uncommitted: VecDeque::new(),
|
||||
synced_recordings: 0,
|
||||
on_live_segment: Vec::new(),
|
||||
});
|
||||
},
|
||||
(Entry::Vacant(_), None) => {},
|
||||
(Entry::Occupied(mut e), Some(new)) => { e.insert(new); },
|
||||
(Entry::Occupied(e), Some((_, _, sc))) => {
|
||||
let e = e.into_mut();
|
||||
e.sample_file_dir_id = sc.sample_file_dir_id;
|
||||
e.rtsp_path = sc.rtsp_path;
|
||||
e.record = sc.record;
|
||||
e.flush_if_sec = sc.flush_if_sec;
|
||||
},
|
||||
(Entry::Occupied(e), None) => { e.remove(); },
|
||||
};
|
||||
}
|
||||
@@ -846,6 +863,40 @@ impl LockedDatabase {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Registers a callback to run on every live segment immediately after it's recorded.
|
||||
/// The callback is run with the database lock held, so it must not call back into the database
|
||||
/// or block. The callback should return false to unregister.
|
||||
pub fn watch_live(&mut self, stream_id: i32, cb: Box<FnMut(LiveSegment) -> bool + Send>)
|
||||
-> Result<(), Error> {
|
||||
let s = match self.streams_by_id.get_mut(&stream_id) {
|
||||
None => bail!("no such stream {}", stream_id),
|
||||
Some(s) => s,
|
||||
};
|
||||
s.on_live_segment.push(cb);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Clears all watches on all streams.
|
||||
/// Normally watches are self-cleaning: when a segment is sent, the callback returns false if
|
||||
/// it is no longer interested (typically because hyper has just noticed the client is no
|
||||
/// longer connected). This doesn't work when the system is shutting down and nothing more is
|
||||
/// sent, though.
|
||||
pub fn clear_watches(&mut self) {
|
||||
for (_, s) in &mut self.streams_by_id {
|
||||
s.on_live_segment.clear();
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn send_live_segment(&mut self, stream: i32, l: LiveSegment) -> Result<(), Error> {
|
||||
let s = match self.streams_by_id.get_mut(&stream) {
|
||||
None => bail!("no such stream {}", stream),
|
||||
Some(s) => s,
|
||||
};
|
||||
use odds::vec::VecExt;
|
||||
s.on_live_segment.retain_mut(|cb| cb(l.clone()));
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Helper for `DatabaseGuard::flush()` and `Database::drop()`.
|
||||
///
|
||||
/// The public API is in `DatabaseGuard::flush()`; it supplies the `Clocks` to this function.
|
||||
@@ -978,7 +1029,7 @@ impl LockedDatabase {
|
||||
// handlers given that it didn't add them.
|
||||
pub fn clear_on_flush(&mut self) {
|
||||
self.on_flush.clear();
|
||||
}
|
||||
}
|
||||
|
||||
/// Opens the given sample file directories.
|
||||
///
|
||||
@@ -1427,6 +1478,7 @@ impl LockedDatabase {
|
||||
record: row.get_checked(8)?,
|
||||
uncommitted: VecDeque::new(),
|
||||
synced_recordings: 0,
|
||||
on_live_segment: Vec::new(),
|
||||
});
|
||||
c.streams[type_.index()] = Some(id);
|
||||
}
|
||||
|
||||
44
db/writer.rs
44
db/writer.rs
@@ -563,6 +563,11 @@ struct InnerWriter<F: FileWriter> {
|
||||
r: Arc<Mutex<db::RecordingToInsert>>,
|
||||
e: recording::SampleIndexEncoder,
|
||||
id: CompositeId,
|
||||
|
||||
/// The pts, relative to the start of this segment and in 90kHz units, up until which live
|
||||
/// segments have been sent out. Initially 0.
|
||||
completed_live_segment_off_90k: i32,
|
||||
|
||||
hasher: hash::Hasher,
|
||||
|
||||
/// The start time of this segment, based solely on examining the local clock after frames in
|
||||
@@ -636,7 +641,7 @@ impl ClockAdjuster {
|
||||
#[derive(Copy, Clone)]
|
||||
struct UnflushedSample {
|
||||
local_time: recording::Time,
|
||||
pts_90k: i64,
|
||||
pts_90k: i64, // relative to the start of the stream, not a single recording.
|
||||
len: i32,
|
||||
is_key: bool,
|
||||
}
|
||||
@@ -650,6 +655,7 @@ struct PreviousWriter {
|
||||
}
|
||||
|
||||
impl<'a, C: Clocks + Clone, D: DirWriter> Writer<'a, C, D> {
|
||||
/// `db` must not be locked.
|
||||
pub fn new(dir: &'a D, db: &'a db::Database<C>, channel: &'a SyncerChannel<D::File>,
|
||||
stream_id: i32, video_sample_entry_id: i32) -> Self {
|
||||
Writer {
|
||||
@@ -686,6 +692,7 @@ impl<'a, C: Clocks + Clone, D: DirWriter> Writer<'a, C, D> {
|
||||
r,
|
||||
e: recording::SampleIndexEncoder::new(),
|
||||
id,
|
||||
completed_live_segment_off_90k: 0,
|
||||
hasher: hash::Hasher::new(hash::MessageDigest::sha1())?,
|
||||
local_start: recording::Time(i64::max_value()),
|
||||
adjuster: ClockAdjuster::new(prev.map(|p| p.local_time_delta.0)),
|
||||
@@ -716,7 +723,7 @@ impl<'a, C: Clocks + Clone, D: DirWriter> Writer<'a, C, D> {
|
||||
// We must restore it on all success or error paths.
|
||||
|
||||
if let Some(unflushed) = w.unflushed_sample.take() {
|
||||
let duration = (pts_90k - unflushed.pts_90k) as i32;
|
||||
let duration = (pts_90k - unflushed.pts_90k as i64) as i32;
|
||||
if duration <= 0 {
|
||||
// Restore invariant.
|
||||
w.unflushed_sample = Some(unflushed);
|
||||
@@ -724,7 +731,17 @@ impl<'a, C: Clocks + Clone, D: DirWriter> Writer<'a, C, D> {
|
||||
unflushed.pts_90k, pts_90k);
|
||||
}
|
||||
let duration = w.adjuster.adjust(duration);
|
||||
w.add_sample(duration, unflushed.len, unflushed.is_key, unflushed.local_time);
|
||||
let d = w.add_sample(duration, unflushed.len, unflushed.is_key, unflushed.local_time);
|
||||
|
||||
// If the sample `write` was called on is a key frame, then the prior frames (including
|
||||
// the one we just flushed) represent a live segment. Send it out.
|
||||
if is_key {
|
||||
self.db.lock().send_live_segment(self.stream_id, db::LiveSegment {
|
||||
recording: w.id.recording(),
|
||||
off_90k: w.completed_live_segment_off_90k .. d,
|
||||
}).unwrap();
|
||||
w.completed_live_segment_off_90k = d;
|
||||
}
|
||||
}
|
||||
let mut remaining = pkt;
|
||||
while !remaining.is_empty() {
|
||||
@@ -747,7 +764,7 @@ impl<'a, C: Clocks + Clone, D: DirWriter> Writer<'a, C, D> {
|
||||
pub fn close(&mut self, next_pts: Option<i64>) {
|
||||
self.state = match mem::replace(&mut self.state, WriterState::Unopened) {
|
||||
WriterState::Open(w) => {
|
||||
let prev = w.close(self.channel, next_pts);
|
||||
let prev = w.close(self.channel, next_pts, self.db, self.stream_id);
|
||||
WriterState::Closed(prev)
|
||||
},
|
||||
s => s,
|
||||
@@ -756,8 +773,9 @@ impl<'a, C: Clocks + Clone, D: DirWriter> Writer<'a, C, D> {
|
||||
}
|
||||
|
||||
impl<F: FileWriter> InnerWriter<F> {
|
||||
/// Returns the total duration of the `RecordingToInsert` (needed for live view path).
|
||||
fn add_sample(&mut self, duration_90k: i32, bytes: i32, is_key: bool,
|
||||
pkt_local_time: recording::Time) {
|
||||
pkt_local_time: recording::Time) -> i32 {
|
||||
let mut l = self.r.lock();
|
||||
self.e.add_sample(duration_90k, bytes, is_key, &mut l);
|
||||
let new = pkt_local_time - recording::Duration(l.duration_90k as i64);
|
||||
@@ -765,9 +783,11 @@ impl<F: FileWriter> InnerWriter<F> {
|
||||
if l.run_offset == 0 { // start time isn't anchored to previous recording's end; adjust.
|
||||
l.start = self.local_start;
|
||||
}
|
||||
l.duration_90k
|
||||
}
|
||||
|
||||
fn close(mut self, channel: &SyncerChannel<F>, next_pts: Option<i64>) -> PreviousWriter {
|
||||
fn close<C: Clocks + Clone>(mut self, channel: &SyncerChannel<F>, next_pts: Option<i64>,
|
||||
db: &db::Database<C>, stream_id: i32) -> PreviousWriter {
|
||||
let unflushed = self.unflushed_sample.take().expect("should always be an unflushed sample");
|
||||
let (last_sample_duration, flags) = match next_pts {
|
||||
None => (self.adjuster.adjust(0), db::RecordingFlags::TrailingZero as i32),
|
||||
@@ -776,8 +796,14 @@ impl<F: FileWriter> InnerWriter<F> {
|
||||
let mut sha1_bytes = [0u8; 20];
|
||||
sha1_bytes.copy_from_slice(&self.hasher.finish().unwrap()[..]);
|
||||
let (local_time_delta, run_offset, end);
|
||||
self.add_sample(last_sample_duration, unflushed.len, unflushed.is_key,
|
||||
unflushed.local_time);
|
||||
let d = self.add_sample(last_sample_duration, unflushed.len, unflushed.is_key,
|
||||
unflushed.local_time);
|
||||
|
||||
// This always ends a live segment.
|
||||
db.lock().send_live_segment(stream_id, db::LiveSegment {
|
||||
recording: self.id.recording(),
|
||||
off_90k: self.completed_live_segment_off_90k .. d,
|
||||
}).unwrap();
|
||||
let total_duration;
|
||||
{
|
||||
let mut l = self.r.lock();
|
||||
@@ -809,7 +835,7 @@ impl<'a, C: Clocks + Clone, D: DirWriter> Drop for Writer<'a, C, D> {
|
||||
// Swallow any error. The caller should only drop the Writer without calling close()
|
||||
// if there's already been an error. The caller should report that. No point in
|
||||
// complaining again.
|
||||
let _ = w.close(self.channel, None);
|
||||
let _ = w.close(self.channel, None, self.db, self.stream_id);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user