increment sequence number on live stream endpoint

This is an experiment to see if helps with #203.
This commit is contained in:
Scott Lamb 2023-01-04 19:07:37 -06:00
parent a6bdf0bd80
commit 28cd864f94
No known key found for this signature in database
3 changed files with 33 additions and 15 deletions

View File

@ -864,7 +864,7 @@ macro_rules! write_length {
pub enum Type { pub enum Type {
Normal, Normal,
InitSegment, InitSegment,
MediaSegment, MediaSegment { sequence_number: u32 },
} }
impl FileBuilder { impl FileBuilder {
@ -891,7 +891,7 @@ impl FileBuilder {
/// Sets if the generated `.mp4` should include a subtitle track with second-level timestamps. /// Sets if the generated `.mp4` should include a subtitle track with second-level timestamps.
/// Default is false. /// Default is false.
pub fn include_timestamp_subtitle_track(&mut self, b: bool) -> Result<(), Error> { pub fn include_timestamp_subtitle_track(&mut self, b: bool) -> Result<(), Error> {
if b && self.type_ == Type::MediaSegment { if b && matches!(self.type_, Type::MediaSegment { .. }) {
// There's no support today for timestamp truns or for timestamps without edit lists. // There's no support today for timestamp truns or for timestamps without edit lists.
// The latter would invalidate the code's assumption that desired timespan == actual // The latter would invalidate the code's assumption that desired timespan == actual
// timespan in the timestamp track. // timespan in the timestamp track.
@ -992,8 +992,9 @@ impl FileBuilder {
Type::InitSegment => { Type::InitSegment => {
etag.update(b":init:"); etag.update(b":init:");
} }
Type::MediaSegment => { Type::MediaSegment { sequence_number } => {
etag.update(b":media:"); etag.update(b":media:");
etag.update(&sequence_number.to_be_bytes()[..]);
} }
}; };
for s in &mut self.segments { for s in &mut self.segments {
@ -1002,7 +1003,7 @@ impl FileBuilder {
// Add the media time for this segment. If edit lists are supported (not media // Add the media time for this segment. If edit lists are supported (not media
// segments), this shouldn't include the portion they skip. // segments), this shouldn't include the portion they skip.
let start = match self.type_ { let start = match self.type_ {
Type::MediaSegment => s.s.actual_start_90k(), Type::MediaSegment { .. } => s.s.actual_start_90k(),
_ => md.start, _ => md.start,
}; };
self.media_duration_90k += u64::try_from(md.end - start).unwrap(); self.media_duration_90k += u64::try_from(md.end - start).unwrap();
@ -1055,8 +1056,8 @@ impl FileBuilder {
const EST_BUF_LEN: usize = 2048; const EST_BUF_LEN: usize = 2048;
self.body.buf.reserve(EST_BUF_LEN); self.body.buf.reserve(EST_BUF_LEN);
let initial_sample_byte_pos = match self.type_ { let initial_sample_byte_pos = match self.type_ {
Type::MediaSegment => { Type::MediaSegment { sequence_number } => {
self.append_moof()?; self.append_moof(sequence_number)?;
let p = self.append_media_mdat()?; let p = self.append_media_mdat()?;
// If the segment is > 4 GiB, the 32-bit trun data offsets are untrustworthy. // If the segment is > 4 GiB, the 32-bit trun data offsets are untrustworthy.
@ -1240,14 +1241,14 @@ impl FileBuilder {
} }
/// Appends a `MovieFragmentBox` (ISO/IEC 14496-12 section 8.8.4). /// Appends a `MovieFragmentBox` (ISO/IEC 14496-12 section 8.8.4).
fn append_moof(&mut self) -> Result<(), Error> { fn append_moof(&mut self, sequence_number: u32) -> Result<(), Error> {
write_length!(self, { write_length!(self, {
self.body.buf.extend_from_slice(b"moof"); self.body.buf.extend_from_slice(b"moof");
// MovieFragmentHeaderBox (ISO/IEC 14496-12 section 8.8.5). // MovieFragmentHeaderBox (ISO/IEC 14496-12 section 8.8.5).
write_length!(self, { write_length!(self, {
self.body.buf.extend_from_slice(b"mfhd\x00\x00\x00\x00"); self.body.buf.extend_from_slice(b"mfhd\x00\x00\x00\x00");
self.body.append_u32(1); // sequence_number self.body.append_u32(sequence_number);
})?; })?;
// TrackFragmentBox (ISO/IEC 14496-12 section 8.8.6). // TrackFragmentBox (ISO/IEC 14496-12 section 8.8.6).
@ -1908,7 +1909,7 @@ impl http_serve::Entity for File {
if let Some(cd) = self.0.content_disposition.as_ref() { if let Some(cd) = self.0.content_disposition.as_ref() {
hdrs.insert(http::header::CONTENT_DISPOSITION, cd.clone()); hdrs.insert(http::header::CONTENT_DISPOSITION, cd.clone());
} }
if self.0.type_ == Type::MediaSegment { if matches!(self.0.type_, Type::MediaSegment { .. }) {
if let Some((d, r)) = self.0.prev_media_duration_and_cur_runs { if let Some((d, r)) = self.0.prev_media_duration_and_cur_runs {
hdrs.insert( hdrs.insert(
"X-Prev-Media-Duration", "X-Prev-Media-Duration",
@ -2744,7 +2745,7 @@ mod tests {
// Time range [2+4+6, 2+4+6+8+1) means the 4th sample and part of the 5th are included. // Time range [2+4+6, 2+4+6+8+1) means the 4th sample and part of the 5th are included.
// The 3rd gets pulled in also because it's a sync frame and the 4th isn't. // The 3rd gets pulled in also because it's a sync frame and the 4th isn't.
let mp4 = make_mp4_from_encoders( let mp4 = make_mp4_from_encoders(
Type::MediaSegment, Type::MediaSegment { sequence_number: 1 },
&db, &db,
vec![r], vec![r],
2 + 4 + 6..2 + 4 + 6 + 8 + 1, 2 + 4 + 6..2 + 4 + 6 + 8 + 1,
@ -2799,7 +2800,7 @@ mod tests {
for i in 1..6 { for i in 1..6 {
let duration_90k = 2 * i; let duration_90k = 2 * i;
let mp4 = make_mp4_from_encoders( let mp4 = make_mp4_from_encoders(
Type::MediaSegment, Type::MediaSegment { sequence_number: 1 },
&db, &db,
vec![r.clone()], vec![r.clone()],
pos..pos + duration_90k, pos..pos + duration_90k,

View File

@ -162,6 +162,7 @@ impl Service {
// On the first LiveSegment, send all the data from the previous key frame onward. // On the first LiveSegment, send all the data from the previous key frame onward.
// For LiveSegments, it's okay to send a single non-key frame at a time. // For LiveSegments, it's okay to send a single non-key frame at a time.
let mut start_at_key = true; let mut start_at_key = true;
let mut sequence_number = 1;
loop { loop {
let next = combo let next = combo
.next() .next()
@ -169,9 +170,17 @@ impl Service {
.unwrap_or_else(|| unreachable!("timer stream never ends")); .unwrap_or_else(|| unreachable!("timer stream never ends"));
match next { match next {
Either::Left(live) => { Either::Left(live) => {
self.stream_live_m4s_chunk(open_id, stream_id, &mut ws, live, start_at_key) self.stream_live_m4s_chunk(
.await?; open_id,
stream_id,
&mut ws,
live,
start_at_key,
sequence_number,
)
.await?;
start_at_key = false; start_at_key = false;
sequence_number = sequence_number.wrapping_add(1);
} }
Either::Right(_) => { Either::Right(_) => {
ws.send(tungstenite::Message::Ping(Vec::new())).await?; ws.send(tungstenite::Message::Ping(Vec::new())).await?;
@ -188,8 +197,9 @@ impl Service {
ws: &mut tokio_tungstenite::WebSocketStream<hyper::upgrade::Upgraded>, ws: &mut tokio_tungstenite::WebSocketStream<hyper::upgrade::Upgraded>,
live: db::LiveSegment, live: db::LiveSegment,
start_at_key: bool, start_at_key: bool,
sequence_number: u32,
) -> Result<(), Error> { ) -> Result<(), Error> {
let mut builder = mp4::FileBuilder::new(mp4::Type::MediaSegment); let mut builder = mp4::FileBuilder::new(mp4::Type::MediaSegment { sequence_number });
let mut row = None; let mut row = None;
{ {
let db = self.db.lock(); let db = self.db.lock();

View File

@ -254,7 +254,14 @@ impl Service {
), ),
Path::StreamViewMp4Segment(uuid, type_, debug) => ( Path::StreamViewMp4Segment(uuid, type_, debug) => (
CacheControl::PrivateStatic, CacheControl::PrivateStatic,
self.stream_view_mp4(&req, caller, uuid, type_, mp4::Type::MediaSegment, debug)?, self.stream_view_mp4(
&req,
caller,
uuid,
type_,
mp4::Type::MediaSegment { sequence_number: 1 },
debug,
)?,
), ),
Path::StreamLiveMp4Segments(uuid, type_) => ( Path::StreamLiveMp4Segments(uuid, type_) => (
CacheControl::PrivateDynamic, CacheControl::PrivateDynamic,