mirror of
https://github.com/scottlamb/moonfire-nvr.git
synced 2025-12-04 23:02:32 -05:00
switch to websocket for live stream (#59)
The multipart stream / hanging GET approach worked in a prototype for a single stream, but Chrome has a per-host limit of six connections. If I try streaming all my cameras at once, I hit that limit. I can't open all the streams, much less additional connections to load init segments and such. Websockets apparently has a much higher limit of 256.
This commit is contained in:
22
src/mp4.rs
22
src/mp4.rs
@@ -1510,6 +1510,28 @@ impl FileInner {
|
||||
#[derive(Clone)]
|
||||
pub struct File(Arc<FileInner>);
|
||||
|
||||
impl File {
|
||||
pub async fn append_into_vec(self, v: &mut Vec<u8>) -> Result<(), Error> {
|
||||
use http_serve::Entity;
|
||||
v.reserve(usize::try_from(self.len())
|
||||
.map_err(|_| format_err_t!(InvalidArgument, "{}-byte mp4 is too big to send over WebSockets!",
|
||||
self.len()))?);
|
||||
let mut b = std::pin::Pin::from(self.get_range(0 .. self.len()));
|
||||
loop {
|
||||
use futures::stream::StreamExt;
|
||||
match b.next().await {
|
||||
Some(r) => {
|
||||
let chunk = r
|
||||
.map_err(failure::Error::from_boxed_compat)
|
||||
.err_kind(ErrorKind::Unknown)?;
|
||||
v.extend_from_slice(chunk.bytes())
|
||||
},
|
||||
None => return Ok(()),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl http_serve::Entity for File {
|
||||
type Data = Chunk;
|
||||
type Error = BoxedError;
|
||||
|
||||
157
src/web.rs
157
src/web.rs
@@ -29,7 +29,7 @@
|
||||
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
use base::clock::Clocks;
|
||||
use base::{ErrorKind, ResultExt, bail_t, strutil};
|
||||
use base::{ErrorKind, bail_t, strutil};
|
||||
use bytes::Bytes;
|
||||
use crate::body::{Body, BoxedError};
|
||||
use crate::json;
|
||||
@@ -42,8 +42,9 @@ use db::{auth, recording};
|
||||
use db::dir::SampleFileDir;
|
||||
use failure::{Error, bail, format_err};
|
||||
use fnv::FnvHashMap;
|
||||
use futures::sink::SinkExt;
|
||||
use futures::future::{self, Future, TryFutureExt};
|
||||
use futures::stream::{Stream, StreamExt, TryStreamExt};
|
||||
use futures::stream::StreamExt;
|
||||
use http::{Request, Response, status::StatusCode};
|
||||
use http_serve;
|
||||
use http::header::{self, HeaderValue};
|
||||
@@ -59,6 +60,7 @@ use std::ops::Range;
|
||||
use std::path::PathBuf;
|
||||
use std::pin::Pin;
|
||||
use std::sync::Arc;
|
||||
use tokio_tungstenite::tungstenite;
|
||||
use url::form_urlencoded;
|
||||
use uuid::Uuid;
|
||||
|
||||
@@ -878,11 +880,12 @@ impl Service {
|
||||
}
|
||||
}
|
||||
|
||||
fn stream_live_m4s(&self, _req: &Request<::hyper::Body>, caller: Caller, uuid: Uuid,
|
||||
fn stream_live_m4s(&self, req: Request<::hyper::Body>, caller: Caller, uuid: Uuid,
|
||||
stream_type: db::StreamType) -> ResponseResult {
|
||||
if !caller.permissions.view_video {
|
||||
return Err(plain_response(StatusCode::UNAUTHORIZED, "view_video required"));
|
||||
}
|
||||
|
||||
let stream_id;
|
||||
let open_id;
|
||||
let (sub_tx, sub_rx) = futures::channel::mpsc::unbounded();
|
||||
@@ -904,65 +907,93 @@ impl Service {
|
||||
db.watch_live(stream_id, Box::new(move |l| sub_tx.unbounded_send(l).is_ok()))
|
||||
.expect("stream_id refed by camera");
|
||||
}
|
||||
let inner = self.0.clone();
|
||||
let body = sub_rx
|
||||
.map(move |live| -> Result<_, base::Error> {
|
||||
let mut builder = mp4::FileBuilder::new(mp4::Type::MediaSegment);
|
||||
let mut vse_id = None;
|
||||
{
|
||||
let db = inner.db.lock();
|
||||
let mut rows = 0;
|
||||
db.list_recordings_by_id(stream_id, live.recording .. live.recording+1,
|
||||
&mut |r| {
|
||||
rows += 1;
|
||||
let vse = db.video_sample_entries_by_id().get(&r.video_sample_entry_id)
|
||||
.unwrap();
|
||||
vse_id = Some(strutil::hex(&vse.sha1));
|
||||
builder.append(&db, r, live.off_90k.clone())?;
|
||||
Ok(())
|
||||
}).err_kind(base::ErrorKind::Unknown)?;
|
||||
if rows != 1 {
|
||||
bail_t!(Internal, "unable to find {:?}", live);
|
||||
}
|
||||
}
|
||||
let vse_id = vse_id.unwrap();
|
||||
use http_serve::Entity;
|
||||
let mp4 = builder.build(inner.db.clone(), inner.dirs_by_stream_id.clone())?;
|
||||
let mut hdrs = http::header::HeaderMap::new();
|
||||
mp4.add_headers(&mut hdrs);
|
||||
let mime_type = hdrs.get(http::header::CONTENT_TYPE).unwrap();
|
||||
let len = mp4.len();
|
||||
use futures::stream::once;
|
||||
let hdr = format!(
|
||||
"--B\r\n\
|
||||
Content-Length: {}\r\n\
|
||||
Content-Type: {}\r\n\
|
||||
X-Recording-Id: {}\r\n\
|
||||
X-Time-Range: {}-{}\r\n\
|
||||
X-Video-Sample-Entry-Sha1: {}\r\n\r\n",
|
||||
len,
|
||||
mime_type.to_str().unwrap(),
|
||||
live.recording,
|
||||
live.off_90k.start,
|
||||
live.off_90k.end,
|
||||
&vse_id);
|
||||
let v: Vec<Pin<crate::body::BodyStream>> = vec![
|
||||
Box::pin(once(futures::future::ok(hdr.into()))),
|
||||
Pin::from(mp4.get_range(0 .. len)),
|
||||
Box::pin(once(futures::future::ok("\r\n\r\n".into())))
|
||||
];
|
||||
Ok(futures::stream::iter(v).flatten())
|
||||
});
|
||||
let body = body.map_err::<BoxedError, _>(|e| Box::new(e.compat()));
|
||||
let _: &dyn Stream<Item = Result<_, BoxedError>> = &body;
|
||||
let body = body.try_flatten();
|
||||
let body: crate::body::BodyStream = Box::new(body);
|
||||
let body: Body = body.into();
|
||||
Ok(http::Response::builder()
|
||||
.header("X-Open-Id", open_id.to_string())
|
||||
.header("Content-Type", "multipart/mixed; boundary=B")
|
||||
.body(body)
|
||||
.unwrap())
|
||||
|
||||
let (parts, body) = req.into_parts();
|
||||
let req = Request::from_parts(parts, ());
|
||||
let response = tungstenite::handshake::server::create_response(&req)
|
||||
.map_err(|e| bad_req(e.to_string()))?;
|
||||
let (parts, ()) = response.into_parts();
|
||||
|
||||
tokio::spawn(self.clone().stream_live_m4s_ws(stream_id, open_id, body, sub_rx));
|
||||
|
||||
Ok(Response::from_parts(parts, Body::from("")))
|
||||
}
|
||||
|
||||
async fn stream_live_m4s_ws(
|
||||
self, stream_id: i32, open_id: u32, body: hyper::Body,
|
||||
mut sub_rx: futures::channel::mpsc::UnboundedReceiver<db::LiveSegment>) {
|
||||
let upgraded = match body.on_upgrade().await {
|
||||
Ok(u) => u,
|
||||
Err(e) => {
|
||||
warn!("Unable to upgrade stream to websocket: {}", e);
|
||||
return;
|
||||
},
|
||||
};
|
||||
let mut ws = tokio_tungstenite::WebSocketStream::from_raw_socket(
|
||||
upgraded,
|
||||
tungstenite::protocol::Role::Server,
|
||||
None,
|
||||
).await;
|
||||
loop {
|
||||
let live = match sub_rx.next().await {
|
||||
Some(l) => l,
|
||||
None => return,
|
||||
};
|
||||
if let Err(e) = self.stream_live_m4s_chunk(open_id, stream_id, &mut ws, live).await {
|
||||
info!("Dropping WebSocket after error: {}", e);
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn stream_live_m4s_chunk(
|
||||
&self, open_id: u32, stream_id: i32,
|
||||
ws: &mut tokio_tungstenite::WebSocketStream<hyper::upgrade::Upgraded>,
|
||||
live: db::LiveSegment) -> Result<(), Error> {
|
||||
let mut builder = mp4::FileBuilder::new(mp4::Type::MediaSegment);
|
||||
let mut vse_id = None;
|
||||
let mut start = None;
|
||||
{
|
||||
let db = self.0.db.lock();
|
||||
let mut rows = 0;
|
||||
db.list_recordings_by_id(stream_id, live.recording .. live.recording+1, &mut |r| {
|
||||
rows += 1;
|
||||
let vse = db.video_sample_entries_by_id().get(&r.video_sample_entry_id)
|
||||
.unwrap();
|
||||
vse_id = Some(strutil::hex(&vse.sha1));
|
||||
start = Some(r.start);
|
||||
builder.append(&db, r, live.off_90k.clone())?;
|
||||
Ok(())
|
||||
})?;
|
||||
if rows != 1 {
|
||||
bail_t!(Internal, "unable to find {:?}", live);
|
||||
}
|
||||
}
|
||||
let vse_id = vse_id.unwrap();
|
||||
let start = start.unwrap();
|
||||
use http_serve::Entity;
|
||||
let mp4 = builder.build(self.0.db.clone(), self.0.dirs_by_stream_id.clone())?;
|
||||
let mut hdrs = http::header::HeaderMap::new();
|
||||
mp4.add_headers(&mut hdrs);
|
||||
let mime_type = hdrs.get(http::header::CONTENT_TYPE).unwrap();
|
||||
let hdr = format!(
|
||||
"Content-Type: {}\r\n\
|
||||
X-Recording-Start: {}\r\n\
|
||||
X-Recording-Id: {}.{}\r\n\
|
||||
X-Time-Range: {}-{}\r\n\
|
||||
X-Video-Sample-Entry-Sha1: {}\r\n\r\n",
|
||||
mime_type.to_str().unwrap(),
|
||||
start.0,
|
||||
open_id,
|
||||
live.recording,
|
||||
live.off_90k.start,
|
||||
live.off_90k.end,
|
||||
&vse_id);
|
||||
let mut v = /*Pin::from(*/hdr.into_bytes()/*)*/;
|
||||
mp4.append_into_vec(&mut v).await?;
|
||||
//let v = Pin::into_inner();
|
||||
ws.send(tungstenite::Message::Binary(v)).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn signals(&self, req: Request<hyper::Body>, caller: Caller)
|
||||
@@ -1023,7 +1054,7 @@ impl Service {
|
||||
mp4::Type::MediaSegment, debug))
|
||||
},
|
||||
Path::StreamLiveMp4Segments(uuid, type_) => {
|
||||
wrap_r(true, self.stream_live_m4s(&req, caller, uuid, type_))
|
||||
wrap_r(true, self.stream_live_m4s(req, caller, uuid, type_))
|
||||
},
|
||||
Path::NotFound => wrap(true, future::err(not_found("path not understood"))),
|
||||
Path::Login => wrap(true, with_json_body(req).and_then({
|
||||
|
||||
Reference in New Issue
Block a user