diff --git a/db/db.rs b/db/db.rs
index 42a5bb8..9a61d3a 100644
--- a/db/db.rs
+++ b/db/db.rs
@@ -512,13 +512,16 @@ pub struct Stream {
     on_live_segment: Vec<Box<dyn FnMut(LiveSegment) -> bool + Send>>,
 }
 
-/// Bounds of a single keyframe and the frames dependent on it.
+/// Bounds of a live view segment. Currently this is a single frame of video.
 /// This is used for live stream recordings. The stream id should already be known to the
 /// subscriber.
 #[derive(Clone, Debug)]
 pub struct LiveSegment {
     pub recording: i32,
 
+    /// If the segment's one frame is a key frame.
+    pub is_key: bool,
+
     /// The pts, relative to the start of the recording, of the start and end of this live segment,
     /// in 90kHz units.
     pub media_off_90k: Range<i32>,
diff --git a/db/recording.rs b/db/recording.rs
index bb773e6..fcb1a2b 100644
--- a/db/recording.rs
+++ b/db/recording.rs
@@ -143,7 +143,7 @@ impl SampleIndexIterator {
         Ok(true)
     }
 
-    pub fn uninitialized(&self) -> bool { self.i_and_is_key == 0 }
+    #[inline]
     pub fn is_key(&self) -> bool { (self.i_and_is_key & 0x8000_0000) != 0 }
 }
 
@@ -208,14 +208,20 @@ impl Segment {
     /// Creates a segment.
     ///
     /// `desired_media_range_90k` represents the desired range of the segment relative to the start
-    /// of the recording, in media time units. The actual range will start at the first key frame
-    /// at or before the desired start time. (The caller is responsible for creating an edit list
-    /// to skip the undesired portion.) It will end at the first frame after the desired range
-    /// (unless the desired range extends beyond the recording). (Likewise, the caller is
-    /// responsible for trimming the final frame's duration if desired.)
+    /// of the recording, in media time units.
+    ///
+    /// The actual range will start at the most recent acceptable frame's start at or before the
+    /// desired start time. If `start_at_key` is true, only key frames are acceptable; otherwise
+    /// any frame is. The caller is responsible for skipping over the undesired prefix, perhaps
+    /// with an edit list in the case of a `.mp4`.
+    ///
+    /// The actual range will end at the first frame after the desired range (unless the desired
+    /// range extends beyond the recording). Likewise, the caller is responsible for trimming the
+    /// final frame's duration if desired.
     pub fn new(db: &db::LockedDatabase, recording: &db::ListRecordingsRow,
-               desired_media_range_90k: Range<i32>) -> Result<Segment, Error> {
+               desired_media_range_90k: Range<i32>,
+               start_at_key: bool) -> Result<Segment, Error> {
         let mut self_ = Segment {
             id: recording.id,
             open_id: recording.open_id,
@@ -268,7 +274,8 @@ impl Segment {
             };
             loop {
-                if it.start_90k <= desired_media_range_90k.start && it.is_key() {
+                if it.start_90k <= desired_media_range_90k.start &&
+                   (!start_at_key || it.is_key()) {
                     // new start candidate.
                     *begin = it;
                     self_.frames = 0;
@@ -317,16 +324,17 @@ impl Segment {
         let data = &(&playback).video_index;
         let mut it = match self.begin {
             Some(ref b) => **b,
-            None => SampleIndexIterator::new(),
+            None => {
+                let mut it = SampleIndexIterator::new();
+                if !it.next(data)? {
+                    bail!("recording {} has no frames", self.id);
+                }
+                if !it.is_key() {
+                    bail!("recording {} doesn't start with key frame", self.id);
+                }
+                it
+            }
         };
-        if it.uninitialized() {
-            if !it.next(data)? {
-                bail!("recording {}: no frames", self.id);
-            }
-            if !it.is_key() {
-                bail!("recording {}: doesn't start with key frame", self.id);
-            }
-        }
 
         let mut have_frame = true;
         let mut key_frame = 0;
@@ -359,6 +367,17 @@ impl Segment {
             }
             Ok(())
         }
+
+    /// Returns true if this starts with a non-key frame.
+    pub fn starts_with_nonkey(&self) -> bool {
+        match self.begin {
+            Some(ref b) => !b.is_key(),
+
+            // Fast-path case, in which this holds an entire recording. They always start with a
+            // key frame.
+            None => false,
+        }
+    }
 }
 
 #[cfg(test)]
 mod tests {
@@ -467,7 +486,7 @@ mod tests {
         let row = db.insert_recording_from_encoder(r);
         // Time range [2, 2 + 4 + 6 + 8) means the 2nd, 3rd, 4th samples should be
         // included.
-        let segment = Segment::new(&db.db.lock(), &row, 2 .. 2+4+6+8).unwrap();
+        let segment = Segment::new(&db.db.lock(), &row, 2 .. 2+4+6+8, true).unwrap();
         assert_eq!(&get_frames(&db.db, &segment, |it| it.duration_90k), &[4, 6, 8]);
     }
 
@@ -486,7 +505,7 @@ mod tests {
         let row = db.insert_recording_from_encoder(r);
         // Time range [2 + 4 + 6, 2 + 4 + 6 + 8) means the 4th sample should be included.
         // The 3rd also gets pulled in because it is a sync frame and the 4th is not.
-        let segment = Segment::new(&db.db.lock(), &row, 2+4+6 .. 2+4+6+8).unwrap();
+        let segment = Segment::new(&db.db.lock(), &row, 2+4+6 .. 2+4+6+8, true).unwrap();
         assert_eq!(&get_frames(&db.db, &segment, |it| it.duration_90k), &[6, 8]);
     }
 
@@ -500,7 +519,7 @@ mod tests {
         encoder.add_sample(0, 3, true, &mut r);
         let db = TestDb::new(RealClocks {});
         let row = db.insert_recording_from_encoder(r);
-        let segment = Segment::new(&db.db.lock(), &row, 1 .. 2).unwrap();
+        let segment = Segment::new(&db.db.lock(), &row, 1 .. 2, true).unwrap();
         assert_eq!(&get_frames(&db.db, &segment, |it| it.bytes), &[2, 3]);
     }
 
@@ -513,7 +532,7 @@ mod tests {
         encoder.add_sample(1, 1, true, &mut r);
         let db = TestDb::new(RealClocks {});
         let row = db.insert_recording_from_encoder(r);
-        let segment = Segment::new(&db.db.lock(), &row, 0 .. 0).unwrap();
+        let segment = Segment::new(&db.db.lock(), &row, 0 .. 0, true).unwrap();
         assert_eq!(&get_frames(&db.db, &segment, |it| it.bytes), &[1]);
     }
 
@@ -531,7 +550,7 @@ mod tests {
         }
         let db = TestDb::new(RealClocks {});
         let row = db.insert_recording_from_encoder(r);
-        let segment = Segment::new(&db.db.lock(), &row, 0 .. 2+4+6+8+10).unwrap();
+        let segment = Segment::new(&db.db.lock(), &row, 0 .. 2+4+6+8+10, true).unwrap();
         assert_eq!(&get_frames(&db.db, &segment, |it| it.duration_90k), &[2, 4, 6, 8, 10]);
     }
 
@@ -545,7 +564,7 @@ mod tests {
         encoder.add_sample(0, 3, true, &mut r);
         let db = TestDb::new(RealClocks {});
         let row = db.insert_recording_from_encoder(r);
-        let segment = Segment::new(&db.db.lock(), &row, 0 .. 2).unwrap();
+        let segment = Segment::new(&db.db.lock(), &row, 0 .. 2, true).unwrap();
         assert_eq!(&get_frames(&db.db, &segment, |it| it.bytes), &[1, 2, 3]);
     }
diff --git a/db/testutil.rs b/db/testutil.rs
index a13354d..c8df386 100644
--- a/db/testutil.rs
+++ b/db/testutil.rs
@@ -187,7 +187,8 @@ pub fn add_dummy_recordings_to_db(db: &db::Database, num: usize) {
     let mut recording = db::RecordingToInsert {
         sample_file_bytes: 30104460,
         start: recording::Time(1430006400i64 * TIME_UNITS_PER_SEC),
-        duration_90k: 5399985,
+        media_duration_90k: 5399985,
+        wall_duration_90k: 5399985,
         video_samples: 1800,
         video_sync_samples: 60,
         video_sample_entry_id: video_sample_entry_id,
@@ -197,7 +198,7 @@ pub fn add_dummy_recordings_to_db(db: &db::Database, num: usize) {
     };
     for _ in 0..num {
         let (id, _) = db.add_recording(TEST_STREAM_ID, recording.clone()).unwrap();
-        recording.start += recording::Duration(recording.duration_90k as i64);
+        recording.start += recording::Duration(recording.wall_duration_90k as i64);
         recording.run_offset += 1;
         db.mark_synced(id).unwrap();
     }
diff --git a/db/writer.rs b/db/writer.rs
index 93f572f..f970f6d 100644
--- a/db/writer.rs
+++ b/db/writer.rs
@@ -558,10 +558,6 @@ struct InnerWriter<F: FileWriter> {
     e: recording::SampleIndexEncoder,
     id: CompositeId,
 
-    /// The pts, relative to the start of this segment and in 90kHz units, up until which live
-    /// segments have been sent out. Initially 0.
-    completed_live_segment_media_off_90k: i32,
-
     hasher: blake3::Hasher,
 
     /// The start time of this segment, based solely on examining the local clock after frames in
@@ -634,7 +630,6 @@ impl<'a, C: Clocks + Clone, D: DirWriter> Writer<'a, C, D> {
             r,
             e: recording::SampleIndexEncoder::new(),
             id,
-            completed_live_segment_media_off_90k: 0,
             hasher: blake3::Hasher::new(),
             local_start: recording::Time(i64::max_value()),
             unindexed_sample: None,
@@ -666,29 +661,14 @@ impl<'a, C: Clocks + Clone, D: DirWriter> Writer<'a, C, D> {
         if let Some(unindexed) = w.unindexed_sample.take() {
             let duration = i32::try_from(pts_90k - i64::from(unindexed.pts_90k))?;
             if duration <= 0 {
-                // Restore invariant.
-                w.unindexed_sample = Some(unindexed);
+                w.unindexed_sample = Some(unindexed);  // restore invariant.
                 bail!("pts not monotonically increasing; got {} then {}",
                       unindexed.pts_90k, pts_90k);
             }
-            let d = match w.add_sample(duration, unindexed.len, unindexed.is_key,
-                                       unindexed.local_time) {
-                Ok(d) => d,
-                Err(e) => {
-                    // Restore invariant.
-                    w.unindexed_sample = Some(unindexed);
-                    return Err(e);
-                },
-            };
-
-            // If the sample `write` was called on is a key frame, then the prior frames (including
-            // the one we just flushed) represent a live segment. Send it out.
-            if is_key {
-                self.db.lock().send_live_segment(self.stream_id, db::LiveSegment {
-                    recording: w.id.recording(),
-                    media_off_90k: w.completed_live_segment_media_off_90k .. d,
-                }).unwrap();
-                w.completed_live_segment_media_off_90k = d;
+            if let Err(e) = w.add_sample(duration, unindexed.len, unindexed.is_key,
+                                         unindexed.local_time, self.db, self.stream_id) {
+                w.unindexed_sample = Some(unindexed);  // restore invariant.
+                return Err(e);
             }
         }
         let mut remaining = pkt;
@@ -726,12 +706,14 @@ fn clamp(v: i64, min: i64, max: i64) -> i64 {
 }
 
 impl<F: FileWriter> InnerWriter<F> {
-    /// Returns the total media duration of the `RecordingToInsert` (needed for live view path).
-    fn add_sample(&mut self, duration_90k: i32, bytes: i32, is_key: bool,
-                  pkt_local_time: recording::Time) -> Result<i32, Error> {
+    fn add_sample(&mut self, duration_90k: i32, bytes: i32, is_key: bool,
+                  pkt_local_time: recording::Time, db: &db::Database<C>,
+                  stream_id: i32)
+                  -> Result<(), Error> {
         let mut l = self.r.lock();
 
         // design/time.md explains these time manipulations in detail.
+        let prev_media_duration_90k = l.media_duration_90k;
         let media_duration_90k = l.media_duration_90k + duration_90k;
         let local_start = cmp::min(self.local_start,
@@ -754,7 +736,13 @@ impl<F: FileWriter> InnerWriter<F> {
         l.start = start;
         self.local_start = local_start;
         self.e.add_sample(duration_90k, bytes, is_key, &mut l);
-        Ok(media_duration_90k)
+        drop(l);
+        db.lock().send_live_segment(stream_id, db::LiveSegment {
+            recording: self.id.recording(),
+            is_key,
+            media_off_90k: prev_media_duration_90k .. media_duration_90k,
+        }).unwrap();
+        Ok(())
     }
 
     fn close(mut self, channel: &SyncerChannel<F>, next_pts: Option<i64>,
@@ -766,14 +754,10 @@ impl<F: FileWriter> InnerWriter<F> {
         };
         let blake3 = self.hasher.finalize();
         let (run_offset, end);
-        let d = self.add_sample(last_sample_duration, unindexed.len, unindexed.is_key,
-                                unindexed.local_time)?;
+        self.add_sample(last_sample_duration, unindexed.len, unindexed.is_key,
+                        unindexed.local_time, db, stream_id)?;
 
         // This always ends a live segment.
-        db.lock().send_live_segment(stream_id, db::LiveSegment {
-            recording: self.id.recording(),
-            media_off_90k: self.completed_live_segment_media_off_90k .. d,
-        }).unwrap();
-
         let wall_duration;
         {
             let mut l = self.r.lock();
diff --git a/design/api.md b/design/api.md
index c7047b0..29d6136 100644
--- a/design/api.md
+++ b/design/api.md
@@ -468,9 +468,9 @@ WebSocket headers as described in [RFC 6455][rfc-6455] and (if authentication
 is required) the `s` cookie. The server will send a sequence of binary
 messages. Each message corresponds
-to one GOP of video: a key (IDR) frame and all other frames which depend on it.
-These are encoded as a `.mp4` media segment. The following headers will be
-included:
+to one or more frames of video. The first message is guaranteed to start with a
+"key" (IDR) frame; others may not. The message will contain HTTP headers
+followed by a `.mp4` media segment. The following headers will be included:
 
 *   `X-Recording-Id`: the open id, a period, and the recording id of the
     recording these frames belong to.
@@ -483,10 +483,8 @@
 *   `X-Media-Time-Range`: the relative media start and end times of these
     frames within the recording, as a half-open interval.
 
-Cameras are typically configured to have about one key frame per second, so
-there will be one part per second when the stream is working. If the stream is
-not connected, the HTTP GET request will wait until the stream is established,
-possibly forever.
+The WebSocket will always open immediately but will receive messages only while
+the backing RTSP stream is connected.
 
 Example request URI:
 
@@ -529,14 +527,21 @@
 X-Video-Sample-Entry-Id: 4
 
 binary mp4 data
 ```
 
-If the wall duration and media duration for these recordings are equal, these
-segments are exactly the same as the ones that can be retrieved at the
-following URLs, respectively:
+These roughly correspond to the `.m4s` files available at the following URLs:
 
 *   `/api/cameras/fd20f7a2-9d69-4cb3-94ed-d51a20c3edfe/main/view.m4s?s=5680@42.5220058-5400061`
 *   `/api/cameras/fd20f7a2-9d69-4cb3-94ed-d51a20c3edfe/main/view.m4s?s=5681@42.0-180002`
 *   `/api/cameras/fd20f7a2-9d69-4cb3-94ed-d51a20c3edfe/main/view.m4s?s=5681@42.180002-360004`
 
+However, there are two important differences:
+
+*   The `/view.m4s` endpoint accepts offsets within a recording as wall durations;
+    the `/live.m4s` endpoint's `X-Media-Time-Range` header returns them as
+    media durations. Thus the URLs above are only exactly correct if the wall
+    and media durations of the recording are identical.
+*   The `/view.m4s` endpoint always returns a time range that starts with a key frame;
+    `/live.m4s` messages may not include a key frame.
+
 Note: an earlier version of this API used a `multipart/mixed` segment instead,
 compatible with the [multipart-stream-js][multipart-stream-js] library. The
 problem with this approach is that browsers have low limits on the number of
diff --git a/src/mp4.rs b/src/mp4.rs
index f0a76af..2e2c387 100644
--- a/src/mp4.rs
+++ b/src/mp4.rs
@@ -382,9 +382,9 @@ unsafe impl Sync for Segment {}
 impl Segment {
     fn new(db: &db::LockedDatabase,
            row: &db::ListRecordingsRow, rel_media_range_90k: Range<i32>,
-           first_frame_num: u32) -> Result<Self, Error> {
+           first_frame_num: u32, start_at_key: bool) -> Result<Self, Error> {
         Ok(Segment {
-            s: recording::Segment::new(db, row, rel_media_range_90k.clone())
+            s: recording::Segment::new(db, row, rel_media_range_90k.clone(), start_at_key)
                 .err_kind(ErrorKind::Unknown)?,
             recording_start: row.start,
             recording_wall_duration_90k: row.wall_duration_90k,
@@ -475,8 +475,9 @@ impl Segment {
     }
 
     fn truns_len(&self) -> usize {
-        (self.s.key_frames as usize) * (mem::size_of::<u32>() * 6) +
-        (    self.s.frames as usize) * (mem::size_of::<u32>() * 2)
+        self.s.key_frames as usize * (mem::size_of::<u32>() * 6) +
+            self.s.frames as usize * (mem::size_of::<u32>() * 2) +
+            if self.s.starts_with_nonkey() { mem::size_of::<u32>() * 5 } else { 0 }
     }
 
     // TrackRunBox / trun (8.8.8).
@@ -494,7 +495,8 @@ impl Segment {
         let mut run_info: Option<RunInfo> = None;
         let mut data_pos = initial_pos;
         self.s.foreach(playback, |it| {
-            if it.is_key() {
+            let is_key = it.is_key();
+            if is_key {
                 if let Some(r) = run_info.take() {
                     // Finish a non-terminal run.
                     let p = v.len();
@@ -503,48 +505,56 @@ impl Segment {
                     BigEndian::write_u32(&mut v[r.sample_count_pos .. r.sample_count_pos + 4],
                                          r.count);
                 }
-                let box_len_pos = v.len();
-                v.extend_from_slice(&[
-                    0x00, 0x00, 0x00, 0x00, // placeholder for size
-                    b't', b'r', b'u', b'n',
-
-                    // version 0, tr_flags:
-                    // 0x000001 data-offset-present
-                    // 0x000004 first-sample-flags-present
-                    // 0x000100 sample-duration-present
-                    // 0x000200 sample-size-present
-                    0x00, 0x00, 0x03, 0x05,
-                ]);
-                run_info = Some(RunInfo {
-                    box_len_pos,
-                    sample_count_pos: v.len(),
-                    count: 1,
-                    last_start: it.start_90k,
-                    last_dur: it.duration_90k,
-                });
-                v.write_u32::<BigEndian>(0)?; // placeholder for sample count
-                v.write_u32::<BigEndian>(data_pos as u32)?;
-
-                // first_sample_flags. See trex (8.8.3.1).
-                v.write_u32::<BigEndian>(
-                    // As defined by the Independent and Disposable Samples Box (sdp, 8.6.4).
-                    (2 << 26) |  // is_leading: this sample is not a leading sample
-                    (2 << 24) |  // sample_depends_on: this sample does not depend on others
-                    (1 << 22) |  // sample_is_depend_on: others may depend on this one
-                    (2 << 20) |  // sample_has_redundancy: no redundant coding
-                    // As defined by the sample padding bits (padb, 8.7.6).
-                    (0 << 17) |  // no padding
-                    (0 << 16) |  // sample_is_non_sync_sample=0
-                    0)?;         // TODO: sample_degradation_priority
-            } else {
-                let r = run_info.as_mut().expect("non-key sample must be preceded by key sample");
-                r.count += 1;
-                r.last_start = it.start_90k;
-                r.last_dur = it.duration_90k;
             }
+            let mut r = match run_info.take() {
+                None => {
+                    let box_len_pos = v.len();
+                    v.extend_from_slice(&[
+                        0x00, 0x00, 0x00, 0x00, // placeholder for size
+                        b't', b'r', b'u', b'n',
+
+                        // version 0, tr_flags:
+                        // 0x000001 data-offset-present
+                        // 0x000004 first-sample-flags-present
+                        // 0x000100 sample-duration-present
+                        // 0x000200 sample-size-present
+                        0x00, 0x00, 0x03, 0x01 | if is_key { 0x04 } else { 0 },
+                    ]);
+                    let sample_count_pos = v.len();
+                    v.write_u32::<BigEndian>(0)?; // placeholder for sample count
+                    v.write_u32::<BigEndian>(data_pos as u32)?;
+
+                    if is_key {
+                        // first_sample_flags. See trex (8.8.3.1).
+                        v.write_u32::<BigEndian>(
+                            // As defined by the Independent and Disposable Samples Box
+                            // (sdp, 8.6.4).
+                            (2 << 26) |  // is_leading: this sample is not a leading sample
+                            (2 << 24) |  // sample_depends_on: this sample does not depend on others
+                            (1 << 22) |  // sample_is_depend_on: others may depend on this one
+                            (2 << 20) |  // sample_has_redundancy: no redundant coding
+                            // As defined by the sample padding bits (padb, 8.7.6).
+                            (0 << 17) |  // no padding
+                            (0 << 16) |  // sample_is_non_sync_sample=0
+                            0)?;         // TODO: sample_degradation_priority
+                    }
+                    RunInfo {
+                        box_len_pos,
+                        sample_count_pos,
+                        count: 0,
+                        last_start: 0,
+                        last_dur: 0,
+                    }
+                },
+                Some(r) => r,
+            };
+            r.count += 1;
+            r.last_start = it.start_90k;
+            r.last_dur = it.duration_90k;
             v.write_u32::<BigEndian>(it.duration_90k as u32)?;
             v.write_u32::<BigEndian>(it.bytes as u32)?;
             data_pos += it.bytes as u64;
+            run_info = Some(r);
             Ok(())
         }).err_kind(ErrorKind::Internal)?;
         if let Some(r) = run_info.take() {
@@ -563,6 +573,9 @@ impl Segment {
                 .unwrap());
         }
 
+        if len != v.len() {
+            bail_t!(Internal, "truns on {:?} expected len {} got len {}", self, len, v.len());
+        }
         Ok(v)
     }
 }
@@ -643,12 +656,18 @@ impl Slice {
     }
 
     fn p(&self) -> usize { (self.0 >> 44) as usize }
 
-    fn wrap_index<F>(&self, mp4: &File, r: Range<u64>, f: &F) -> Result<Chunk, Error>
+    fn wrap_index<F>(&self, mp4: &File, r: Range<u64>, len: u64, f: &F) -> Result<Chunk, Error>
     where F: Fn(&[u8], SegmentLengths) -> &[u8] {
         let mp4 = ARefss::new(mp4.0.clone());
         let r = r.start as usize .. r.end as usize;
         let p = self.p();
-        Ok(mp4.try_map(|mp4| Ok::<_, Error>(&mp4.segments[p].get_index(&mp4.db, f)?[r]))?.into())
+        Ok(mp4.try_map(|mp4| {
+            let i = mp4.segments[p].get_index(&mp4.db, f)?;
+            if u64::try_from(i.len()).unwrap() != len {
+                bail_t!(Internal, "expected len {} got {}", len, i.len());
+            }
+            Ok::<_, Error>(&i[r])
+        })?.into())
     }
 
     fn wrap_truns(&self, mp4: &File, r: Range<u64>, len: usize) -> Result<Chunk, Error> {
@@ -665,6 +684,17 @@ impl Slice {
         let truns = ARefss::new(truns);
         Ok(truns.map(|t| &t[r.start as usize .. r.end as usize]).into())
     }
+
+    fn wrap_video_sample_entry(&self, f: &File, r: Range<u64>, len: u64) -> Result<Chunk, Error> {
+        let mp4 = ARefss::new(f.0.clone());
+        Ok(mp4.try_map(|mp4| {
+            let data = &mp4.video_sample_entries[self.p()].data;
+            if u64::try_from(data.len()).unwrap() != len {
+                bail_t!(Internal, "expected len {} got len {}", len, data.len());
+            }
+            Ok::<_, Error>(&data[r.start as usize .. r.end as usize])
+        })?.into())
+    }
 }
 
 impl slices::Slice for Slice {
@@ -679,21 +709,21 @@ impl slices::Slice for Slice {
             let res = match self.t() {
                 SliceType::Static => {
                     let s = STATIC_BYTESTRINGS[p];
-                    let part = &s[range.start as usize .. range.end as usize];
-                    Ok(part.into())
+                    if u64::try_from(s.len()).unwrap() != len {
+                        Err(format_err_t!(Internal, "expected len {} got len {}", len, s.len()))
+                    } else {
+                        let part = &s[range.start as usize .. range.end as usize];
+                        Ok(part.into())
+                    }
                 },
                 SliceType::Buf => {
                     let r = ARefss::new(f.0.clone());
                     Ok(r.map(|f| &f.buf[p+range.start as usize .. p+range.end as usize]).into())
                 },
-                SliceType::VideoSampleEntry => {
-                    let r = ARefss::new(f.0.clone());
-                    Ok(r.map(|f| &f.video_sample_entries[p]
-                               .data[range.start as usize .. range.end as usize]).into())
-                },
-                SliceType::Stts => self.wrap_index(f, range.clone(), &Segment::stts),
-                SliceType::Stsz => self.wrap_index(f, range.clone(), &Segment::stsz),
-                SliceType::Stss => self.wrap_index(f, range.clone(), &Segment::stss),
+                SliceType::VideoSampleEntry => self.wrap_video_sample_entry(f, range.clone(), len),
+                SliceType::Stts => self.wrap_index(f, range.clone(), len, &Segment::stts),
+                SliceType::Stsz => self.wrap_index(f, range.clone(), len, &Segment::stsz),
+                SliceType::Stss => self.wrap_index(f, range.clone(), len, &Segment::stss),
                 SliceType::Co64 => f.0.get_co64(range.clone(), len),
                 SliceType::VideoSampleData => f.0.get_video_sample_data(p, range.clone()),
                 SliceType::SubtitleSampleData => f.0.get_subtitle_sample_data(p, range.clone(), len),
@@ -797,7 +827,7 @@ impl FileBuilder {
     /// `rel_media_range_90k` is the media time range within the recording.
     /// Eg `0 .. row.media_duration_90k` means the full recording.
     pub fn append(&mut self, db: &db::LockedDatabase, row: db::ListRecordingsRow,
-                  rel_media_range_90k: Range<i32>) -> Result<(), Error> {
+                  rel_media_range_90k: Range<i32>, start_at_key: bool) -> Result<(), Error> {
         if let Some(prev) = self.segments.last() {
             if prev.s.have_trailing_zero() {
                 bail_t!(InvalidArgument,
@@ -810,7 +840,7 @@ impl FileBuilder {
             self.prev_media_duration_and_cur_runs = row.prev_media_duration_and_runs
                 .map(|(d, r)| (d, r + if row.open_id == 0 { 1 } else { 0 }));
         }
-        let s = Segment::new(db, &row, rel_media_range_90k, self.next_frame_num)?;
+        let s = Segment::new(db, &row, rel_media_range_90k, self.next_frame_num, start_at_key)?;
 
         self.next_frame_num += s.s.frames as u32;
         self.segments.push(s);
@@ -1006,7 +1036,8 @@ impl FileBuilder {
         write_length!(self, {
             self.body.buf.extend_from_slice(b"mvex");
 
-            // Appends a `TrackExtendsBox` (ISO/IEC 14496-12 section 8.8.3) for the video track.
+            // Appends a `TrackExtendsBox`, `trex` (ISO/IEC 14496-12 section 8.8.3) for the video
+            // track.
             write_length!(self, {
                 self.body.buf.extend_from_slice(&[
                     b't', b'r', b'e', b'x',
@@ -1043,7 +1074,7 @@ impl FileBuilder {
         write_length!(self, {
             self.body.buf.extend_from_slice(b"traf");
 
-            // TrackFragmentHeaderBox (ISO/IEC 14496-12 section 8.8.7).
+            // TrackFragmentHeaderBox, tfhd (ISO/IEC 14496-12 section 8.8.7).
             write_length!(self, {
                 self.body.buf.extend_from_slice(&[
                     b't', b'f', b'h', b'd',
@@ -1793,6 +1824,8 @@ mod tests {
             str::from_utf8(&self.stack.last().expect("at root").boxtype[..]).unwrap()
         }
 
+        pub fn depth(&self) -> usize { self.stack.len() }
+
        /// Gets the specified byte range within the current box (excluding length and type).
        /// Must not be at EOF.
        pub async fn get(&self, start: u64, buf: &mut [u8]) {
@@ -1912,6 +1945,15 @@ mod tests {
         }
     }
 
+    /// Traverses the box structure in `mp4` depth-first, validating the box positions.
+    async fn traverse(mp4: File) {
+        let mut cursor = BoxCursor::new(mp4);
+        cursor.down().await;
+        while cursor.depth() > 0 {
+            cursor.next().await;
+        }
+    }
+
     fn copy_mp4_to_db(db: &TestDb<RealClocks>) {
         let mut input =
             stream::FFMPEG.open(stream::Source::File("src/testdata/clip.mp4")).unwrap();
@@ -1950,8 +1992,8 @@ mod tests {
         db.syncer_channel.flush();
     }
 
-    pub fn create_mp4_from_db(tdb: &TestDb<RealClocks>,
-                              skip_90k: i32, shorten_90k: i32, include_subtitles: bool) -> File {
+    pub fn create_mp4_from_db(tdb: &TestDb<RealClocks>, skip_90k: i32, shorten_90k: i32,
+                              include_subtitles: bool) -> File {
         let mut builder = FileBuilder::new(Type::Normal);
         builder.include_timestamp_subtitle_track(include_subtitles).unwrap();
         let all_time = recording::Time(i64::min_value()) .. recording::Time(i64::max_value());
@@ -1961,7 +2003,7 @@ mod tests {
             let d = r.media_duration_90k;
             assert!(skip_90k + shorten_90k < d, "skip_90k={} shorten_90k={} r={:?}",
                     skip_90k, shorten_90k, r);
-            builder.append(&*db, r, skip_90k .. d - shorten_90k).unwrap();
+            builder.append(&*db, r, skip_90k .. d - shorten_90k, true).unwrap();
             Ok(())
         }).unwrap();
     }
@@ -2027,7 +2069,8 @@ mod tests {
     /// sample tables that match the supplied encoder.
     fn make_mp4_from_encoders(type_: Type, db: &TestDb<RealClocks>,
                               mut recordings: Vec<db::RecordingToInsert>,
-                              desired_range_90k: Range<i32>) -> Result<File, Error> {
+                              desired_range_90k: Range<i32>,
+                              start_at_key: bool) -> Result<File, Error> {
         let mut builder = FileBuilder::new(type_);
         let mut duration_so_far = 0;
         for r in recordings.drain(..) {
@@ -2040,7 +2083,7 @@ mod tests {
                 desired_range_90k.end - duration_so_far
             };
             duration_so_far += row.media_duration_90k;
-            builder.append(&db.db.lock(), row, d_start .. d_end).unwrap();
+            builder.append(&db.db.lock(), row, d_start .. d_end, start_at_key).unwrap();
         }
         builder.build(db.db.clone(), db.dirs_by_stream_id.clone())
     }
@@ -2059,7 +2102,8 @@ mod tests {
        }
 
         // Time range [2, 2+4+6+8) means the 2nd, 3rd, and 4th samples should be included.
-        let mp4 = make_mp4_from_encoders(Type::Normal, &db, vec![r], 2 .. 2+4+6+8).unwrap();
+        let mp4 = make_mp4_from_encoders(Type::Normal, &db, vec![r], 2 .. 2+4+6+8, true).unwrap();
+        traverse(mp4.clone()).await;
         let track = find_track(mp4, 1).await;
         assert!(track.edts_cursor.is_none());
         let mut cursor = track.stbl_cursor;
@@ -2114,7 +2158,9 @@ mod tests {
 
         // Time range [2+4+6, 2+4+6+8) means the 4th sample should be included.
         // The 3rd gets pulled in also because it's a sync frame and the 4th isn't.
-        let mp4 = make_mp4_from_encoders(Type::Normal, &db, vec![r], 2+4+6 .. 2+4+6+8).unwrap();
+        let mp4 = make_mp4_from_encoders(Type::Normal, &db, vec![r], 2+4+6 .. 2+4+6+8, true)
+            .unwrap();
+        traverse(mp4.clone()).await;
         let track = find_track(mp4, 1).await;
 
         // Examine edts. It should skip the 3rd frame.
@@ -2167,7 +2213,7 @@ mod tests {
     async fn test_no_segments() {
         testutil::init();
         let db = TestDb::new(RealClocks {});
-        let e = make_mp4_from_encoders(Type::Normal, &db, vec![], 0 .. 0).err().unwrap();
+        let e = make_mp4_from_encoders(Type::Normal, &db, vec![], 0 .. 0, true).err().unwrap();
         assert_eq!(e.to_string(), "Invalid argument: no video_sample_entries");
     }
 
@@ -2189,7 +2235,9 @@ mod tests {
         encoders.push(r);
 
         // This should include samples 3 and 4 only, both sync frames.
-        let mp4 = make_mp4_from_encoders(Type::Normal, &db, encoders, 1+2 .. 1+2+3+4).unwrap();
+        let mp4 = make_mp4_from_encoders(Type::Normal, &db, encoders, 1+2 .. 1+2+3+4, true)
+            .unwrap();
+        traverse(mp4.clone()).await;
         let mut cursor = BoxCursor::new(mp4);
         cursor.down().await;
         assert!(cursor.find(b"moov").await);
@@ -2224,7 +2272,8 @@ mod tests {
         encoders.push(r);
 
         // Multi-segment recording with an edit list, encoding with a zero-duration recording.
-        let mp4 = make_mp4_from_encoders(Type::Normal, &db, encoders, 1 .. 2+3).unwrap();
+        let mp4 = make_mp4_from_encoders(Type::Normal, &db, encoders, 1 .. 2+3, true).unwrap();
+        traverse(mp4.clone()).await;
         let track = find_track(mp4, 1).await;
         let mut cursor = track.edts_cursor.unwrap();
         cursor.down().await;
@@ -2249,7 +2298,8 @@ mod tests {
         // Time range [2+4+6, 2+4+6+8+1) means the 4th sample and part of the 5th are included.
         // The 3rd gets pulled in also because it's a sync frame and the 4th isn't.
         let mp4 = make_mp4_from_encoders(Type::MediaSegment, &db, vec![r],
-                                         2+4+6 .. 2+4+6+8+1).unwrap();
+                                         2+4+6 .. 2+4+6+8+1, true).unwrap();
+        traverse(mp4.clone()).await;
         let mut cursor = BoxCursor::new(mp4);
         cursor.down().await;
@@ -2277,12 +2327,43 @@ mod tests {
         assert_eq!(cursor.get_u32(20).await, 15);  // sample size
     }
 
+    /// Tests `.mp4` files which represent a single frame, as in the live view WebSocket stream.
+    #[tokio::test]
+    async fn test_single_frame_media_segment() {
+        testutil::init();
+        let db = TestDb::new(RealClocks {});
+        let mut r = db::RecordingToInsert::default();
+        let mut encoder = recording::SampleIndexEncoder::new();
+        for i in 1..6 {
+            let duration_90k = 2 * i;
+            let bytes = 3 * i;
+            encoder.add_sample(duration_90k, bytes, (i % 2) == 1, &mut r);
+        }
+
+        let mut pos = 0;
+        for i in 1..6 {
+            let duration_90k = 2 * i;
+            let mp4 = make_mp4_from_encoders(Type::MediaSegment, &db, vec![r.clone()],
+                                             pos .. pos+duration_90k, false).unwrap();
+            traverse(mp4.clone()).await;
+            let mut cursor = BoxCursor::new(mp4);
+            cursor.down().await;
+            assert!(cursor.find(b"moof").await);
+            cursor.down().await;
+            assert!(cursor.find(b"traf").await);
+            cursor.down().await;
+            assert!(cursor.find(b"trun").await);
+            pos += duration_90k;
+        }
+    }
+
     #[tokio::test]
     async fn test_round_trip() {
         testutil::init();
         let db = TestDb::new(RealClocks {});
         copy_mp4_to_db(&db);
         let mp4 = create_mp4_from_db(&db, 0, 0, false);
+        traverse(mp4.clone()).await;
         let new_filename = write_mp4(&mp4, db.tmpdir.path()).await;
         compare_mp4s(&new_filename, 0, 0);
@@ -2306,6 +2387,7 @@ mod tests {
         let db = TestDb::new(RealClocks {});
         copy_mp4_to_db(&db);
         let mp4 = create_mp4_from_db(&db, 0, 0, true);
+        traverse(mp4.clone()).await;
         let new_filename = write_mp4(&mp4, db.tmpdir.path()).await;
         compare_mp4s(&new_filename, 0, 0);
@@ -2329,6 +2411,7 @@ mod tests {
         let db = TestDb::new(RealClocks {});
         copy_mp4_to_db(&db);
         let mp4 = create_mp4_from_db(&db, 1, 0, false);
+        traverse(mp4.clone()).await;
         let new_filename = write_mp4(&mp4, db.tmpdir.path()).await;
         compare_mp4s(&new_filename, 1, 0);
@@ -2352,6 +2435,7 @@ mod tests {
         let db = TestDb::new(RealClocks {});
         copy_mp4_to_db(&db);
         let mp4 = create_mp4_from_db(&db, 0, 1, false);
+        traverse(mp4.clone()).await;
         let new_filename = write_mp4(&mp4, db.tmpdir.path()).await;
         compare_mp4s(&new_filename, 0, 1);
@@ -2428,7 +2512,7 @@ mod bench {
     }
 
     lazy_static! {
-        static ref SERVER: BenchServer = { BenchServer::new() };
+        static ref SERVER: BenchServer = BenchServer::new();
     }
 
     #[bench]
@@ -2446,8 +2530,8 @@ mod bench {
             Ok(())
         }).unwrap();
         let row = row.unwrap();
-        let rel_range_90k = 0 .. row.duration_90k;
-        super::Segment::new(&db, &row, rel_range_90k, 1).unwrap()
+        let rel_range_90k = 0 .. row.media_duration_90k;
+        super::Segment::new(&db, &row, rel_range_90k, 1, true).unwrap()
     };
     db.with_recording_playback(segment.s.id, &mut |playback| {
         let v = segment.build_index(playback).unwrap();  // warm.
diff --git a/src/web.rs b/src/web.rs
index b01809c..068391f 100644
--- a/src/web.rs
+++ b/src/web.rs
@@ -424,22 +424,29 @@ impl Service {
             tungstenite::protocol::Role::Server,
             None,
         ).await;
+
+        // Start the first segment at a key frame to reduce startup latency.
+        let mut start_at_key = true;
         loop {
             let live = match sub_rx.next().await {
                 Some(l) => l,
                 None => return,
             };
-            if let Err(e) = self.stream_live_m4s_chunk(open_id, stream_id, &mut ws, live).await {
+
+            info!("chunk: is_key={:?}", live.is_key);
+            if let Err(e) = self.stream_live_m4s_chunk(open_id, stream_id, &mut ws, live,
+                                                       start_at_key).await {
                 info!("Dropping WebSocket after error: {}", e);
                 return;
             }
+            start_at_key = false;
         }
     }
 
     async fn stream_live_m4s_chunk(
         &self, open_id: u32, stream_id: i32,
         ws: &mut tokio_tungstenite::WebSocketStream<hyper::upgrade::Upgraded>,
-        live: db::LiveSegment) -> Result<(), Error> {
+        live: db::LiveSegment, start_at_key: bool) -> Result<(), Error> {
         let mut builder = mp4::FileBuilder::new(mp4::Type::MediaSegment);
         let mut row = None;
         {
@@ -448,7 +455,7 @@ impl Service {
             db.list_recordings_by_id(stream_id, live.recording .. live.recording+1, &mut |r| {
                 rows += 1;
                 row = Some(r);
-                builder.append(&db, r, live.media_off_90k.clone())?;
+                builder.append(&db, r, live.media_off_90k.clone(), start_at_key)?;
                 Ok(())
             })?;
             if rows != 1 {
@@ -753,7 +760,7 @@ impl Service {
                     let mr = rescale(wr.start, r.wall_duration_90k, r.media_duration_90k) ..
                              rescale(wr.end, r.wall_duration_90k, r.media_duration_90k);
-                    builder.append(&db, r, mr)?;
+                    builder.append(&db, r, mr, true)?;
                 } else {
                     debug!("...skipping recording {} wall dur {}", r.id, wd);
                 }
@@ -1441,7 +1448,7 @@ mod bench {
     }
 
     lazy_static! {
-        static ref SERVER: Server = { Server::new() };
+        static ref SERVER: Server = Server::new();
    }
 
     #[bench]
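
To illustrate the `design/api.md` changes above, here is a minimal client sketch showing how the per-frame `live.m4s` messages might be consumed. It is not part of the diff: the URL is hypothetical, authentication (the `s` cookie) is omitted, and it assumes each binary message is the header block from the example separated from the `.mp4` media segment by a blank line (`\r\n\r\n`), as the api.md example suggests. It uses `tokio-tungstenite`, the same WebSocket crate the server code above uses.

```rust
use futures::StreamExt;
use tokio_tungstenite::{connect_async, tungstenite::Message};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Hypothetical URL; substitute a real host and camera UUID.
    let url = "ws://nvr.example.com/api/cameras/fd20f7a2-9d69-4cb3-94ed-d51a20c3edfe/main/live.m4s";
    let (mut ws, _response) = connect_async(url).await?;
    while let Some(msg) = ws.next().await {
        if let Message::Binary(buf) = msg? {
            // Assumed layout: header lines, a blank line, then the `.mp4` media segment bytes.
            if let Some(i) = buf.windows(4).position(|w| w == b"\r\n\r\n") {
                let headers = String::from_utf8_lossy(&buf[..i]);
                let media_segment = &buf[i + 4..];
                println!("{} bytes of .mp4 data; headers:\n{}", media_segment.len(), headers);
                // A real client would feed `media_segment` to a demuxer or, in a browser,
                // append it to a Media Source Extensions SourceBuffer.
            }
        }
    }
    Ok(())
}
```

Because only the first message is guaranteed to start with a key frame, a client must append segments in order and without gaps; dropping one mid-stream means waiting for (or requesting) a new key-frame-initiated connection before playback can resume cleanly.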