// This file is part of Moonfire NVR, a security camera network video recorder.
// Copyright (C) 2016 The Moonfire NVR Authors
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// In addition, as a special exception, the copyright holders give
// permission to link the code of portions of this program with the
// OpenSSL library under certain conditions as described in each
// individual source file, and distribute linked combinations including
// the two.
//
// You must obey the GNU General Public License in all respects for all
// of the code used other than OpenSSL. If you modify file(s) with this
// exception, you may extend this exception to your version of the
// file(s), but you are not obligated to do so. If you do not wish to do
// so, delete this exception statement from your version. If you delete
// this exception statement from all source files in the program, then
// also delete it here.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
use base::clock::Clocks;
use base::{ErrorKind, bail_t, strutil};
use bytes::Bytes;
use crate::body::{Body, BoxedError};
use crate::json;
use crate::mp4;
use bytes::{BufMut, BytesMut};
use core::borrow::Borrow;
use core::str::FromStr;
use db::{auth, recording};
use db::dir::SampleFileDir;
use failure::{Error, bail, format_err};
use fnv::FnvHashMap;
use futures::sink::SinkExt;
use futures::future::{self, Either, Future, TryFutureExt, err};
use futures::stream::StreamExt;
use http::{Request, Response, status::StatusCode};
use http::header::{self, HeaderValue};
use http_serve::dir::FsDir;
use log::{debug, info, warn};
use memchr::memchr;
use nom::IResult;
use nom::bytes::complete::{take_while1, tag};
use nom::combinator::{all_consuming, map, map_res, opt};
use nom::sequence::{preceded, tuple};
use std::cmp;
use std::net::IpAddr;
use std::ops::Range;
use std::pin::Pin;
use std::sync::Arc;
use tokio_tungstenite::tungstenite;
use url::form_urlencoded;
use uuid::Uuid;
type BoxedFuture = Box, BoxedError>> +
Sync + Send + 'static>;
/// The kind of resource a request path refers to, as classified by `Path::decode`.
///
/// A `bool` field is `true` when the path carried the `.txt` suffix, i.e. a debug
/// (plain text) rendering was requested rather than the media itself.
#[derive(Debug, Eq, PartialEq)]
enum Path {
    /// `/api/`
    TopLevel,

    /// `/api/request`
    Request,

    /// `/api/init/<sha1>.mp4{.txt}`
    InitSegment([u8; 20], bool),

    /// `/api/cameras/<uuid>/`
    Camera(Uuid),

    /// `/api/signals`
    Signals,

    /// `/api/cameras/<uuid>/<type>/recordings`
    StreamRecordings(Uuid, db::StreamType),

    /// `/api/cameras/<uuid>/<type>/view.mp4{.txt}`
    StreamViewMp4(Uuid, db::StreamType, bool),

    /// `/api/cameras/<uuid>/<type>/view.m4s{.txt}`
    StreamViewMp4Segment(Uuid, db::StreamType, bool),

    /// `/api/cameras/<uuid>/<type>/live.m4s`
    StreamLiveMp4Segments(Uuid, db::StreamType),

    /// `/api/login`
    Login,

    /// `/api/logout`
    Logout,

    /// Anything that doesn't start with `/api/`.
    Static,

    /// A path under `/api/` that matches none of the above.
    NotFound,
}
impl Path {
    /// Classifies a request path (without query string) into a `Path`.
    ///
    /// Paths outside `/api/` are `Path::Static`; unrecognized paths inside it are
    /// `Path::NotFound`.
    fn decode(path: &str) -> Self {
        if !path.starts_with("/api/") {
            return Path::Static;
        }

        // Drop "/api" but keep the slash which follows it.
        let rest = &path["/api".len()..];

        // Fixed, parameterless endpoints.
        match rest {
            "/" => return Path::TopLevel,
            "/login" => return Path::Login,
            "/logout" => return Path::Logout,
            "/request" => return Path::Request,
            "/signals" => return Path::Signals,
            _ => {},
        }

        if rest.starts_with("/init/") {
            return Path::decode_init_segment(rest);
        }
        if !rest.starts_with("/cameras/") {
            return Path::NotFound;
        }

        // "<uuid>/..." — the uuid must be terminated by a slash.
        let after_cameras = &rest["/cameras/".len()..];
        let (uuid_str, after_uuid) = match after_cameras.find('/') {
            Some(pos) => (&after_cameras[..pos], &after_cameras[pos + 1..]),
            None => return Path::NotFound,
        };

        // TODO(slamb): require uuid to be in canonical format.
        let uuid = match Uuid::parse_str(uuid_str) {
            Ok(parsed) => parsed,
            Err(_) => return Path::NotFound,
        };
        if after_uuid.is_empty() {
            return Path::Camera(uuid);
        }

        // "<type>/<leaf>" — `leaf` keeps its leading slash.
        let (type_str, leaf) = match after_uuid.find('/') {
            Some(pos) => after_uuid.split_at(pos),
            None => return Path::NotFound,
        };
        let stream_type = match db::StreamType::parse(type_str) {
            Some(t) => t,
            None => return Path::NotFound,
        };
        match leaf {
            "/recordings" => Path::StreamRecordings(uuid, stream_type),
            "/view.mp4" => Path::StreamViewMp4(uuid, stream_type, false),
            "/view.mp4.txt" => Path::StreamViewMp4(uuid, stream_type, true),
            "/view.m4s" => Path::StreamViewMp4Segment(uuid, stream_type, false),
            "/view.m4s.txt" => Path::StreamViewMp4Segment(uuid, stream_type, true),
            "/live.m4s" => Path::StreamLiveMp4Segments(uuid, stream_type),
            _ => Path::NotFound,
        }
    }

    /// Decodes `/init/<40 hex digits>.mp4{.txt}`; `path` starts with `/init/`.
    fn decode_init_segment(path: &str) -> Self {
        let debug = path.ends_with(".txt");
        let mp4_path = if debug {
            &path[0 .. path.len() - ".txt".len()]
        } else {
            path
        };

        // "/init/" (6 bytes) + 40 hex digits + ".mp4" (4 bytes) == 50 bytes.
        if mp4_path.len() == 50 && mp4_path.ends_with(".mp4") {
            if let Ok(sha1) = strutil::dehex(&mp4_path.as_bytes()[6..46]) {
                return Path::InitSegment(sha1, debug);
            }
        }
        Path::NotFound
    }
}
fn plain_response>(status: http::StatusCode, body: B) -> Response {
Response::builder()
.status(status)
.header(header::CONTENT_TYPE, HeaderValue::from_static("text/plain"))
.body(body.into()).expect("hardcoded head should be valid")
}
fn not_found>(body: B) -> Response {
plain_response(StatusCode::NOT_FOUND, body)
}
fn bad_req>(body: B) -> Response {
plain_response(StatusCode::BAD_REQUEST, body)
}
fn internal_server_err>(err: E) -> Response {
plain_response(StatusCode::INTERNAL_SERVER_ERROR, err.into().to_string())
}
fn from_base_error(err: base::Error) -> Response {
let status_code = match err.kind() {
ErrorKind::PermissionDenied | ErrorKind::Unauthenticated => StatusCode::UNAUTHORIZED,
ErrorKind::InvalidArgument => StatusCode::BAD_REQUEST,
ErrorKind::NotFound => StatusCode::NOT_FOUND,
_ => StatusCode::INTERNAL_SERVER_ERROR,
};
plain_response(status_code, err.to_string())
}
/// The parsed form of one `s` query parameter, in the syntax
/// `START_ID[-END_ID][@OPEN_ID][.[REL_START_TIME]-[REL_END_TIME]]`.
// NOTE(review): the `i32`/`u32` element types were reconstructed; they must match the
// recording id and open id types used by the database layer — confirm.
#[derive(Debug, Eq, PartialEq)]
struct Segments {
    /// Recording ids to include. Half-open, so `END_ID` from the query is `ids.end - 1`.
    ids: Range<i32>,

    /// If set, the open id every listed recording is required to have.
    open_id: Option<u32>,

    /// Start offset relative to the beginning of the first listed recording; defaults to 0.
    start_time: i64,

    /// End offset relative to the beginning of the first listed recording, if bounded.
    end_time: Option<i64>,
}
/// Returns a parser which consumes one or more ASCII digits and converts them to `T`
/// via `FromStr`. Fails if there are no digits or the conversion fails.
fn num<'a, T: FromStr>() -> impl Fn(&'a str) -> IResult<&'a str, T> {
    let digits = take_while1(|ch: char| ch.is_ascii_digit());
    map_res(digits, T::from_str)
}
impl Segments {
/// Parses the `s` query parameter to `view.mp4` as described in `design/api.md`.
/// Doesn't do any validation.
fn parse(i: &str) -> IResult<&str, Segments> {
// Parse START_ID[-END_ID] into Range.
// Note that END_ID is inclusive, but Ranges are half-open.
let (i, ids) = map(tuple((num::(), opt(preceded(tag("-"), num::())))),
|(start, end)| start .. end.unwrap_or(start) + 1)(i)?;
// Parse [@OPEN_ID] into Option.
let (i, open_id) = opt(preceded(tag("@"), num::()))(i)?;
// Parse [.[REL_START_TIME]-[REL_END_TIME]] into (i64, Option).
let (i, (start_time, end_time)) = map(
opt(preceded(tag("."), tuple((opt(num::()), tag("-"), opt(num::()))))),
|t| {
t.map(|(s, _, e)| (s.unwrap_or(0), e))
.unwrap_or((0, None))
})(i)?;
Ok((i, Segments { ids, open_id, start_time, end_time, }))
}
}
impl FromStr for Segments {
type Err = ();
fn from_str(s: &str) -> Result {
let (_, s) = all_consuming(Segments::parse)(s).map_err(|_| ())?;
if s.ids.end <= s.ids.start {
return Err(());
}
if let Some(e) = s.end_time {
if e < s.start_time {
return Err(());
}
}
Ok(s)
}
}
struct Caller {
permissions: db::Permissions,
session: Option,
}
impl Caller {
}
struct ServiceInner {
db: Arc,
ui_dir: Option>,
dirs_by_stream_id: Arc>>,
time_zone_name: String,
allow_unauthenticated_permissions: Option,
trust_forward_hdrs: bool,
}
type ResponseResult = Result, Response>;
fn serve_json(req: &Request, out: &T) -> ResponseResult {
let (mut resp, writer) = http_serve::streaming_body(&req).build();
resp.headers_mut().insert(header::CONTENT_TYPE,
HeaderValue::from_static("application/json"));
if let Some(mut w) = writer {
serde_json::to_writer(&mut w, out).map_err(internal_server_err)?;
}
Ok(resp)
}
impl ServiceInner {
/// Serves the top-level JSON document.
///
/// Recognized query parameters (each enabled only by the exact value `true`):
/// `days` and `cameraConfigs`. The latter requires the `read_camera_configs`
/// permission; without it a `401 Unauthorized` response is returned.
fn top_level(&self, req: &Request<::hyper::Body>, caller: Caller) -> ResponseResult {
    let mut want_days = false;
    let mut want_camera_configs = false;
    if let Some(query) = req.uri().query() {
        for (k, v) in form_urlencoded::parse(query.as_bytes()) {
            let name: &str = k.borrow();
            let flag: &str = v.borrow();
            let enabled = flag == "true";
            match name {
                "days" => want_days = enabled,
                "cameraConfigs" => want_camera_configs = enabled,
                _ => {},
            }
        }
    }

    if want_camera_configs && !caller.permissions.read_camera_configs {
        return Err(plain_response(StatusCode::UNAUTHORIZED,
                                  "read_camera_configs required"));
    }

    let db = self.db.lock();
    let top_level = json::TopLevel {
        time_zone_name: &self.time_zone_name,
        cameras: (&db, want_days, want_camera_configs),
        session: caller.session,
        signals: (&db, want_days),
        signal_types: &db,
    };
    serve_json(req, &top_level)
}
/// Serves the JSON description of the camera with the given uuid, or a
/// `404 Not Found` response if there is no such camera.
fn camera(&self, req: &Request<::hyper::Body>, uuid: Uuid) -> ResponseResult {
    let db = self.db.lock();
    let camera = match db.get_camera(uuid) {
        Some(c) => c,
        None => return Err(not_found(format!("no such camera {}", uuid))),
    };
    let wrapped = json::Camera::wrap(camera, &db, true, false)
        .map_err(internal_server_err)?;
    serve_json(req, &wrapped)
}
fn stream_recordings(&self, req: &Request<::hyper::Body>, uuid: Uuid, type_: db::StreamType)
-> ResponseResult {
let (r, split) = {
let mut time = recording::Time::min_value() .. recording::Time::max_value();
let mut split = recording::Duration(i64::max_value());
if let Some(q) = req.uri().query() {
for (key, value) in form_urlencoded::parse(q.as_bytes()) {
let (key, value) = (key.borrow(), value.borrow());
match key {
"startTime90k" => {
time.start = recording::Time::parse(value)
.map_err(|_| bad_req("unparseable startTime90k"))?
},
"endTime90k" => {
time.end = recording::Time::parse(value)
.map_err(|_| bad_req("unparseable endTime90k"))?
},
"split90k" => {
split = recording::Duration(i64::from_str(value)
.map_err(|_| bad_req("unparseable split90k"))?)
},
_ => {},
}
};
}
(time, split)
};
let db = self.db.lock();
let mut out = json::ListRecordings {
recordings: Vec::new(),
video_sample_entries: (&db, Vec::new()),
};
let camera = db.get_camera(uuid)
.ok_or_else(|| plain_response(StatusCode::NOT_FOUND,
format!("no such camera {}", uuid)))?;
let stream_id = camera.streams[type_.index()]
.ok_or_else(|| plain_response(StatusCode::NOT_FOUND,
format!("no such stream {}/{}", uuid, type_)))?;
db.list_aggregated_recordings(stream_id, r, split, &mut |row| {
let end = row.ids.end - 1; // in api, ids are inclusive.
out.recordings.push(json::Recording {
start_id: row.ids.start,
end_id: if end == row.ids.start { None } else { Some(end) },
start_time_90k: row.time.start.0,
end_time_90k: row.time.end.0,
sample_file_bytes: row.sample_file_bytes,
open_id: row.open_id,
first_uncommitted: row.first_uncommitted,
video_samples: row.video_samples,
video_sample_entry_id: row.video_sample_entry_id.to_string(),
growing: row.growing,
});
if !out.video_sample_entries.1.contains(&row.video_sample_entry_id) {
out.video_sample_entries.1.push(row.video_sample_entry_id);
}
Ok(())
}).map_err(internal_server_err)?;
serve_json(req, &out)
}
/// Serves the `.mp4` initialization segment for the video sample entry with the
/// given SHA-1, or its `{:#?}` rendering as plain text when `debug` is set.
///
/// Returns `404 Not Found` if no video sample entry has that SHA-1.
fn init_segment(&self, sha1: [u8; 20], debug: bool, req: &Request<::hyper::Body>)
                -> ResponseResult {
    let db = self.db.lock();
    let entry = db.video_sample_entries_by_id()
        .values()
        .find(|candidate| candidate.sha1 == sha1)
        .cloned()
        .ok_or_else(|| not_found("no such init segment"))?;

    let mut builder = mp4::FileBuilder::new(mp4::Type::InitSegment);
    builder.append_video_sample_entry(entry);
    let mp4 = builder.build(self.db.clone(), self.dirs_by_stream_id.clone())
        .map_err(from_base_error)?;
    if debug {
        Ok(plain_response(StatusCode::OK, format!("{:#?}", mp4)))
    } else {
        Ok(http_serve::serve(mp4, req))
    }
}
fn stream_view_mp4(&self, req: &Request<::hyper::Body>, caller: Caller, uuid: Uuid,
stream_type: db::StreamType, mp4_type: mp4::Type, debug: bool)
-> ResponseResult {
if !caller.permissions.view_video {
return Err(plain_response(StatusCode::UNAUTHORIZED, "view_video required"));
}
let (stream_id, camera_name);
{
let db = self.db.lock();
let camera = db.get_camera(uuid)
.ok_or_else(|| plain_response(StatusCode::NOT_FOUND,
format!("no such camera {}", uuid)))?;
camera_name = camera.short_name.clone();
stream_id = camera.streams[stream_type.index()]
.ok_or_else(|| plain_response(StatusCode::NOT_FOUND,
format!("no such stream {}/{}", uuid,
stream_type)))?;
};
let mut start_time_for_filename = None;
let mut builder = mp4::FileBuilder::new(mp4_type);
if let Some(q) = req.uri().query() {
for (key, value) in form_urlencoded::parse(q.as_bytes()) {
let (key, value) = (key.borrow(), value.borrow());
match key {
"s" => {
let s = Segments::from_str(value).map_err(
|()| plain_response(StatusCode::BAD_REQUEST,
format!("invalid s parameter: {}", value)))?;
debug!("stream_view_mp4: appending s={:?}", s);
let mut est_segments = (s.ids.end - s.ids.start) as usize;
if let Some(end) = s.end_time {
// There should be roughly ceil((end - start) /
// desired_recording_duration) recordings in the desired timespan if
// there are no gaps or overlap, possibly another for misalignment of
// the requested timespan with the rotate offset and another because
// rotation only happens at key frames.
let ceil_durations = (end - s.start_time +
recording::DESIRED_RECORDING_DURATION - 1) /
recording::DESIRED_RECORDING_DURATION;
est_segments = cmp::min(est_segments, (ceil_durations + 2) as usize);
}
builder.reserve(est_segments);
let db = self.db.lock();
let mut prev = None;
let mut cur_off = 0;
db.list_recordings_by_id(stream_id, s.ids.clone(), &mut |r| {
let recording_id = r.id.recording();
if let Some(o) = s.open_id {
if r.open_id != o {
bail!("recording {} has open id {}, requested {}",
r.id, r.open_id, o);
}
}
// Check for missing recordings.
match prev {
None if recording_id == s.ids.start => {},
None => bail!("no such recording {}/{}", stream_id, s.ids.start),
Some(id) if r.id.recording() != id + 1 => {
bail!("no such recording {}/{}", stream_id, id + 1);
},
_ => {},
};
prev = Some(recording_id);
// Add a segment for the relevant part of the recording, if any.
let end_time = s.end_time.unwrap_or(i64::max_value());
let d = r.duration_90k as i64;
if s.start_time <= cur_off + d && cur_off < end_time {
let start = cmp::max(0, s.start_time - cur_off);
let end = cmp::min(d, end_time - cur_off);
let times = start as i32 .. end as i32;
debug!("...appending recording {} with times {:?} \
(out of dur {})", r.id, times, d);
if start_time_for_filename.is_none() {
start_time_for_filename =
Some(r.start + recording::Duration(start));
}
builder.append(&db, r, start as i32 .. end as i32)?;
} else {
debug!("...skipping recording {} dur {}", r.id, d);
}
cur_off += d;
Ok(())
}).map_err(internal_server_err)?;
// Check for missing recordings.
match prev {
Some(id) if s.ids.end != id + 1 => {
return Err(not_found(format!("no such recording {}/{}",
stream_id, s.ids.end - 1)));
},
None => {
return Err(not_found(format!("no such recording {}/{}",
stream_id, s.ids.start)));
},
_ => {},
};
if let Some(end) = s.end_time {
if end > cur_off {
return Err(plain_response(
StatusCode::BAD_REQUEST,
format!("end time {} is beyond specified recordings",
end)));
}
}
},
"ts" => builder.include_timestamp_subtitle_track(value == "true"),
_ => return Err(bad_req(format!("parameter {} not understood", key))),
}
};
}
if let Some(start) = start_time_for_filename {
let tm = time::at(time::Timespec{sec: start.unix_seconds(), nsec: 0});
let stream_abbrev = if stream_type == db::StreamType::MAIN { "main" } else { "sub" };
let suffix = if mp4_type == mp4::Type::Normal { "mp4" } else { "m4s" };
builder.set_filename(&format!("{}-{}-{}.{}", tm.strftime("%Y%m%d%H%M%S").unwrap(),
camera_name, stream_abbrev, suffix))
.map_err(from_base_error)?;
}
let mp4 = builder.build(self.db.clone(), self.dirs_by_stream_id.clone())
.map_err(from_base_error)?;
if debug {
return Ok(plain_response(StatusCode::OK, format!("{:#?}", mp4)));
}
Ok(http_serve::serve(mp4, req))
}
fn static_file(&self, req: Request)
-> impl Future