mirror of
https://github.com/scottlamb/moonfire-nvr.git
synced 2025-12-08 00:32:26 -05:00
view in-progress recordings!
The time from recorded to viewable was previously 60-120 sec for the first recording of a RTSP session, 0-60 sec otherwise. Now it's one frame.
This commit is contained in:
89
db/dir.rs
89
db/dir.rs
@@ -646,14 +646,11 @@ enum WriterState {
|
||||
/// with at least one sample. The sample may have zero duration.
|
||||
struct InnerWriter {
|
||||
f: fs::File,
|
||||
r: Arc<Mutex<db::UncommittedRecording>>,
|
||||
index: recording::SampleIndexEncoder,
|
||||
r: Arc<Mutex<db::RecordingToInsert>>,
|
||||
e: recording::SampleIndexEncoder,
|
||||
id: CompositeId,
|
||||
hasher: hash::Hasher,
|
||||
|
||||
/// The end time of the previous segment in this run, if any.
|
||||
prev_end: Option<recording::Time>,
|
||||
|
||||
/// The start time of this segment, based solely on examining the local clock after frames in
|
||||
/// this segment were received. Frames can suffer from various kinds of delay (initial
|
||||
/// buffering, encoding, and network transmission), so this time is set to far in the future on
|
||||
@@ -663,8 +660,6 @@ struct InnerWriter {
|
||||
|
||||
adjuster: ClockAdjuster,
|
||||
|
||||
run_offset: i32,
|
||||
|
||||
/// A sample which has been written to disk but not added to `index`. Index writes are one
|
||||
/// sample behind disk writes because the duration of a sample is the difference between its
|
||||
/// pts and the next sample's pts. A sample is flushed when the next sample is written, when
|
||||
@@ -735,7 +730,7 @@ struct UnflushedSample {
|
||||
/// State associated with a run's previous recording; used within `Writer`.
|
||||
#[derive(Copy, Clone)]
|
||||
struct PreviousWriter {
|
||||
end_time: recording::Time,
|
||||
end: recording::Time,
|
||||
local_time_delta: recording::Duration,
|
||||
run_offset: i32,
|
||||
}
|
||||
@@ -762,7 +757,13 @@ impl<'a> Writer<'a> {
|
||||
WriterState::Open(ref mut w) => return Ok(w),
|
||||
WriterState::Closed(prev) => Some(prev),
|
||||
};
|
||||
let (id, r) = self.db.lock().add_recording(self.stream_id)?;
|
||||
let (id, r) = self.db.lock().add_recording(self.stream_id, db::RecordingToInsert {
|
||||
run_offset: prev.map(|p| p.run_offset + 1).unwrap_or(0),
|
||||
start: prev.map(|p| p.end).unwrap_or(recording::Time(i64::max_value())),
|
||||
video_sample_entry_id: self.video_sample_entry_id,
|
||||
flags: db::RecordingFlags::Growing as i32,
|
||||
..Default::default()
|
||||
})?;
|
||||
let p = SampleFileDir::get_rel_pathname(id);
|
||||
let f = retry_forever(&mut || unsafe {
|
||||
self.dir.fd.openat(p.as_ptr(), libc::O_WRONLY | libc::O_EXCL | libc::O_CREAT, 0o600)
|
||||
@@ -771,13 +772,11 @@ impl<'a> Writer<'a> {
|
||||
self.state = WriterState::Open(InnerWriter {
|
||||
f,
|
||||
r,
|
||||
index: recording::SampleIndexEncoder::new(),
|
||||
e: recording::SampleIndexEncoder::new(),
|
||||
id,
|
||||
hasher: hash::Hasher::new(hash::MessageDigest::sha1())?,
|
||||
prev_end: prev.map(|p| p.end_time),
|
||||
local_start: recording::Time(i64::max_value()),
|
||||
adjuster: ClockAdjuster::new(prev.map(|p| p.local_time_delta.0)),
|
||||
run_offset: prev.map(|p| p.run_offset + 1).unwrap_or(0),
|
||||
unflushed_sample: None,
|
||||
});
|
||||
match self.state {
|
||||
@@ -812,8 +811,7 @@ impl<'a> Writer<'a> {
|
||||
unflushed.pts_90k, pts_90k);
|
||||
}
|
||||
let duration = w.adjuster.adjust(duration);
|
||||
w.index.add_sample(duration, unflushed.len, unflushed.is_key);
|
||||
w.extend_local_start(unflushed.local_time);
|
||||
w.add_sample(duration, unflushed.len, unflushed.is_key, unflushed.local_time);
|
||||
}
|
||||
let mut remaining = pkt;
|
||||
while !remaining.is_empty() {
|
||||
@@ -836,7 +834,7 @@ impl<'a> Writer<'a> {
|
||||
pub fn close(&mut self, next_pts: Option<i64>) {
|
||||
self.state = match mem::replace(&mut self.state, WriterState::Unopened) {
|
||||
WriterState::Open(w) => {
|
||||
let prev = w.close(self.channel, self.video_sample_entry_id, next_pts);
|
||||
let prev = w.close(self.channel, next_pts);
|
||||
WriterState::Closed(prev)
|
||||
},
|
||||
s => s,
|
||||
@@ -845,45 +843,42 @@ impl<'a> Writer<'a> {
|
||||
}
|
||||
|
||||
impl InnerWriter {
|
||||
fn extend_local_start(&mut self, pkt_local_time: recording::Time) {
|
||||
let new = pkt_local_time - recording::Duration(self.index.total_duration_90k as i64);
|
||||
fn add_sample(&mut self, duration_90k: i32, bytes: i32, is_key: bool,
|
||||
pkt_local_time: recording::Time) {
|
||||
let mut l = self.r.lock();
|
||||
self.e.add_sample(duration_90k, bytes, is_key, &mut l);
|
||||
let new = pkt_local_time - recording::Duration(l.duration_90k as i64);
|
||||
self.local_start = cmp::min(self.local_start, new);
|
||||
if l.run_offset == 0 { // start time isn't anchored to previous recording's end; adjust.
|
||||
l.start = self.local_start;
|
||||
}
|
||||
}
|
||||
|
||||
fn close(mut self, channel: &SyncerChannel, video_sample_entry_id: i32,
|
||||
next_pts: Option<i64>) -> PreviousWriter {
|
||||
fn close(mut self, channel: &SyncerChannel, next_pts: Option<i64>) -> PreviousWriter {
|
||||
let unflushed = self.unflushed_sample.take().expect("should always be an unflushed sample");
|
||||
let duration = self.adjuster.adjust(match next_pts {
|
||||
None => 0,
|
||||
Some(p) => (p - unflushed.pts_90k) as i32,
|
||||
});
|
||||
self.index.add_sample(duration, unflushed.len, unflushed.is_key);
|
||||
self.extend_local_start(unflushed.local_time);
|
||||
let (duration, flags) = match next_pts {
|
||||
None => (self.adjuster.adjust(0), db::RecordingFlags::TrailingZero as i32),
|
||||
Some(p) => (self.adjuster.adjust((p - unflushed.pts_90k) as i32), 0),
|
||||
};
|
||||
let mut sha1_bytes = [0u8; 20];
|
||||
sha1_bytes.copy_from_slice(&self.hasher.finish().unwrap()[..]);
|
||||
let start = self.prev_end.unwrap_or(self.local_start);
|
||||
let end = start + recording::Duration(self.index.total_duration_90k as i64);
|
||||
let flags = if self.index.has_trailing_zero() { db::RecordingFlags::TrailingZero as i32 }
|
||||
else { 0 };
|
||||
let local_start_delta = self.local_start - start;
|
||||
let recording = db::RecordingToInsert {
|
||||
sample_file_bytes: self.index.sample_file_bytes,
|
||||
time: start .. end,
|
||||
local_time_delta: local_start_delta,
|
||||
video_samples: self.index.video_samples,
|
||||
video_sync_samples: self.index.video_sync_samples,
|
||||
video_sample_entry_id,
|
||||
video_index: self.index.video_index,
|
||||
sample_file_sha1: sha1_bytes,
|
||||
run_offset: self.run_offset,
|
||||
flags: flags,
|
||||
};
|
||||
self.r.lock().recording = Some(recording);
|
||||
let (local_time_delta, run_offset, end);
|
||||
self.add_sample(duration, unflushed.len, unflushed.is_key, unflushed.local_time);
|
||||
{
|
||||
let mut l = self.r.lock();
|
||||
l.flags = flags;
|
||||
local_time_delta = self.local_start - l.start;
|
||||
l.local_time_delta = local_time_delta;
|
||||
l.sample_file_sha1 = sha1_bytes;
|
||||
run_offset = l.run_offset;
|
||||
end = l.start + recording::Duration(l.duration_90k as i64);
|
||||
}
|
||||
drop(self.r);
|
||||
channel.async_save_recording(self.id, self.f);
|
||||
PreviousWriter {
|
||||
end_time: end,
|
||||
local_time_delta: local_start_delta,
|
||||
run_offset: self.run_offset,
|
||||
end,
|
||||
local_time_delta,
|
||||
run_offset,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -894,7 +889,7 @@ impl<'a> Drop for Writer<'a> {
|
||||
// Swallow any error. The caller should only drop the Writer without calling close()
|
||||
// if there's already been an error. The caller should report that. No point in
|
||||
// complaining again.
|
||||
let _ = w.close(self.channel, self.video_sample_entry_id, None);
|
||||
let _ = w.close(self.channel, None);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user