// This file is part of Moonfire NVR, a security camera network video recorder. // Copyright (C) 2016 The Moonfire NVR Authors; see AUTHORS and LICENSE.txt. // SPDX-License-Identifier: GPL-v3.0-or-later WITH GPL-3.0-linking-exception. use crate::h264; use bytes::Buf; use cstr::cstr; use failure::format_err; use failure::{bail, Error}; use futures::StreamExt; use lazy_static::lazy_static; use log::warn; use retina::client::{Credentials, Playing, Session}; use retina::codec::{CodecItem, VideoParameters}; use std::convert::TryFrom; use std::ffi::CString; use std::num::NonZeroU32; use std::result::Result; use url::Url; static START_FFMPEG: parking_lot::Once = parking_lot::Once::new(); lazy_static! { pub static ref FFMPEG: Ffmpeg = Ffmpeg::new(); } pub enum RtspLibrary { Ffmpeg, Retina, } impl std::str::FromStr for RtspLibrary { type Err = Error; fn from_str(s: &str) -> Result { Ok(match s { "ffmpeg" => RtspLibrary::Ffmpeg, "retina" => RtspLibrary::Retina, _ => bail!("unknown RTSP library {:?}", s), }) } } impl RtspLibrary { pub fn opener(&self) -> &'static dyn Opener { match self { RtspLibrary::Ffmpeg => &*FFMPEG, RtspLibrary::Retina => &RETINA, } } } #[cfg(test)] pub enum Source<'a> { /// A filename, for testing. File(&'a str), /// An RTSP stream, for production use. Rtsp { url: Url, username: Option, password: Option, }, } #[cfg(not(test))] pub enum Source { /// An RTSP stream, for production use. Rtsp { url: Url, username: Option, password: Option, }, } pub trait Opener: Send + Sync { fn open(&self, src: Source) -> Result<(h264::ExtraData, Box), Error>; } pub struct VideoFrame<'a> { pub pts: i64, /// An estimate of the duration of the frame, or zero. /// This can be deceptive and is only used by some testing code. pub duration: i32, pub is_key: bool, pub data: &'a [u8], } pub trait Stream: Send { fn next(&mut self) -> Result; } pub struct Ffmpeg {} impl Ffmpeg { fn new() -> Ffmpeg { START_FFMPEG.call_once(|| { ffmpeg::Ffmpeg::new(); }); Ffmpeg {} } } impl Opener for Ffmpeg { fn open(&self, src: Source) -> Result<(h264::ExtraData, Box), Error> { use ffmpeg::avformat::InputFormatContext; let mut input = match src { #[cfg(test)] Source::File(filename) => { let mut open_options = ffmpeg::avutil::Dictionary::new(); // Work around https://github.com/scottlamb/moonfire-nvr/issues/10 open_options .set(cstr!("advanced_editlist"), cstr!("false")) .unwrap(); let url = format!("file:{}", filename); let i = InputFormatContext::open( &CString::new(url.clone()).unwrap(), &mut open_options, )?; if !open_options.empty() { warn!( "While opening URL {}, some options were not understood: {}", url, open_options ); } i } Source::Rtsp { url, username, password, } => { let mut open_options = ffmpeg::avutil::Dictionary::new(); open_options .set(cstr!("rtsp_transport"), cstr!("tcp")) .unwrap(); open_options .set(cstr!("user-agent"), cstr!("moonfire-nvr")) .unwrap(); // 10-second socket timeout, in microseconds. open_options .set(cstr!("stimeout"), cstr!("10000000")) .unwrap(); // Without this option, the first packet has an incorrect pts. // https://trac.ffmpeg.org/ticket/5018 open_options .set(cstr!("fflags"), cstr!("nobuffer")) .unwrap(); // Moonfire NVR currently only supports video, so receiving audio is wasteful. // It also triggers . open_options .set(cstr!("allowed_media_types"), cstr!("video")) .unwrap(); let mut url_with_credentials = url.clone(); if let Some(u) = username.as_deref() { url_with_credentials .set_username(u) .map_err(|_| format_err!("unable to set username on url {}", url))?; } url_with_credentials .set_password(password.as_deref()) .map_err(|_| format_err!("unable to set password on url {}", url))?; let i = InputFormatContext::open( &CString::new(url_with_credentials.as_str())?, &mut open_options, )?; if !open_options.empty() { warn!( "While opening URL {}, some options were not understood: {}", url, open_options ); } i } }; input.find_stream_info()?; // Find the video stream. let mut video_i = None; { let s = input.streams(); for i in 0..s.len() { if s.get(i).codecpar().codec_type().is_video() { video_i = Some(i); break; } } } let video_i = match video_i { Some(i) => i, None => bail!("no video stream"), }; let video = input.streams().get(video_i); let codec = video.codecpar(); let codec_id = codec.codec_id(); if !codec_id.is_h264() { bail!("stream's video codec {:?} is not h264", codec_id); } let tb = video.time_base(); if tb.num != 1 || tb.den != 90000 { bail!( "video stream has timebase {}/{}; expected 1/90000", tb.num, tb.den ); } let dims = codec.dims(); let extra_data = h264::ExtraData::parse( codec.extradata(), u16::try_from(dims.width)?, u16::try_from(dims.height)?, )?; let need_transform = extra_data.need_transform; let stream = Box::new(FfmpegStream { input, video_i, data: Vec::new(), need_transform, }); Ok((extra_data, stream)) } } struct FfmpegStream { input: ffmpeg::avformat::InputFormatContext<'static>, video_i: usize, data: Vec, need_transform: bool, } impl Stream for FfmpegStream { fn next(&mut self) -> Result { let pkt = loop { let pkt = self.input.read_frame()?; if pkt.stream_index() == self.video_i { break pkt; } }; let data = pkt .data() .ok_or_else(|| format_err!("packet with no data"))?; if self.need_transform { h264::transform_sample_data(data, &mut self.data)?; } else { // This copy isn't strictly necessary, but this path is only taken in testing anyway. self.data.clear(); self.data.extend_from_slice(data); } let pts = pkt.pts().ok_or_else(|| format_err!("packet with no pts"))?; Ok(VideoFrame { pts, is_key: pkt.is_key(), duration: pkt.duration(), data: &self.data, }) } } pub struct RetinaOpener {} pub const RETINA: RetinaOpener = RetinaOpener {}; impl Opener for RetinaOpener { fn open(&self, src: Source) -> Result<(h264::ExtraData, Box), Error> { let (startup_tx, startup_rx) = tokio::sync::oneshot::channel(); let (frame_tx, frame_rx) = tokio::sync::mpsc::channel(1); let handle = tokio::runtime::Handle::current(); let (url, username, password) = match src { #[cfg(test)] Source::File(_) => bail!("Retina doesn't support .mp4 files"), Source::Rtsp { url, username, password, } => (url, username, password), }; let creds = match (username, password) { (None, None) => None, (Some(username), Some(password)) => Some(Credentials { username, password }), _ => bail!("expected username and password together"), }; // TODO: connection timeout. handle.spawn(async move { let (session, mut video_params) = match RetinaOpener::play(url, creds).await { Err(e) => { let _ = startup_tx.send(Err(e)); return; } Ok((s, v)) => (s, v), }; let session = match session.demuxed() { Ok(s) => s, Err(e) => { let _ = startup_tx.send(Err(e)); return; } }; tokio::pin!(session); // First frame. loop { match session.next().await { Some(Err(e)) => { let _ = startup_tx.send(Err(e)); return; } Some(Ok(CodecItem::VideoFrame(mut v))) => { if let Some(v) = v.new_parameters.take() { video_params = v; } if v.is_random_access_point { if startup_tx.send(Ok(video_params)).is_err() { return; } if frame_tx.send(Ok(v)).await.is_err() { return; } break; } } Some(Ok(_)) => {} None => { let _ = startup_tx.send(Err(format_err!("stream closed before first frame"))); return; } } } // Following frames. let mut need_key_frame = false; while let Some(item) = session.next().await { match item { Err(e) => { let _ = frame_tx.send(Err(e)).await; return; } Ok(CodecItem::VideoFrame(v)) => { if v.loss > 0 { if !v.is_random_access_point { log::info!( "lost {} RTP packets; waiting for next key frame @ {:?}", v.loss, v.start_ctx() ); need_key_frame = true; continue; } else { log::info!( "lost {} RTP packets; already have key frame @ {:?}", v.loss, v.start_ctx() ); need_key_frame = false; } } else if need_key_frame && !v.is_random_access_point { continue; } else if need_key_frame { log::info!("recovering from loss with key frame @ {:?}", v.start_ctx()); need_key_frame = false; } if frame_tx.send(Ok(v)).await.is_err() { return; // other end died. } } _ => {} } } }); let video_params = handle.block_on(startup_rx)??; let dims = video_params.pixel_dimensions(); let extra_data = h264::ExtraData::parse( video_params.extra_data(), u16::try_from(dims.0)?, u16::try_from(dims.1)?, )?; let stream = Box::new(RetinaStream { frame_rx, data: Vec::new(), }); Ok((extra_data, stream)) } } impl RetinaOpener { async fn play( url: Url, creds: Option, ) -> Result<(Session, VideoParameters), Error> { let mut session = retina::client::Session::describe(url, creds).await?; let (video_i, video_params) = session .streams() .iter() .enumerate() .find_map(|(i, s)| match s.parameters() { Some(retina::codec::Parameters::Video(v)) => Some((i, v.clone())), _ => None, }) .ok_or_else(|| format_err!("couldn't find H.264 video stream"))?; session.setup(video_i).await?; let session = session .play( retina::client::PlayPolicy::default() .enforce_timestamps_with_max_jump_secs(NonZeroU32::new(10).unwrap()), ) .await?; Ok((session, video_params)) } } struct RetinaStream { frame_rx: tokio::sync::mpsc::Receiver>, data: Vec, } impl Stream for RetinaStream { fn next(&mut self) -> Result { let mut frame = self .frame_rx .blocking_recv() .ok_or_else(|| format_err!("stream ended"))??; self.data.clear(); while frame.has_remaining() { let chunk = frame.chunk(); self.data.extend_from_slice(chunk); let len = chunk.len(); frame.advance(len); } Ok(VideoFrame { pts: frame.timestamp.elapsed(), duration: 0, is_key: frame.is_random_access_point, data: &self.data, }) } }