diff --git a/server/src/web/live.rs b/server/src/web/live.rs new file mode 100644 index 0000000..cec7a54 --- /dev/null +++ b/server/src/web/live.rs @@ -0,0 +1,192 @@ +// This file is part of Moonfire NVR, a security camera network video recorder. +// Copyright (C) 2021 The Moonfire NVR Authors; see AUTHORS and LICENSE.txt. +// SPDX-License-Identifier: GPL-v3.0-or-later WITH GPL-3.0-linking-exception. + +//! Live video websocket handling. + +use std::sync::Arc; + +use crate::body::Body; +use base::{bail_t, format_err_t}; +use failure::Error; +use futures::{future::Either, SinkExt, StreamExt}; +use http::{header, Request, Response, StatusCode}; +use log::{info, warn}; +use tokio_tungstenite::tungstenite; +use uuid::Uuid; + +use crate::{mp4, web::plain_response}; + +use super::{bad_req, Caller, ResponseResult, Service}; + +impl Service { + pub(super) fn stream_live_m4s( + self: Arc, + req: Request<::hyper::Body>, + caller: Caller, + uuid: Uuid, + stream_type: db::StreamType, + ) -> ResponseResult { + if !caller.permissions.view_video { + bail_t!(PermissionDenied, "view_video required"); + } + + let stream_id; + let open_id; + let (sub_tx, sub_rx) = futures::channel::mpsc::unbounded(); + { + let mut db = self.db.lock(); + open_id = match db.open { + None => { + bail_t!( + FailedPrecondition, + "database is read-only; there are no live streams" + ); + } + Some(o) => o.id, + }; + let camera = db.get_camera(uuid).ok_or_else(|| { + plain_response(StatusCode::NOT_FOUND, format!("no such camera {}", uuid)) + })?; + stream_id = camera.streams[stream_type.index()].ok_or_else(|| { + format_err_t!(NotFound, "no such stream {}/{}", uuid, stream_type) + })?; + db.watch_live( + stream_id, + Box::new(move |l| sub_tx.unbounded_send(l).is_ok()), + ) + .expect("stream_id refed by camera"); + } + + let response = + tungstenite::handshake::server::create_response_with_body(&req, hyper::Body::empty) + .map_err(|e| bad_req(e.to_string()))?; + let (parts, _) = response.into_parts(); + + tokio::spawn(self.stream_live_m4s_ws(stream_id, open_id, req, sub_rx)); + + Ok(Response::from_parts(parts, Body::from(""))) + } + + async fn stream_live_m4s_ws( + self: Arc, + stream_id: i32, + open_id: u32, + req: hyper::Request, + sub_rx: futures::channel::mpsc::UnboundedReceiver, + ) { + let upgraded = match hyper::upgrade::on(req).await { + Ok(u) => u, + Err(e) => { + warn!("Unable to upgrade stream to websocket: {}", e); + return; + } + }; + let ws = tokio_tungstenite::WebSocketStream::from_raw_socket( + upgraded, + tungstenite::protocol::Role::Server, + None, + ) + .await; + + if let Err(e) = self + .stream_live_m4s_ws_loop(stream_id, open_id, sub_rx, ws) + .await + { + info!("Dropping WebSocket after error: {}", e); + } + } + + /// Helper for `stream_live_m4s_ws` that returns error when the stream is dropped. + /// The outer function logs the error. + async fn stream_live_m4s_ws_loop( + self: Arc, + stream_id: i32, + open_id: u32, + sub_rx: futures::channel::mpsc::UnboundedReceiver, + mut ws: tokio_tungstenite::WebSocketStream, + ) -> Result<(), Error> { + let keepalive = tokio_stream::wrappers::IntervalStream::new(tokio::time::interval( + std::time::Duration::new(30, 0), + )); + let mut combo = futures::stream::select( + sub_rx.map(Either::Left), + keepalive.map(|_| Either::Right(())), + ); + + // On the first LiveSegment, send all the data from the previous key frame onward. + // For LiveSegments, it's okay to send a single non-key frame at a time. + let mut start_at_key = true; + loop { + let next = combo + .next() + .await + .unwrap_or_else(|| unreachable!("timer stream never ends")); + match next { + Either::Left(live) => { + self.stream_live_m4s_chunk(open_id, stream_id, &mut ws, live, start_at_key) + .await?; + start_at_key = false; + } + Either::Right(_) => { + ws.send(tungstenite::Message::Ping(Vec::new())).await?; + } + } + } + } + + /// Sends a single live segment chunk of a `live.m4s` stream. + async fn stream_live_m4s_chunk( + &self, + open_id: u32, + stream_id: i32, + ws: &mut tokio_tungstenite::WebSocketStream, + live: db::LiveSegment, + start_at_key: bool, + ) -> Result<(), Error> { + let mut builder = mp4::FileBuilder::new(mp4::Type::MediaSegment); + let mut row = None; + { + let db = self.db.lock(); + let mut rows = 0; + db.list_recordings_by_id(stream_id, live.recording..live.recording + 1, &mut |r| { + rows += 1; + row = Some(r); + builder.append(&db, r, live.media_off_90k.clone(), start_at_key)?; + Ok(()) + })?; + if rows != 1 { + bail_t!(Internal, "unable to find {:?}", live); + } + } + let row = row.unwrap(); + use http_serve::Entity; + let mp4 = builder.build(self.db.clone(), self.dirs_by_stream_id.clone())?; + let mut hdrs = header::HeaderMap::new(); + mp4.add_headers(&mut hdrs); + let mime_type = hdrs.get(header::CONTENT_TYPE).unwrap(); + let (prev_media_duration, prev_runs) = row.prev_media_duration_and_runs.unwrap(); + let hdr = format!( + "Content-Type: {}\r\n\ + X-Recording-Start: {}\r\n\ + X-Recording-Id: {}.{}\r\n\ + X-Media-Time-Range: {}-{}\r\n\ + X-Prev-Media-Duration: {}\r\n\ + X-Runs: {}\r\n\ + X-Video-Sample-Entry-Id: {}\r\n\r\n", + mime_type.to_str().unwrap(), + row.start.0, + open_id, + live.recording, + live.media_off_90k.start, + live.media_off_90k.end, + prev_media_duration.0, + prev_runs + if row.run_offset == 0 { 1 } else { 0 }, + &row.video_sample_entry_id + ); + let mut v = hdr.into_bytes(); + mp4.append_into_vec(&mut v).await?; + ws.send(tungstenite::Message::Binary(v)).await?; + Ok(()) + } +} diff --git a/server/src/web/mod.rs b/server/src/web/mod.rs index 9c4e3f3..996ca87 100644 --- a/server/src/web/mod.rs +++ b/server/src/web/mod.rs @@ -2,6 +2,7 @@ // Copyright (C) 2021 The Moonfire NVR Authors; see AUTHORS and LICENSE.txt. // SPDX-License-Identifier: GPL-v3.0-or-later WITH GPL-3.0-linking-exception. +mod live; mod path; mod static_file; @@ -17,8 +18,6 @@ use db::dir::SampleFileDir; use db::{auth, recording}; use failure::{format_err, Error}; use fnv::FnvHashMap; -use futures::stream::StreamExt; -use futures::{future::Either, sink::SinkExt}; use http::header::{self, HeaderValue}; use http::method::Method; use http::{status::StatusCode, Request, Response}; @@ -35,7 +34,6 @@ use std::convert::TryFrom; use std::net::IpAddr; use std::ops::Range; use std::sync::Arc; -use tokio_tungstenite::tungstenite; use url::form_urlencoded; use uuid::Uuid; @@ -296,176 +294,6 @@ impl Service { }) } - fn stream_live_m4s( - self: Arc, - req: Request<::hyper::Body>, - caller: Caller, - uuid: Uuid, - stream_type: db::StreamType, - ) -> ResponseResult { - if !caller.permissions.view_video { - bail_t!(PermissionDenied, "view_video required"); - } - - let stream_id; - let open_id; - let (sub_tx, sub_rx) = futures::channel::mpsc::unbounded(); - { - let mut db = self.db.lock(); - open_id = match db.open { - None => { - bail_t!( - FailedPrecondition, - "database is read-only; there are no live streams" - ); - } - Some(o) => o.id, - }; - let camera = db.get_camera(uuid).ok_or_else(|| { - plain_response(StatusCode::NOT_FOUND, format!("no such camera {}", uuid)) - })?; - stream_id = camera.streams[stream_type.index()].ok_or_else(|| { - format_err_t!(NotFound, "no such stream {}/{}", uuid, stream_type) - })?; - db.watch_live( - stream_id, - Box::new(move |l| sub_tx.unbounded_send(l).is_ok()), - ) - .expect("stream_id refed by camera"); - } - - let response = - tungstenite::handshake::server::create_response_with_body(&req, hyper::Body::empty) - .map_err(|e| bad_req(e.to_string()))?; - let (parts, _) = response.into_parts(); - - tokio::spawn(self.stream_live_m4s_ws(stream_id, open_id, req, sub_rx)); - - Ok(Response::from_parts(parts, Body::from(""))) - } - - async fn stream_live_m4s_ws( - self: Arc, - stream_id: i32, - open_id: u32, - req: hyper::Request, - sub_rx: futures::channel::mpsc::UnboundedReceiver, - ) { - let upgraded = match hyper::upgrade::on(req).await { - Ok(u) => u, - Err(e) => { - warn!("Unable to upgrade stream to websocket: {}", e); - return; - } - }; - let ws = tokio_tungstenite::WebSocketStream::from_raw_socket( - upgraded, - tungstenite::protocol::Role::Server, - None, - ) - .await; - - if let Err(e) = self - .stream_live_m4s_ws_loop(stream_id, open_id, sub_rx, ws) - .await - { - info!("Dropping WebSocket after error: {}", e); - } - } - - /// Helper for `stream_live_m4s_ws` that returns error when the stream is dropped. - /// The outer function logs the error. - async fn stream_live_m4s_ws_loop( - self: Arc, - stream_id: i32, - open_id: u32, - sub_rx: futures::channel::mpsc::UnboundedReceiver, - mut ws: tokio_tungstenite::WebSocketStream, - ) -> Result<(), Error> { - let keepalive = tokio_stream::wrappers::IntervalStream::new(tokio::time::interval( - std::time::Duration::new(30, 0), - )); - let mut combo = futures::stream::select( - sub_rx.map(Either::Left), - keepalive.map(|_| Either::Right(())), - ); - - // On the first LiveSegment, send all the data from the previous key frame onward. - // For LiveSegments, it's okay to send a single non-key frame at a time. - let mut start_at_key = true; - loop { - let next = combo - .next() - .await - .unwrap_or_else(|| unreachable!("timer stream never ends")); - match next { - Either::Left(live) => { - self.stream_live_m4s_chunk(open_id, stream_id, &mut ws, live, start_at_key) - .await?; - start_at_key = false; - } - Either::Right(_) => { - ws.send(tungstenite::Message::Ping(Vec::new())).await?; - } - } - } - } - - /// Sends a single live segment chunk of a `live.m4s` stream. - async fn stream_live_m4s_chunk( - &self, - open_id: u32, - stream_id: i32, - ws: &mut tokio_tungstenite::WebSocketStream, - live: db::LiveSegment, - start_at_key: bool, - ) -> Result<(), Error> { - let mut builder = mp4::FileBuilder::new(mp4::Type::MediaSegment); - let mut row = None; - { - let db = self.db.lock(); - let mut rows = 0; - db.list_recordings_by_id(stream_id, live.recording..live.recording + 1, &mut |r| { - rows += 1; - row = Some(r); - builder.append(&db, r, live.media_off_90k.clone(), start_at_key)?; - Ok(()) - })?; - if rows != 1 { - bail_t!(Internal, "unable to find {:?}", live); - } - } - let row = row.unwrap(); - use http_serve::Entity; - let mp4 = builder.build(self.db.clone(), self.dirs_by_stream_id.clone())?; - let mut hdrs = header::HeaderMap::new(); - mp4.add_headers(&mut hdrs); - let mime_type = hdrs.get(header::CONTENT_TYPE).unwrap(); - let (prev_media_duration, prev_runs) = row.prev_media_duration_and_runs.unwrap(); - let hdr = format!( - "Content-Type: {}\r\n\ - X-Recording-Start: {}\r\n\ - X-Recording-Id: {}.{}\r\n\ - X-Media-Time-Range: {}-{}\r\n\ - X-Prev-Media-Duration: {}\r\n\ - X-Runs: {}\r\n\ - X-Video-Sample-Entry-Id: {}\r\n\r\n", - mime_type.to_str().unwrap(), - row.start.0, - open_id, - live.recording, - live.media_off_90k.start, - live.media_off_90k.end, - prev_media_duration.0, - prev_runs + if row.run_offset == 0 { 1 } else { 0 }, - &row.video_sample_entry_id - ); - let mut v = hdr.into_bytes(); - mp4.append_into_vec(&mut v).await?; - ws.send(tungstenite::Message::Binary(v)).await?; - Ok(()) - } - async fn signals(&self, req: Request, caller: Caller) -> ResponseResult { match *req.method() { Method::POST => self.post_signals(req, caller).await,