live stream frame-by-frame rather than GOP-by-GOP (#59)

This should reduce live stream latency by two seconds when my cameras
are at their default setting (I frame interval = 2 * frame rate)!
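A quick back-of-envelope check of that figure, assuming a 30 fps camera
(the frame rate itself isn't part of this change):

    I frame interval = 2 * 30 fps = 60 frames  =>  one GOP = 60 / 30 = 2 s

so with GOP-by-GOP streaming the first frame of a GOP can sit for up to
~2 s before the GOP completes and can be sent; frame-by-frame removes
that wait.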

I was under the impression that every HTML5 Media Source Extensions
media segment had to start with a Random Access Point. This used to
be true, but apparently changed quite a while ago:
https://bugs.chromium.org/p/chromium/issues/detail?id=229412

Support generating segments that don't start with a key frame, and
plumb this through the mp4 media segment generation logic. Add some
extra error checking in mp4 slice handling, as my first attempts had a
mismatch between expected and actual lengths that silently returned
corrupted .m4s files.

Also, along with the first live segment, send everything from the most
recent key frame on, to reduce startup latency. Live view is quite a bit
more pleasant now.
Scott Lamb 2020-08-07 15:30:22 -07:00
parent b9c08b18a4
commit 8f792aeb2d
7 changed files with 252 additions and 149 deletions

View File

@ -512,13 +512,16 @@ pub struct Stream {
on_live_segment: Vec<Box<dyn FnMut(LiveSegment) -> bool + Send>>,
}
/// Bounds of a single keyframe and the frames dependent on it.
/// Bounds of a live view segment. Currently this is a single frame of video.
/// This is used for live stream recordings. The stream id should already be known to the
/// subscriber.
#[derive(Clone, Debug)]
pub struct LiveSegment {
pub recording: i32,
/// If the segment's one frame is a key frame.
pub is_key: bool,
/// The pts, relative to the start of the recording, of the start and end of this live segment,
/// in 90kHz units.
pub media_off_90k: Range<i32>,
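For context on how the new `is_key` flag is meant to be consumed, here is a minimal
sketch of a subscriber matching the `FnMut(LiveSegment) -> bool + Send` callback shape
registered in `on_live_segment` above. The decision it makes per frame is illustrative
only and not part of this change:

```rust
use std::ops::Range;

#[derive(Clone, Debug)]
pub struct LiveSegment {
    pub recording: i32,
    pub is_key: bool,
    pub media_off_90k: Range<i32>,
}

/// Illustrative subscriber: a key frame marks the start of a new GOP (and thus a
/// natural place to begin a fresh `.m4s`); non-key frames just extend the current one.
/// Returning false would unsubscribe.
fn make_subscriber() -> Box<dyn FnMut(LiveSegment) -> bool + Send> {
    Box::new(|seg: LiveSegment| {
        if seg.is_key {
            println!("recording {}: GOP starts at media offset {} (90 kHz units)",
                     seg.recording, seg.media_off_90k.start);
        } else {
            println!("recording {}: continuation frame {:?}",
                     seg.recording, seg.media_off_90k);
        }
        true // keep receiving frames
    })
}

fn main() {
    let mut cb = make_subscriber();
    cb(LiveSegment { recording: 1, is_key: true, media_off_90k: 0..3000 });
    cb(LiveSegment { recording: 1, is_key: false, media_off_90k: 3000..6000 });
}
```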

View File

@ -143,7 +143,7 @@ impl SampleIndexIterator {
Ok(true)
}
pub fn uninitialized(&self) -> bool { self.i_and_is_key == 0 }
#[inline]
pub fn is_key(&self) -> bool { (self.i_and_is_key & 0x8000_0000) != 0 }
}
@ -208,14 +208,20 @@ impl Segment {
/// Creates a segment.
///
/// `desired_media_range_90k` represents the desired range of the segment relative to the start
/// of the recording, in media time units. The actual range will start at the first key frame
/// at or before the desired start time. (The caller is responsible for creating an edit list
/// to skip the undesired portion.) It will end at the first frame after the desired range
/// (unless the desired range extends beyond the recording). (Likewise, the caller is
/// responsible for trimming the final frame's duration if desired.)
/// of the recording, in media time units.
///
/// The actual range will start at the most recent acceptable frame's start at or before the
/// desired start time. If `start_at_key` is true, only key frames are acceptable; otherwise
/// any frame is. The caller is responsible for skipping over the undesired prefix, perhaps
/// with an edit list in the case of a `.mp4`.
///
/// The actual range will end at the first frame after the desired range (unless the desired
/// range extends beyond the recording). Likewise, the caller is responsible for trimming the
/// final frame's duration if desired.
pub fn new(db: &db::LockedDatabase,
recording: &db::ListRecordingsRow,
desired_media_range_90k: Range<i32>) -> Result<Segment, Error> {
desired_media_range_90k: Range<i32>,
start_at_key: bool) -> Result<Segment, Error> {
let mut self_ = Segment {
id: recording.id,
open_id: recording.open_id,
@ -268,7 +274,8 @@ impl Segment {
};
loop {
if it.start_90k <= desired_media_range_90k.start && it.is_key() {
if it.start_90k <= desired_media_range_90k.start &&
(!start_at_key || it.is_key()) {
// new start candidate.
*begin = it;
self_.frames = 0;
@ -317,16 +324,17 @@ impl Segment {
let data = &(&playback).video_index;
let mut it = match self.begin {
Some(ref b) => **b,
None => SampleIndexIterator::new(),
None => {
let mut it = SampleIndexIterator::new();
if !it.next(data)? {
bail!("recording {} has no frames", self.id);
}
if !it.is_key() {
bail!("recording {} doesn't start with key frame", self.id);
}
it
}
};
if it.uninitialized() {
if !it.next(data)? {
bail!("recording {}: no frames", self.id);
}
if !it.is_key() {
bail!("recording {}: doesn't start with key frame", self.id);
}
}
let mut have_frame = true;
let mut key_frame = 0;
@ -359,6 +367,17 @@ impl Segment {
}
Ok(())
}
/// Returns true if this starts with a non-key frame.
pub fn starts_with_nonkey(&self) -> bool {
match self.begin {
Some(ref b) => !b.is_key(),
// Fast-path case, in which this holds an entire recording. They always start with a
// key frame.
None => false,
}
}
}
#[cfg(test)]
@ -467,7 +486,7 @@ mod tests {
let row = db.insert_recording_from_encoder(r);
// Time range [2, 2 + 4 + 6 + 8) means the 2nd, 3rd, 4th samples should be
// included.
let segment = Segment::new(&db.db.lock(), &row, 2 .. 2+4+6+8).unwrap();
let segment = Segment::new(&db.db.lock(), &row, 2 .. 2+4+6+8, true).unwrap();
assert_eq!(&get_frames(&db.db, &segment, |it| it.duration_90k), &[4, 6, 8]);
}
@ -486,7 +505,7 @@ mod tests {
let row = db.insert_recording_from_encoder(r);
// Time range [2 + 4 + 6, 2 + 4 + 6 + 8) means the 4th sample should be included.
// The 3rd also gets pulled in because it is a sync frame and the 4th is not.
let segment = Segment::new(&db.db.lock(), &row, 2+4+6 .. 2+4+6+8).unwrap();
let segment = Segment::new(&db.db.lock(), &row, 2+4+6 .. 2+4+6+8, true).unwrap();
assert_eq!(&get_frames(&db.db, &segment, |it| it.duration_90k), &[6, 8]);
}
@ -500,7 +519,7 @@ mod tests {
encoder.add_sample(0, 3, true, &mut r);
let db = TestDb::new(RealClocks {});
let row = db.insert_recording_from_encoder(r);
let segment = Segment::new(&db.db.lock(), &row, 1 .. 2).unwrap();
let segment = Segment::new(&db.db.lock(), &row, 1 .. 2, true).unwrap();
assert_eq!(&get_frames(&db.db, &segment, |it| it.bytes), &[2, 3]);
}
@ -513,7 +532,7 @@ mod tests {
encoder.add_sample(1, 1, true, &mut r);
let db = TestDb::new(RealClocks {});
let row = db.insert_recording_from_encoder(r);
let segment = Segment::new(&db.db.lock(), &row, 0 .. 0).unwrap();
let segment = Segment::new(&db.db.lock(), &row, 0 .. 0, true).unwrap();
assert_eq!(&get_frames(&db.db, &segment, |it| it.bytes), &[1]);
}
@ -531,7 +550,7 @@ mod tests {
}
let db = TestDb::new(RealClocks {});
let row = db.insert_recording_from_encoder(r);
let segment = Segment::new(&db.db.lock(), &row, 0 .. 2+4+6+8+10).unwrap();
let segment = Segment::new(&db.db.lock(), &row, 0 .. 2+4+6+8+10, true).unwrap();
assert_eq!(&get_frames(&db.db, &segment, |it| it.duration_90k), &[2, 4, 6, 8, 10]);
}
@ -545,7 +564,7 @@ mod tests {
encoder.add_sample(0, 3, true, &mut r);
let db = TestDb::new(RealClocks {});
let row = db.insert_recording_from_encoder(r);
let segment = Segment::new(&db.db.lock(), &row, 0 .. 2).unwrap();
let segment = Segment::new(&db.db.lock(), &row, 0 .. 2, true).unwrap();
assert_eq!(&get_frames(&db.db, &segment, |it| it.bytes), &[1, 2, 3]);
}

View File

@ -187,7 +187,8 @@ pub fn add_dummy_recordings_to_db(db: &db::Database, num: usize) {
let mut recording = db::RecordingToInsert {
sample_file_bytes: 30104460,
start: recording::Time(1430006400i64 * TIME_UNITS_PER_SEC),
duration_90k: 5399985,
media_duration_90k: 5399985,
wall_duration_90k: 5399985,
video_samples: 1800,
video_sync_samples: 60,
video_sample_entry_id: video_sample_entry_id,
@ -197,7 +198,7 @@ pub fn add_dummy_recordings_to_db(db: &db::Database, num: usize) {
};
for _ in 0..num {
let (id, _) = db.add_recording(TEST_STREAM_ID, recording.clone()).unwrap();
recording.start += recording::Duration(recording.duration_90k as i64);
recording.start += recording::Duration(recording.wall_duration_90k as i64);
recording.run_offset += 1;
db.mark_synced(id).unwrap();
}

View File

@ -558,10 +558,6 @@ struct InnerWriter<F: FileWriter> {
e: recording::SampleIndexEncoder,
id: CompositeId,
/// The pts, relative to the start of this segment and in 90kHz units, up until which live
/// segments have been sent out. Initially 0.
completed_live_segment_media_off_90k: i32,
hasher: blake3::Hasher,
/// The start time of this segment, based solely on examining the local clock after frames in
@ -634,7 +630,6 @@ impl<'a, C: Clocks + Clone, D: DirWriter> Writer<'a, C, D> {
r,
e: recording::SampleIndexEncoder::new(),
id,
completed_live_segment_media_off_90k: 0,
hasher: blake3::Hasher::new(),
local_start: recording::Time(i64::max_value()),
unindexed_sample: None,
@ -666,29 +661,14 @@ impl<'a, C: Clocks + Clone, D: DirWriter> Writer<'a, C, D> {
if let Some(unindexed) = w.unindexed_sample.take() {
let duration = i32::try_from(pts_90k - i64::from(unindexed.pts_90k))?;
if duration <= 0 {
// Restore invariant.
w.unindexed_sample = Some(unindexed);
w.unindexed_sample = Some(unindexed); // restore invariant.
bail!("pts not monotonically increasing; got {} then {}",
unindexed.pts_90k, pts_90k);
}
let d = match w.add_sample(duration, unindexed.len, unindexed.is_key,
unindexed.local_time) {
Ok(d) => d,
Err(e) => {
// Restore invariant.
w.unindexed_sample = Some(unindexed);
return Err(e);
},
};
// If the sample `write` was called on is a key frame, then the prior frames (including
// the one we just flushed) represent a live segment. Send it out.
if is_key {
self.db.lock().send_live_segment(self.stream_id, db::LiveSegment {
recording: w.id.recording(),
media_off_90k: w.completed_live_segment_media_off_90k .. d,
}).unwrap();
w.completed_live_segment_media_off_90k = d;
if let Err(e) = w.add_sample(duration, unindexed.len, unindexed.is_key,
unindexed.local_time, self.db, self.stream_id) {
w.unindexed_sample = Some(unindexed); // restore invariant.
return Err(e);
}
}
let mut remaining = pkt;
@ -726,12 +706,14 @@ fn clamp(v: i64, min: i64, max: i64) -> i64 {
}
impl<F: FileWriter> InnerWriter<F> {
/// Returns the total media duration of the `RecordingToInsert` (needed for live view path).
fn add_sample(&mut self, duration_90k: i32, bytes: i32, is_key: bool,
pkt_local_time: recording::Time) -> Result<i32, Error> {
fn add_sample<C: Clocks + Clone>(&mut self, duration_90k: i32, bytes: i32, is_key: bool,
pkt_local_time: recording::Time, db: &db::Database<C>,
stream_id: i32)
-> Result<(), Error> {
let mut l = self.r.lock();
// design/time.md explains these time manipulations in detail.
let prev_media_duration_90k = l.media_duration_90k;
let media_duration_90k = l.media_duration_90k + duration_90k;
let local_start =
cmp::min(self.local_start,
@ -754,7 +736,13 @@ impl<F: FileWriter> InnerWriter<F> {
l.start = start;
self.local_start = local_start;
self.e.add_sample(duration_90k, bytes, is_key, &mut l);
Ok(media_duration_90k)
drop(l);
db.lock().send_live_segment(stream_id, db::LiveSegment {
recording: self.id.recording(),
is_key,
media_off_90k: prev_media_duration_90k .. media_duration_90k,
}).unwrap();
Ok(())
}
fn close<C: Clocks + Clone>(mut self, channel: &SyncerChannel<F>, next_pts: Option<i64>,
@ -766,14 +754,10 @@ impl<F: FileWriter> InnerWriter<F> {
};
let blake3 = self.hasher.finalize();
let (run_offset, end);
let d = self.add_sample(last_sample_duration, unindexed.len, unindexed.is_key,
unindexed.local_time)?;
self.add_sample(last_sample_duration, unindexed.len, unindexed.is_key,
unindexed.local_time, db, stream_id)?;
// This always ends a live segment.
db.lock().send_live_segment(stream_id, db::LiveSegment {
recording: self.id.recording(),
media_off_90k: self.completed_live_segment_media_off_90k .. d,
}).unwrap();
let wall_duration;
{
let mut l = self.r.lock();

View File

@ -468,9 +468,9 @@ WebSocket headers as described in [RFC 6455][rfc-6455] and (if authentication
is required) the `s` cookie.
The server will send a sequence of binary messages. Each message corresponds
to one GOP of video: a key (IDR) frame and all other frames which depend on it.
These are encoded as a `.mp4` media segment. The following headers will be
included:
to one or more frames of video. The first message is guaranteed to start with a
"key" (IDR) frame; others may not. The message will contain HTTP headers
followed by a `.mp4` media segment. The following headers will be included:
* `X-Recording-Id`: the open id, a period, and the recording id of the
recording these frames belong to.
@ -483,10 +483,8 @@ included:
* `X-Media-Time-Range`: the relative media start and end times of these
frames within the recording, as a half-open interval.
Cameras are typically configured to have about one key frame per second, so
there will be one part per second when the stream is working. If the stream is
not connected, the HTTP GET request will wait until the stream is established,
possibly forever.
The WebSocket will always open immediately but will receive messages only while the
backing RTSP stream is connected.
Example request URI:
@ -529,14 +527,21 @@ X-Video-Sample-Entry-Id: 4
binary mp4 data
```
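To make the framing concrete: each binary WebSocket message is a block of HTTP-style
headers, a blank line, then the raw `.mp4` media segment. Below is a minimal client-side
sketch of splitting and reading one (std only; it assumes the usual HTTP `\r\n\r\n`
separator and is not code from this change):

```rust
use std::collections::HashMap;
use std::ops::Range;

/// Splits one binary `/live.m4s` message into its header map and `.mp4` body.
fn parse_live_msg(msg: &[u8]) -> Option<(HashMap<String, String>, &[u8])> {
    let sep: &[u8] = b"\r\n\r\n";
    let split = msg.windows(sep.len()).position(|w| w == sep)?;
    let (head, body) = (&msg[..split], &msg[split + sep.len()..]);
    let mut headers = HashMap::new();
    for line in std::str::from_utf8(head).ok()?.split("\r\n") {
        let (name, value) = line.split_once(": ")?;
        headers.insert(name.to_owned(), value.to_owned());
    }
    Some((headers, body))
}

/// Parses `X-Media-Time-Range: START-END` into a half-open range of 90 kHz units.
fn media_time_range(headers: &HashMap<String, String>) -> Option<Range<i64>> {
    let (start, end) = headers.get("X-Media-Time-Range")?.split_once('-')?;
    Some(start.parse().ok()?..end.parse().ok()?)
}

fn main() {
    let msg = b"X-Recording-Id: 42.5680\r\nX-Media-Time-Range: 5220058-5400061\r\n\r\nmp4bytes";
    let (headers, body) = parse_live_msg(msg).unwrap();
    println!("{:?} with {} body bytes", media_time_range(&headers), body.len());
}
```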
If the wall duration and media duration for these recordings are equal, these
segments are exactly the same as the ones that can be retrieved at the
following URLs, respectively:
These roughly correspond to the `.m4s` files available at the following URLs:
* `/api/cameras/fd20f7a2-9d69-4cb3-94ed-d51a20c3edfe/main/view.m4s?s=5680@42.5220058-5400061`
* `/api/cameras/fd20f7a2-9d69-4cb3-94ed-d51a20c3edfe/main/view.m4s?s=5681@42.0-180002`
* `/api/cameras/fd20f7a2-9d69-4cb3-94ed-d51a20c3edfe/main/view.m4s?s=5681@42.180002-360004`
However, there are two important differences:
* The `/view.m4s` endpoint accepts offsets within a recording as wall durations;
the `/live.m4s` endpoint's `X-Media-Time-Range` header returns them as
media durations. Thus the URLs above are only exactly correct if the wall
and media durations of the recording are identical. (A conversion sketch follows this list.)
* The `/view.m4s` endpoint always returns a time range that starts with a key frame;
`/live.m4s` messages may not include a key frame.
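The first difference above boils down to a linear rescale between the two clocks. A
minimal sketch follows; its shape mirrors the server-side
`rescale(wr.start, r.wall_duration_90k, r.media_duration_90k)` call used for `/view.m4s`,
but the rounding and overflow handling here are illustrative only:

```rust
/// Linearly rescales an offset from one clock's total duration to another's,
/// e.g. a wall-clock offset into the equivalent media offset (both in 90 kHz units).
/// Rounding/overflow handling is illustrative only.
fn rescale(off_90k: i64, from_duration_90k: i64, to_duration_90k: i64) -> i64 {
    if from_duration_90k == 0 {
        return 0; // zero-length recording: nothing to scale
    }
    off_90k * to_duration_90k / from_duration_90k
}

fn main() {
    // Assumed numbers: a recording with wall duration 5400000 but media duration 5399985
    // (the two differ when the camera's clock runs slightly fast or slow).
    let (wall, media) = (5_400_000i64, 5_399_985i64);
    println!("wall offset 180000 ~ media offset {}", rescale(180_000, wall, media));
}
```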
Note: an earlier version of this API used a `multipart/mixed` segment instead,
compatible with the [multipart-stream-js][multipart-stream-js] library. The
problem with this approach is that browsers have low limits on the number of

View File

@ -382,9 +382,9 @@ unsafe impl Sync for Segment {}
impl Segment {
fn new(db: &db::LockedDatabase, row: &db::ListRecordingsRow, rel_media_range_90k: Range<i32>,
first_frame_num: u32) -> Result<Self, Error> {
first_frame_num: u32, start_at_key: bool) -> Result<Self, Error> {
Ok(Segment {
s: recording::Segment::new(db, row, rel_media_range_90k.clone())
s: recording::Segment::new(db, row, rel_media_range_90k.clone(), start_at_key)
.err_kind(ErrorKind::Unknown)?,
recording_start: row.start,
recording_wall_duration_90k: row.wall_duration_90k,
@ -475,8 +475,9 @@ impl Segment {
}
fn truns_len(&self) -> usize {
(self.s.key_frames as usize) * (mem::size_of::<u32>() * 6) +
( self.s.frames as usize) * (mem::size_of::<u32>() * 2)
self.s.key_frames as usize * (mem::size_of::<u32>() * 6) +
self.s.frames as usize * (mem::size_of::<u32>() * 2) +
if self.s.starts_with_nonkey() { mem::size_of::<u32>() * 5 } else { 0 }
}
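As a sanity check on the new length formula: the per-run and per-sample sizes come from
the trun layout written out below; the concrete counts in this example are made up.

```rust
// Per the trun layout below (u32 = 4 bytes):
//  * a run starting at a key frame has a 6-u32 header
//    (size, 'trun', version/flags, sample_count, data_offset, first_sample_flags) = 24 bytes
//  * a run starting at a non-key frame omits first_sample_flags: 5 u32s = 20 bytes
//  * every sample adds 2 u32s (duration, size) = 8 bytes
//
// Example (assumed counts): 2 key frames, 60 frames total, segment starts mid-GOP:
//   2 * 24  +  60 * 8  +  20  =  548 bytes of trun data
```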
// TrackRunBox / trun (8.8.8).
@ -494,7 +495,8 @@ impl Segment {
let mut run_info: Option<RunInfo> = None;
let mut data_pos = initial_pos;
self.s.foreach(playback, |it| {
if it.is_key() {
let is_key = it.is_key();
if is_key {
if let Some(r) = run_info.take() {
// Finish a non-terminal run.
let p = v.len();
@ -503,48 +505,56 @@ impl Segment {
BigEndian::write_u32(&mut v[r.sample_count_pos .. r.sample_count_pos + 4],
r.count);
}
let box_len_pos = v.len();
v.extend_from_slice(&[
0x00, 0x00, 0x00, 0x00, // placeholder for size
b't', b'r', b'u', b'n',
// version 0, tr_flags:
// 0x000001 data-offset-present
// 0x000004 first-sample-flags-present
// 0x000100 sample-duration-present
// 0x000200 sample-size-present
0x00, 0x00, 0x03, 0x05,
]);
run_info = Some(RunInfo {
box_len_pos,
sample_count_pos: v.len(),
count: 1,
last_start: it.start_90k,
last_dur: it.duration_90k,
});
v.write_u32::<BigEndian>(0)?; // placeholder for sample count
v.write_u32::<BigEndian>(data_pos as u32)?;
// first_sample_flags. See trex (8.8.3.1).
v.write_u32::<BigEndian>(
// As defined by the Independent and Disposable Samples Box (sdp, 8.6.4).
(2 << 26) | // is_leading: this sample is not a leading sample
(2 << 24) | // sample_depends_on: this sample does not depend on others
(1 << 22) | // sample_is_depend_on: others may depend on this one
(2 << 20) | // sample_has_redundancy: no redundant coding
// As defined by the sample padding bits (padb, 8.7.6).
(0 << 17) | // no padding
(0 << 16) | // sample_is_non_sync_sample=0
0)?; // TODO: sample_degradation_priority
} else {
let r = run_info.as_mut().expect("non-key sample must be preceded by key sample");
r.count += 1;
r.last_start = it.start_90k;
r.last_dur = it.duration_90k;
}
let mut r = match run_info.take() {
None => {
let box_len_pos = v.len();
v.extend_from_slice(&[
0x00, 0x00, 0x00, 0x00, // placeholder for size
b't', b'r', b'u', b'n',
// version 0, tr_flags:
// 0x000001 data-offset-present
// 0x000004 first-sample-flags-present
// 0x000100 sample-duration-present
// 0x000200 sample-size-present
0x00, 0x00, 0x03, 0x01 | if is_key { 0x04 } else { 0 },
]);
let sample_count_pos = v.len();
v.write_u32::<BigEndian>(0)?; // placeholder for sample count
v.write_u32::<BigEndian>(data_pos as u32)?;
if is_key {
// first_sample_flags. See trex (8.8.3.1).
v.write_u32::<BigEndian>(
// As defined by the Independent and Disposable Samples Box
// (sdp, 8.6.4).
(2 << 26) | // is_leading: this sample is not a leading sample
(2 << 24) | // sample_depends_on: this sample does not depend on others
(1 << 22) | // sample_is_depended_on: others may depend on this one
(2 << 20) | // sample_has_redundancy: no redundant coding
// As defined by the sample padding bits (padb, 8.7.6).
(0 << 17) | // no padding
(0 << 16) | // sample_is_non_sync_sample=0
0)?; // TODO: sample_degradation_priority
}
RunInfo {
box_len_pos,
sample_count_pos,
count: 0,
last_start: 0,
last_dur: 0,
}
},
Some(r) => r,
};
r.count += 1;
r.last_start = it.start_90k;
r.last_dur = it.duration_90k;
v.write_u32::<BigEndian>(it.duration_90k as u32)?;
v.write_u32::<BigEndian>(it.bytes as u32)?;
data_pos += it.bytes as u64;
run_info = Some(r);
Ok(())
}).err_kind(ErrorKind::Internal)?;
if let Some(r) = run_info.take() {
@ -563,6 +573,9 @@ impl Segment {
.unwrap());
}
if len != v.len() {
bail_t!(Internal, "truns on {:?} expected len {} got len {}", self, len, v.len());
}
Ok(v)
}
}
@ -643,12 +656,18 @@ impl Slice {
}
fn p(&self) -> usize { (self.0 >> 44) as usize }
fn wrap_index<F>(&self, mp4: &File, r: Range<u64>, f: &F) -> Result<Chunk, Error>
fn wrap_index<F>(&self, mp4: &File, r: Range<u64>, len: u64, f: &F) -> Result<Chunk, Error>
where F: Fn(&[u8], SegmentLengths) -> &[u8] {
let mp4 = ARefss::new(mp4.0.clone());
let r = r.start as usize .. r.end as usize;
let p = self.p();
Ok(mp4.try_map(|mp4| Ok::<_, Error>(&mp4.segments[p].get_index(&mp4.db, f)?[r]))?.into())
Ok(mp4.try_map(|mp4| {
let i = mp4.segments[p].get_index(&mp4.db, f)?;
if u64::try_from(i.len()).unwrap() != len {
bail_t!(Internal, "expected len {} got {}", len, i.len());
}
Ok::<_, Error>(&i[r])
})?.into())
}
fn wrap_truns(&self, mp4: &File, r: Range<u64>, len: usize) -> Result<Chunk, Error> {
@ -665,6 +684,17 @@ impl Slice {
let truns = ARefss::new(truns);
Ok(truns.map(|t| &t[r.start as usize .. r.end as usize]).into())
}
fn wrap_video_sample_entry(&self, f: &File, r: Range<u64>, len: u64) -> Result<Chunk, Error> {
let mp4 = ARefss::new(f.0.clone());
Ok(mp4.try_map(|mp4| {
let data = &mp4.video_sample_entries[self.p()].data;
if u64::try_from(data.len()).unwrap() != len {
bail_t!(Internal, "expected len {} got len {}", len, data.len());
}
Ok::<_, Error>(&data[r.start as usize .. r.end as usize])
})?.into())
}
}
impl slices::Slice for Slice {
@ -679,21 +709,21 @@ impl slices::Slice for Slice {
let res = match self.t() {
SliceType::Static => {
let s = STATIC_BYTESTRINGS[p];
let part = &s[range.start as usize .. range.end as usize];
Ok(part.into())
if u64::try_from(s.len()).unwrap() != len {
Err(format_err_t!(Internal, "expected len {} got len {}", len, s.len()))
} else {
let part = &s[range.start as usize .. range.end as usize];
Ok(part.into())
}
},
SliceType::Buf => {
let r = ARefss::new(f.0.clone());
Ok(r.map(|f| &f.buf[p+range.start as usize .. p+range.end as usize]).into())
},
SliceType::VideoSampleEntry => {
let r = ARefss::new(f.0.clone());
Ok(r.map(|f| &f.video_sample_entries[p]
.data[range.start as usize .. range.end as usize]).into())
},
SliceType::Stts => self.wrap_index(f, range.clone(), &Segment::stts),
SliceType::Stsz => self.wrap_index(f, range.clone(), &Segment::stsz),
SliceType::Stss => self.wrap_index(f, range.clone(), &Segment::stss),
SliceType::VideoSampleEntry => self.wrap_video_sample_entry(f, range.clone(), len),
SliceType::Stts => self.wrap_index(f, range.clone(), len, &Segment::stts),
SliceType::Stsz => self.wrap_index(f, range.clone(), len, &Segment::stsz),
SliceType::Stss => self.wrap_index(f, range.clone(), len, &Segment::stss),
SliceType::Co64 => f.0.get_co64(range.clone(), len),
SliceType::VideoSampleData => f.0.get_video_sample_data(p, range.clone()),
SliceType::SubtitleSampleData => f.0.get_subtitle_sample_data(p, range.clone(), len),
@ -797,7 +827,7 @@ impl FileBuilder {
/// `rel_media_range_90k` is the media time range within the recording.
/// Eg `0 .. row.media_duration_90k` means the full recording.
pub fn append(&mut self, db: &db::LockedDatabase, row: db::ListRecordingsRow,
rel_media_range_90k: Range<i32>) -> Result<(), Error> {
rel_media_range_90k: Range<i32>, start_at_key: bool) -> Result<(), Error> {
if let Some(prev) = self.segments.last() {
if prev.s.have_trailing_zero() {
bail_t!(InvalidArgument,
@ -810,7 +840,7 @@ impl FileBuilder {
self.prev_media_duration_and_cur_runs = row.prev_media_duration_and_runs
.map(|(d, r)| (d, r + if row.open_id == 0 { 1 } else { 0 }));
}
let s = Segment::new(db, &row, rel_media_range_90k, self.next_frame_num)?;
let s = Segment::new(db, &row, rel_media_range_90k, self.next_frame_num, start_at_key)?;
self.next_frame_num += s.s.frames as u32;
self.segments.push(s);
@ -1006,7 +1036,8 @@ impl FileBuilder {
write_length!(self, {
self.body.buf.extend_from_slice(b"mvex");
// Appends a `TrackExtendsBox` (ISO/IEC 14496-12 section 8.8.3) for the video track.
// Appends a `TrackExtendsBox`, `trex` (ISO/IEC 14496-12 section 8.8.3) for the video
// track.
write_length!(self, {
self.body.buf.extend_from_slice(&[
b't', b'r', b'e', b'x',
@ -1043,7 +1074,7 @@ impl FileBuilder {
write_length!(self, {
self.body.buf.extend_from_slice(b"traf");
// TrackFragmentHeaderBox (ISO/IEC 14496-12 section 8.8.7).
// TrackFragmentHeaderBox, tfhd (ISO/IEC 14496-12 section 8.8.7).
write_length!(self, {
self.body.buf.extend_from_slice(&[
b't', b'f', b'h', b'd',
@ -1793,6 +1824,8 @@ mod tests {
str::from_utf8(&self.stack.last().expect("at root").boxtype[..]).unwrap()
}
pub fn depth(&self) -> usize { self.stack.len() }
/// Gets the specified byte range within the current box (excluding length and type).
/// Must not be at EOF.
pub async fn get(&self, start: u64, buf: &mut [u8]) {
@ -1912,6 +1945,15 @@ mod tests {
}
}
/// Traverses the box structure in `mp4` depth-first, validating the box positions.
async fn traverse(mp4: File) {
let mut cursor = BoxCursor::new(mp4);
cursor.down().await;
while cursor.depth() > 0 {
cursor.next().await;
}
}
fn copy_mp4_to_db(db: &TestDb<RealClocks>) {
let mut input =
stream::FFMPEG.open(stream::Source::File("src/testdata/clip.mp4")).unwrap();
@ -1950,8 +1992,8 @@ mod tests {
db.syncer_channel.flush();
}
pub fn create_mp4_from_db(tdb: &TestDb<RealClocks>,
skip_90k: i32, shorten_90k: i32, include_subtitles: bool) -> File {
pub fn create_mp4_from_db(tdb: &TestDb<RealClocks>, skip_90k: i32, shorten_90k: i32,
include_subtitles: bool) -> File {
let mut builder = FileBuilder::new(Type::Normal);
builder.include_timestamp_subtitle_track(include_subtitles).unwrap();
let all_time = recording::Time(i64::min_value()) .. recording::Time(i64::max_value());
@ -1961,7 +2003,7 @@ mod tests {
let d = r.media_duration_90k;
assert!(skip_90k + shorten_90k < d, "skip_90k={} shorten_90k={} r={:?}",
skip_90k, shorten_90k, r);
builder.append(&*db, r, skip_90k .. d - shorten_90k).unwrap();
builder.append(&*db, r, skip_90k .. d - shorten_90k, true).unwrap();
Ok(())
}).unwrap();
}
@ -2027,7 +2069,8 @@ mod tests {
/// sample tables that match the supplied encoder.
fn make_mp4_from_encoders(type_: Type, db: &TestDb<RealClocks>,
mut recordings: Vec<db::RecordingToInsert>,
desired_range_90k: Range<i32>) -> Result<File, Error> {
desired_range_90k: Range<i32>,
start_at_key: bool) -> Result<File, Error> {
let mut builder = FileBuilder::new(type_);
let mut duration_so_far = 0;
for r in recordings.drain(..) {
@ -2040,7 +2083,7 @@ mod tests {
desired_range_90k.end - duration_so_far
};
duration_so_far += row.media_duration_90k;
builder.append(&db.db.lock(), row, d_start .. d_end).unwrap();
builder.append(&db.db.lock(), row, d_start .. d_end, start_at_key).unwrap();
}
builder.build(db.db.clone(), db.dirs_by_stream_id.clone())
}
@ -2059,7 +2102,8 @@ mod tests {
}
// Time range [2, 2+4+6+8) means the 2nd, 3rd, and 4th samples should be included.
let mp4 = make_mp4_from_encoders(Type::Normal, &db, vec![r], 2 .. 2+4+6+8).unwrap();
let mp4 = make_mp4_from_encoders(Type::Normal, &db, vec![r], 2 .. 2+4+6+8, true).unwrap();
traverse(mp4.clone()).await;
let track = find_track(mp4, 1).await;
assert!(track.edts_cursor.is_none());
let mut cursor = track.stbl_cursor;
@ -2114,7 +2158,9 @@ mod tests {
// Time range [2+4+6, 2+4+6+8) means the 4th sample should be included.
// The 3rd gets pulled in also because it's a sync frame and the 4th isn't.
let mp4 = make_mp4_from_encoders(Type::Normal, &db, vec![r], 2+4+6 .. 2+4+6+8).unwrap();
let mp4 = make_mp4_from_encoders(Type::Normal, &db, vec![r], 2+4+6 .. 2+4+6+8, true)
.unwrap();
traverse(mp4.clone()).await;
let track = find_track(mp4, 1).await;
// Examine edts. It should skip the 3rd frame.
@ -2167,7 +2213,7 @@ mod tests {
async fn test_no_segments() {
testutil::init();
let db = TestDb::new(RealClocks {});
let e = make_mp4_from_encoders(Type::Normal, &db, vec![], 0 .. 0).err().unwrap();
let e = make_mp4_from_encoders(Type::Normal, &db, vec![], 0 .. 0, true).err().unwrap();
assert_eq!(e.to_string(), "Invalid argument: no video_sample_entries");
}
@ -2189,7 +2235,9 @@ mod tests {
encoders.push(r);
// This should include samples 3 and 4 only, both sync frames.
let mp4 = make_mp4_from_encoders(Type::Normal, &db, encoders, 1+2 .. 1+2+3+4).unwrap();
let mp4 = make_mp4_from_encoders(Type::Normal, &db, encoders, 1+2 .. 1+2+3+4, true)
.unwrap();
traverse(mp4.clone()).await;
let mut cursor = BoxCursor::new(mp4);
cursor.down().await;
assert!(cursor.find(b"moov").await);
@ -2224,7 +2272,8 @@ mod tests {
encoders.push(r);
// Multi-segment recording with an edit list, encoding with a zero-duration recording.
let mp4 = make_mp4_from_encoders(Type::Normal, &db, encoders, 1 .. 2+3).unwrap();
let mp4 = make_mp4_from_encoders(Type::Normal, &db, encoders, 1 .. 2+3, true).unwrap();
traverse(mp4.clone()).await;
let track = find_track(mp4, 1).await;
let mut cursor = track.edts_cursor.unwrap();
cursor.down().await;
@ -2249,7 +2298,8 @@ mod tests {
// Time range [2+4+6, 2+4+6+8+1) means the 4th sample and part of the 5th are included.
// The 3rd gets pulled in also because it's a sync frame and the 4th isn't.
let mp4 = make_mp4_from_encoders(Type::MediaSegment, &db, vec![r],
2+4+6 .. 2+4+6+8+1).unwrap();
2+4+6 .. 2+4+6+8+1, true).unwrap();
traverse(mp4.clone()).await;
let mut cursor = BoxCursor::new(mp4);
cursor.down().await;
@ -2277,12 +2327,43 @@ mod tests {
assert_eq!(cursor.get_u32(20).await, 15); // sample size
}
/// Tests `.mp4` files which represent a single frame, as in the live view WebSocket stream.
#[tokio::test]
async fn test_single_frame_media_segment() {
testutil::init();
let db = TestDb::new(RealClocks {});
let mut r = db::RecordingToInsert::default();
let mut encoder = recording::SampleIndexEncoder::new();
for i in 1..6 {
let duration_90k = 2 * i;
let bytes = 3 * i;
encoder.add_sample(duration_90k, bytes, (i % 2) == 1, &mut r);
}
let mut pos = 0;
for i in 1..6 {
let duration_90k = 2 * i;
let mp4 = make_mp4_from_encoders(Type::MediaSegment, &db, vec![r.clone()],
pos .. pos+duration_90k, false).unwrap();
traverse(mp4.clone()).await;
let mut cursor = BoxCursor::new(mp4);
cursor.down().await;
assert!(cursor.find(b"moof").await);
cursor.down().await;
assert!(cursor.find(b"traf").await);
cursor.down().await;
assert!(cursor.find(b"trun").await);
pos += duration_90k;
}
}
#[tokio::test]
async fn test_round_trip() {
testutil::init();
let db = TestDb::new(RealClocks {});
copy_mp4_to_db(&db);
let mp4 = create_mp4_from_db(&db, 0, 0, false);
traverse(mp4.clone()).await;
let new_filename = write_mp4(&mp4, db.tmpdir.path()).await;
compare_mp4s(&new_filename, 0, 0);
@ -2306,6 +2387,7 @@ mod tests {
let db = TestDb::new(RealClocks {});
copy_mp4_to_db(&db);
let mp4 = create_mp4_from_db(&db, 0, 0, true);
traverse(mp4.clone()).await;
let new_filename = write_mp4(&mp4, db.tmpdir.path()).await;
compare_mp4s(&new_filename, 0, 0);
@ -2329,6 +2411,7 @@ mod tests {
let db = TestDb::new(RealClocks {});
copy_mp4_to_db(&db);
let mp4 = create_mp4_from_db(&db, 1, 0, false);
traverse(mp4.clone()).await;
let new_filename = write_mp4(&mp4, db.tmpdir.path()).await;
compare_mp4s(&new_filename, 1, 0);
@ -2352,6 +2435,7 @@ mod tests {
let db = TestDb::new(RealClocks {});
copy_mp4_to_db(&db);
let mp4 = create_mp4_from_db(&db, 0, 1, false);
traverse(mp4.clone()).await;
let new_filename = write_mp4(&mp4, db.tmpdir.path()).await;
compare_mp4s(&new_filename, 0, 1);
@ -2428,7 +2512,7 @@ mod bench {
}
lazy_static! {
static ref SERVER: BenchServer = { BenchServer::new() };
static ref SERVER: BenchServer = BenchServer::new();
}
#[bench]
@ -2446,8 +2530,8 @@ mod bench {
Ok(())
}).unwrap();
let row = row.unwrap();
let rel_range_90k = 0 .. row.duration_90k;
super::Segment::new(&db, &row, rel_range_90k, 1).unwrap()
let rel_range_90k = 0 .. row.media_duration_90k;
super::Segment::new(&db, &row, rel_range_90k, 1, true).unwrap()
};
db.with_recording_playback(segment.s.id, &mut |playback| {
let v = segment.build_index(playback).unwrap(); // warm.

View File

@ -424,22 +424,29 @@ impl Service {
tungstenite::protocol::Role::Server,
None,
).await;
// Start the first segment at a key frame to reduce startup latency.
let mut start_at_key = true;
loop {
let live = match sub_rx.next().await {
Some(l) => l,
None => return,
};
if let Err(e) = self.stream_live_m4s_chunk(open_id, stream_id, &mut ws, live).await {
info!("chunk: is_key={:?}", live.is_key);
if let Err(e) = self.stream_live_m4s_chunk(open_id, stream_id, &mut ws, live,
start_at_key).await {
info!("Dropping WebSocket after error: {}", e);
return;
}
start_at_key = false;
}
}
async fn stream_live_m4s_chunk(
&self, open_id: u32, stream_id: i32,
ws: &mut tokio_tungstenite::WebSocketStream<hyper::upgrade::Upgraded>,
live: db::LiveSegment) -> Result<(), Error> {
live: db::LiveSegment, start_at_key: bool) -> Result<(), Error> {
let mut builder = mp4::FileBuilder::new(mp4::Type::MediaSegment);
let mut row = None;
{
@ -448,7 +455,7 @@ impl Service {
db.list_recordings_by_id(stream_id, live.recording .. live.recording+1, &mut |r| {
rows += 1;
row = Some(r);
builder.append(&db, r, live.media_off_90k.clone())?;
builder.append(&db, r, live.media_off_90k.clone(), start_at_key)?;
Ok(())
})?;
if rows != 1 {
@ -753,7 +760,7 @@ impl Service {
let mr =
rescale(wr.start, r.wall_duration_90k, r.media_duration_90k) ..
rescale(wr.end, r.wall_duration_90k, r.media_duration_90k);
builder.append(&db, r, mr)?;
builder.append(&db, r, mr, true)?;
} else {
debug!("...skipping recording {} wall dur {}", r.id, wd);
}
@ -1441,7 +1448,7 @@ mod bench {
}
lazy_static! {
static ref SERVER: Server = { Server::new() };
static ref SERVER: Server = Server::new();
}
#[bench]