preliminary web support for auth (#26)

Some caveats:

  * it doesn't record the peer IP yet, which makes it harder to verify
    sessions are valid. This is a little annoying to do in hyper now
    (see hyperium/hyper#1410). The direct peer might not be what we want
    right now anyway because there's no TLS support yet (see #27).  In
    the meantime, the sane way to expose Moonfire NVR to the Internet is
    via a proxy server, and recording the proxy's IP is not useful.
    Maybe better to interpret a RFC 7239 Forwarded header (and/or
    the older X-Forwarded-{For,Proto} headers).

  * it doesn't ever use Secure (https-only) cookies, for a similar reason.
    It's not safe to use even with a tls proxy until this is fixed.

  * there's no "moonfire-nvr config" support for inspecting/invalidating
    sessions yet.

  * in debug builds, logging in is crazy slow. See libpasta/libpasta#9.

Some notes:

  * I removed the Javascript "no-use-before-defined" lint, as some of
    the functions form a cycle.

  * Fixed #20 along the way. I needed to add support for properly
    returning non-OK HTTP statuses to signal unauthorized and such.

  * I removed the Access-Control-Allow-Origin header support, which was
    at odds with the "SameSite=lax" in the cookie header. The "yarn
    start" method for running a local proxy server accomplishes the same
    thing as the Access-Control-Allow-Origin support in a more secure
    manner.
This commit is contained in:
Scott Lamb
2018-11-25 21:31:50 -08:00
parent 679370c77a
commit 422cd2a75e
21 changed files with 923 additions and 212 deletions

View File

@@ -54,6 +54,14 @@ impl From<&'static [u8]> for Chunk {
fn from(r: &'static [u8]) -> Self { Chunk(ARefs::new(r)) }
}
impl From<&'static str> for Chunk {
fn from(r: &'static str) -> Self { Chunk(ARefs::new(r.as_bytes())) }
}
impl From<String> for Chunk {
fn from(r: String) -> Self { Chunk(ARefs::new(r.into_bytes()).map(|v| &v[..])) }
}
impl From<Vec<u8>> for Chunk {
fn from(r: Vec<u8>) -> Self { Chunk(ARefs::new(r).map(|v| &v[..])) }
}
@@ -81,8 +89,8 @@ impl From<BodyStream> for Body {
fn from(b: BodyStream) -> Self { Body(b) }
}
impl From<&'static [u8]> for Body {
fn from(c: &'static [u8]) -> Self {
impl<C: Into<Chunk>> From<C> for Body {
fn from(c: C) -> Self {
Body(Box::new(stream::once(Ok(c.into()))))
}
}

View File

@@ -66,9 +66,8 @@ Options:
--http-addr=ADDR Set the bind address for the unencrypted HTTP server.
[default: 0.0.0.0:8080]
--read-only Forces read-only mode / disables recording.
--allow-origin=ORIGIN If present, adds a Access-Control-Allow-Origin:
header to HTTP responses. This may be useful for
Javascript development.
--require-auth=BOOL Requires authentication to access the web interface.
[default: true]
"#;
#[derive(Debug, Deserialize)]
@@ -77,7 +76,7 @@ struct Args {
flag_http_addr: String,
flag_ui_dir: String,
flag_read_only: bool,
flag_allow_origin: Option<String>,
flag_require_auth: bool,
}
fn setup_shutdown() -> impl Future<Item = (), Error = ()> + Send {
@@ -180,7 +179,7 @@ pub fn run() -> Result<(), Error> {
let zone = resolve_zone()?;
info!("Resolved timezone: {}", &zone);
let s = web::Service::new(db.clone(), Some(&args.flag_ui_dir), args.flag_allow_origin, zone)?;
let s = web::Service::new(db.clone(), Some(&args.flag_ui_dir), args.flag_require_auth, zone)?;
// Start a streamer for each stream.
let shutdown_streamers = Arc::new(AtomicBool::new(false));

View File

@@ -28,7 +28,7 @@
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
use db;
use db::{self, auth::SessionHash};
use failure::Error;
use serde::ser::{SerializeMap, SerializeSeq, Serializer};
use std::collections::BTreeMap;
@@ -44,6 +44,26 @@ pub struct TopLevel<'a> {
// "days" attribute or not, according to the bool in the tuple.
#[serde(serialize_with = "TopLevel::serialize_cameras")]
pub cameras: (&'a db::LockedDatabase, bool),
pub session: Option<Session>,
}
#[derive(Debug, Serialize)]
#[serde(rename_all="camelCase")]
pub struct Session {
pub username: String,
#[serde(serialize_with = "Session::serialize_csrf")]
pub csrf: SessionHash,
}
impl Session {
fn serialize_csrf<S>(csrf: &SessionHash, serializer: S) -> Result<S::Ok, S::Error>
where S: Serializer {
let mut tmp = [0u8; 32];
csrf.encode_base64(&mut tmp);
serializer.serialize_str(::std::str::from_utf8(&tmp[..]).expect("base64 is UTF-8"))
}
}
/// JSON serialization wrapper for a single camera when processing `/api/` and

View File

@@ -30,6 +30,7 @@
#![cfg_attr(all(feature="nightly", test), feature(test))]
extern crate base64;
extern crate bytes;
extern crate byteorder;
extern crate core;
@@ -46,6 +47,7 @@ extern crate libc;
#[macro_use] extern crate log;
extern crate reffers;
extern crate rusqlite;
extern crate memchr;
extern crate memmap;
extern crate moonfire_base as base;
extern crate moonfire_db as db;
@@ -54,6 +56,7 @@ extern crate mylog;
extern crate openssl;
extern crate parking_lot;
extern crate regex;
extern crate ring;
extern crate serde;
#[macro_use] extern crate serde_derive;
extern crate serde_json;

View File

@@ -30,15 +30,18 @@
extern crate hyper;
use base::clock::Clocks;
use base::strutil;
use body::{Body, BoxedError, wrap_error};
use body::{Body, BoxedError};
use base64;
use bytes::{BufMut, BytesMut};
use core::borrow::Borrow;
use core::str::FromStr;
use db::{self, recording};
use db::{self, auth, recording};
use db::dir::SampleFileDir;
use failure::Error;
use fnv::FnvHashMap;
use futures::future;
use futures::{Future, Stream, future};
use futures_cpupool;
use json;
use http::{self, Request, Response, status::StatusCode};
@@ -64,6 +67,7 @@ lazy_static! {
Regex::new(r"^(\d+)(-\d+)?(@\d+)?(?:\.(\d+)?-(\d+)?)?$").unwrap();
}
#[derive(Debug)]
enum Path {
TopLevel, // "/api/"
InitSegment([u8; 20]), // "/api/init/<sha1>.mp4"
@@ -71,7 +75,9 @@ enum Path {
StreamRecordings(Uuid, db::StreamType), // "/api/cameras/<uuid>/<type>/recordings"
StreamViewMp4(Uuid, db::StreamType), // "/api/cameras/<uuid>/<type>/view.mp4"
StreamViewMp4Segment(Uuid, db::StreamType), // "/api/cameras/<uuid>/<type>/view.m4s"
Static, // "<other path>"
Login, // "/api/login"
Logout, // "/api/logout"
Static, // (anything that doesn't start with "/api/")
NotFound,
}
@@ -83,6 +89,11 @@ fn decode_path(path: &str) -> Path {
if path == "/" {
return Path::TopLevel;
}
if path == "/login" {
return Path::Login;
} else if path == "/logout" {
return Path::Logout;
}
if path.starts_with("/init/") {
if path.len() != 50 || !path.ends_with(".mp4") {
return Path::NotFound;
@@ -131,6 +142,25 @@ fn decode_path(path: &str) -> Path {
}
}
fn plain_response<B: Into<Body>>(status: http::StatusCode, body: B) -> Response<Body> {
Response::builder()
.status(status)
.header(header::CONTENT_TYPE, HeaderValue::from_static("text/plain"))
.body(body.into()).expect("hardcoded head should be valid")
}
fn not_found<B: Into<Body>>(body: B) -> Response<Body> {
plain_response(StatusCode::NOT_FOUND, body)
}
fn bad_req<B: Into<Body>>(body: B) -> Response<Body> {
plain_response(StatusCode::BAD_REQUEST, body)
}
fn internal_server_err<E: Into<Error>>(err: E) -> Response<Body> {
plain_response(StatusCode::INTERNAL_SERVER_ERROR, err.into().to_string())
}
#[derive(Debug, Eq, PartialEq)]
struct Segments {
ids: Range<i32>,
@@ -190,25 +220,20 @@ struct ServiceInner {
db: Arc<db::Database>,
dirs_by_stream_id: Arc<FnvHashMap<i32, Arc<SampleFileDir>>>,
ui_files: HashMap<String, UiFile>,
allow_origin: Option<HeaderValue>,
pool: futures_cpupool::CpuPool,
time_zone_name: String,
require_auth: bool,
}
impl ServiceInner {
fn not_found(&self) -> Result<Response<Body>, Error> {
let body: Body = (&b"not found"[..]).into();
Ok(Response::builder()
.status(StatusCode::NOT_FOUND)
.header(header::CONTENT_TYPE, HeaderValue::from_static("text/plain"))
.body(body)?)
}
type ResponseResult = Result<Response<Body>, Response<Body>>;
fn top_level(&self, req: &Request<::hyper::Body>) -> Result<Response<Body>, Error> {
impl ServiceInner {
fn top_level(&self, req: &Request<::hyper::Body>, session: Option<json::Session>)
-> ResponseResult {
let mut days = false;
if let Some(q) = req.uri().query() {
for (key, value) in form_urlencoded::parse(q.as_bytes()) {
let (key, value) : (_, &str) = (key.borrow(), value.borrow());
let (key, value): (_, &str) = (key.borrow(), value.borrow());
match key {
"days" => days = value == "true",
_ => {},
@@ -224,26 +249,30 @@ impl ServiceInner {
serde_json::to_writer(&mut w, &json::TopLevel {
time_zone_name: &self.time_zone_name,
cameras: (&db, days),
})?;
session,
}).map_err(internal_server_err)?;
}
Ok(resp)
}
fn camera(&self, req: &Request<::hyper::Body>, uuid: Uuid) -> Result<Response<Body>, Error> {
fn camera(&self, req: &Request<::hyper::Body>, uuid: Uuid) -> ResponseResult {
let (mut resp, writer) = http_serve::streaming_body(&req).build();
resp.headers_mut().insert(header::CONTENT_TYPE,
HeaderValue::from_static("application/json"));
if let Some(mut w) = writer {
let db = self.db.lock();
let camera = db.get_camera(uuid)
.ok_or_else(|| format_err!("no such camera {}", uuid))?;
serde_json::to_writer(&mut w, &json::Camera::wrap(camera, &db, true)?)?
.ok_or_else(|| not_found(format!("no such camera {}", uuid)))?;
serde_json::to_writer(
&mut w,
&json::Camera::wrap(camera, &db, true).map_err(internal_server_err)?
).map_err(internal_server_err)?
};
Ok(resp)
}
fn stream_recordings(&self, req: &Request<::hyper::Body>, uuid: Uuid, type_: db::StreamType)
-> Result<Response<Body>, Error> {
-> ResponseResult {
let (r, split) = {
let mut time = recording::Time(i64::min_value()) .. recording::Time(i64::max_value());
let mut split = recording::Duration(i64::max_value());
@@ -251,9 +280,18 @@ impl ServiceInner {
for (key, value) in form_urlencoded::parse(q.as_bytes()) {
let (key, value) = (key.borrow(), value.borrow());
match key {
"startTime90k" => time.start = recording::Time::parse(value)?,
"endTime90k" => time.end = recording::Time::parse(value)?,
"split90k" => split = recording::Duration(i64::from_str(value)?),
"startTime90k" => {
time.start = recording::Time::parse(value)
.map_err(|_| bad_req("unparseable startTime90k"))?
},
"endTime90k" => {
time.end = recording::Time::parse(value)
.map_err(|_| bad_req("unparseable endTime90k"))?
},
"split90k" => {
split = recording::Duration(i64::from_str(value)
.map_err(|_| bad_req("unparseable split90k"))?)
},
_ => {},
}
};
@@ -264,9 +302,11 @@ impl ServiceInner {
{
let db = self.db.lock();
let camera = db.get_camera(uuid)
.ok_or_else(|| format_err!("no such camera {}", uuid))?;
.ok_or_else(|| plain_response(StatusCode::NOT_FOUND,
format!("no such camera {}", uuid)))?;
let stream_id = camera.streams[type_.index()]
.ok_or_else(|| format_err!("no such stream {}/{}", uuid, type_))?;
.ok_or_else(|| plain_response(StatusCode::NOT_FOUND,
format!("no such stream {}/{}", uuid, type_)))?;
db.list_aggregated_recordings(stream_id, r, split, &mut |row| {
let end = row.ids.end - 1; // in api, ids are inclusive.
let vse = db.video_sample_entries_by_id().get(&row.video_sample_entry_id).unwrap();
@@ -285,40 +325,43 @@ impl ServiceInner {
growing: row.growing,
});
Ok(())
})?;
}).map_err(internal_server_err)?;
}
let (mut resp, writer) = http_serve::streaming_body(&req).build();
resp.headers_mut().insert(header::CONTENT_TYPE,
HeaderValue::from_static("application/json"));
if let Some(mut w) = writer {
serde_json::to_writer(&mut w, &out)?
serde_json::to_writer(&mut w, &out).map_err(internal_server_err)?
};
Ok(resp)
}
fn init_segment(&self, sha1: [u8; 20], req: &Request<::hyper::Body>)
-> Result<Response<Body>, Error> {
fn init_segment(&self, sha1: [u8; 20], req: &Request<::hyper::Body>) -> ResponseResult {
let mut builder = mp4::FileBuilder::new(mp4::Type::InitSegment);
let db = self.db.lock();
for ent in db.video_sample_entries_by_id().values() {
if ent.sha1 == sha1 {
builder.append_video_sample_entry(ent.clone());
let mp4 = builder.build(self.db.clone(), self.dirs_by_stream_id.clone())?;
let mp4 = builder.build(self.db.clone(), self.dirs_by_stream_id.clone())
.map_err(internal_server_err)?;
return Ok(http_serve::serve(mp4, req));
}
}
self.not_found()
Err(not_found("no such init segment"))
}
fn stream_view_mp4(&self, req: &Request<::hyper::Body>, uuid: Uuid,
stream_type_: db::StreamType, mp4_type_: mp4::Type)
-> Result<Response<Body>, Error> {
-> ResponseResult {
let stream_id = {
let db = self.db.lock();
let camera = db.get_camera(uuid)
.ok_or_else(|| format_err!("no such camera {}", uuid))?;
.ok_or_else(|| plain_response(StatusCode::NOT_FOUND,
format!("no such camera {}", uuid)))?;
camera.streams[stream_type_.index()]
.ok_or_else(|| format_err!("no such stream {}/{}", uuid, stream_type_))?
.ok_or_else(|| plain_response(StatusCode::NOT_FOUND,
format!("no such stream {}/{}", uuid,
stream_type_)))?
};
let mut builder = mp4::FileBuilder::new(mp4_type_);
if let Some(q) = req.uri().query() {
@@ -327,7 +370,8 @@ impl ServiceInner {
match key {
"s" => {
let s = Segments::parse(value).map_err(
|_| format_err!("invalid s parameter: {}", value))?;
|()| plain_response(StatusCode::BAD_REQUEST,
format!("invalid s parameter: {}", value)))?;
debug!("stream_view_mp4: appending s={:?}", s);
let mut est_segments = (s.ids.end - s.ids.start) as usize;
if let Some(end) = s.end_time {
@@ -381,52 +425,199 @@ impl ServiceInner {
}
cur_off += d;
Ok(())
})?;
}).map_err(internal_server_err)?;
// Check for missing recordings.
match prev {
Some(id) if s.ids.end != id + 1 => {
bail!("no such recording {}/{}", stream_id, s.ids.end - 1);
return Err(not_found(format!("no such recording {}/{}",
stream_id, s.ids.end - 1)));
},
None => {
bail!("no such recording {}/{}", stream_id, s.ids.start);
return Err(not_found(format!("no such recording {}/{}",
stream_id, s.ids.start)));
},
_ => {},
};
if let Some(end) = s.end_time {
if end > cur_off {
bail!("end time {} is beyond specified recordings", end);
return Err(plain_response(
StatusCode::BAD_REQUEST,
format!("end time {} is beyond specified recordings",
end)));
}
}
},
"ts" => builder.include_timestamp_subtitle_track(value == "true"),
_ => bail!("parameter {} not understood", key),
_ => return Err(bad_req(format!("parameter {} not understood", key))),
}
};
}
let mp4 = builder.build(self.db.clone(), self.dirs_by_stream_id.clone())?;
let mp4 = builder.build(self.db.clone(), self.dirs_by_stream_id.clone())
.map_err(internal_server_err)?;
Ok(http_serve::serve(mp4, req))
}
fn static_file(&self, req: &Request<::hyper::Body>) -> Result<Response<Body>, Error> {
let s = match self.ui_files.get(req.uri().path()) {
None => { return self.not_found() },
Some(s) => s,
};
let f = fs::File::open(&s.path)?;
fn static_file(&self, req: &Request<::hyper::Body>, path: &str) -> ResponseResult {
let s = self.ui_files.get(path).ok_or_else(|| not_found("no such static file"))?;
let f = fs::File::open(&s.path).map_err(internal_server_err)?;
let mut hdrs = http::HeaderMap::new();
hdrs.insert(header::CONTENT_TYPE, s.mime.clone());
let e = http_serve::ChunkedReadFile::new(f, Some(self.pool.clone()), hdrs)?;
let e = http_serve::ChunkedReadFile::new(f, Some(self.pool.clone()), hdrs)
.map_err(internal_server_err)?;
Ok(http_serve::serve(e, &req))
}
fn authreq(&self, req: &Request<::hyper::Body>) -> auth::Request {
auth::Request {
when_sec: Some(self.db.clocks().realtime().sec),
addr: None, // TODO: req.remote_addr().map(|a| a.ip()),
user_agent: req.headers().get(header::USER_AGENT).map(|ua| ua.as_bytes().to_vec()),
}
}
fn login(&self, req: &Request<::hyper::Body>, body: hyper::Chunk) -> ResponseResult {
let mut username = None;
let mut password = None;
for (key, value) in form_urlencoded::parse(&body) {
match &*key {
"username" => username = Some(value),
"password" => password = Some(value),
_ => {},
};
}
let (username, password) = match (username, password) {
(Some(u), Some(p)) => (u, p),
_ => return Err(bad_req("expected username + password")),
};
let authreq = self.authreq(req);
let host = req.headers().get(header::HOST).ok_or_else(|| bad_req("missing Host header!"))?;
let host = host.as_bytes();
let domain = match ::memchr::memchr(b':', host) {
Some(colon) => &host[0..colon],
None => host,
}.to_owned();
let mut l = self.db.lock();
let flags = (auth::SessionFlags::HttpOnly as i32) | (auth::SessionFlags::SameSite as i32);
let (sid, _) = l.login_by_password(authreq, &username, password.into_owned(), domain,
flags)
.map_err(|e| plain_response(StatusCode::UNAUTHORIZED, e.to_string()))?;
let s_suffix = "; HttpOnly; SameSite=Lax; Max-Age=2147483648; Path=/";
let mut encoded = [0u8; 64];
base64::encode_config_slice(&sid, base64::STANDARD_NO_PAD, &mut encoded);
let mut cookie = BytesMut::with_capacity("s=".len() + encoded.len() + s_suffix.len());
cookie.put("s=");
cookie.put(&encoded[..]);
cookie.put(s_suffix);
Ok(Response::builder()
.header(header::SET_COOKIE, cookie.freeze())
.status(StatusCode::NO_CONTENT)
.body(b""[..].into()).unwrap())
}
fn logout(&self, req: &Request<hyper::Body>, body: hyper::Chunk) -> ResponseResult {
// Parse parameters.
let mut csrf = None;
for (key, value) in form_urlencoded::parse(&body) {
match &*key {
"csrf" => csrf = Some(value),
_ => {},
};
}
let mut res = Response::new(b""[..].into());
if let Some(sid) = extract_sid(req) {
let authreq = self.authreq(req);
let mut l = self.db.lock();
let hash = sid.hash();
let need_revoke = match l.authenticate_session(authreq.clone(), &hash) {
Ok((s, _)) => {
let correct_csrf = if let Some(c) = csrf {
csrf_matches(&*c, s.csrf())
} else { false };
if !correct_csrf {
warn!("logout request with missing/incorrect csrf");
return Err(bad_req("logout with incorrect csrf token"));
}
info!("revoking session");
true
},
Err(e) => {
// TODO: distinguish "no such session", "session is no longer valid", and
// "user ... is disabled" (which are all client error / bad state) from database
// errors.
warn!("logout failed: {}", e);
false
},
};
if need_revoke {
// TODO: inline this above with non-lexical lifetimes.
l.revoke_session(auth::RevocationReason::LoggedOut, None, authreq, &hash)
.map_err(internal_server_err)?;
}
// By now the session is invalid (whether it was valid to start with or not).
// Clear useless cookie.
res.headers_mut().append(header::SET_COOKIE,
HeaderValue::from_str("s=; Max-Age=0; Path=/").unwrap());
}
*res.status_mut() = StatusCode::NO_CONTENT;
Ok(res)
}
fn authenticated(&self, req: &Request<hyper::Body>) -> Result<Option<json::Session>, Error> {
if let Some(sid) = extract_sid(req) {
let authreq = self.authreq(req);
match self.db.lock().authenticate_session(authreq.clone(), &sid.hash()) {
Ok((s, u)) => {
return Ok(Some(json::Session {
username: u.username.clone(),
csrf: s.csrf(),
}))
},
Err(_) => {
// TODO: real error handling! this assumes all errors are due to lack of
// authentication, when they could be logic errors in SQL or such.
return Ok(None);
}
}
}
Ok(None)
}
}
fn csrf_matches(csrf: &str, session: auth::SessionHash) -> bool {
let mut b64 = [0u8; 32];
session.encode_base64(&mut b64);
::ring::constant_time::verify_slices_are_equal(&b64[..], csrf.as_bytes()).is_ok()
}
/// Extracts `s` cookie from the HTTP request. Does not authenticate.
fn extract_sid(req: &Request<hyper::Body>) -> Option<auth::RawSessionId> {
let hdr = match req.headers().get(header::COOKIE) {
None => return None,
Some(c) => c,
};
for mut cookie in hdr.as_bytes().split(|&b| b == b';') {
if cookie.starts_with(b" ") {
cookie = &cookie[1..];
}
if cookie.starts_with(b"s=") {
let s = &cookie[2..];
if let Ok(s) = auth::RawSessionId::decode_base64(s) {
return Some(s);
}
}
}
None
}
#[derive(Clone)]
pub struct Service(Arc<ServiceInner>);
impl Service {
pub fn new(db: Arc<db::Database>, ui_dir: Option<&str>, allow_origin: Option<String>,
zone: String) -> Result<Self, Error> {
pub fn new(db: Arc<db::Database>, ui_dir: Option<&str>, require_auth: bool,
time_zone_name: String) -> Result<Self, Error> {
let mut ui_files = HashMap::new();
if let Some(d) = ui_dir {
Service::fill_ui_files(d, &mut ui_files);
@@ -448,17 +639,14 @@ impl Service {
}
Arc::new(d)
};
let allow_origin = match allow_origin {
None => None,
Some(o) => Some(HeaderValue::from_str(&o)?),
};
Ok(Service(Arc::new(ServiceInner {
db,
dirs_by_stream_id,
ui_files,
allow_origin,
pool: futures_cpupool::Builder::new().pool_size(1).name_prefix("static").create(),
time_zone_name: zone,
require_auth,
time_zone_name,
})))
}
@@ -502,44 +690,186 @@ impl Service {
});
}
}
/// Returns a future separating the request from its form body.
///
/// If this is not a `POST` or the body's `Content-Type` is not
/// `application/x-www-form-urlencoded`, returns an appropriate error response instead.
///
/// Use with `and_then` to chain logic which consumes the form body.
fn with_form_body(&self, mut req: Request<hyper::Body>)
-> Box<Future<Item = (Request<hyper::Body>, hyper::Chunk),
Error = Response<Body>> +
Send + 'static> {
if *req.method() != http::method::Method::POST {
return Box::new(future::err(plain_response(StatusCode::METHOD_NOT_ALLOWED,
"POST expected")));
}
let correct_mime_type = match req.headers().get(header::CONTENT_TYPE) {
Some(t) if t == "application/x-www-form-urlencoded" => true,
Some(t) if t == "application/x-www-form-urlencoded; charset=UTF-8" => true,
_ => false,
};
if !correct_mime_type {
return Box::new(future::err(bad_req(
"expected application/x-www-form-urlencoded request body")));
}
let b = ::std::mem::replace(req.body_mut(), hyper::Body::empty());
Box::new(b.concat2()
.map(|b| (req, b))
.map_err(|e| internal_server_err(format_err!("unable to read request body: {}",
e))))
}
}
impl ::hyper::service::Service for Service {
type ReqBody = ::hyper::Body;
type ResBody = Body;
type Error = BoxedError;
type Future = future::FutureResult<Response<Self::ResBody>, Self::Error>;
type Future = Box<Future<Item = Response<Self::ResBody>, Error = Self::Error> + Send + 'static>;
fn call(&mut self, req: Request<::hyper::Body>) -> Self::Future {
debug!("request on: {}", req.uri());
let mut res = match decode_path(req.uri().path()) {
Path::InitSegment(sha1) => self.0.init_segment(sha1, &req),
Path::TopLevel => self.0.top_level(&req),
Path::Camera(uuid) => self.0.camera(&req, uuid),
Path::StreamRecordings(uuid, type_) => self.0.stream_recordings(&req, uuid, type_),
fn wrap<R: Future<Item = Response<Body>, Error = Response<Body>> + Send + 'static>(r: R)
-> Box<Future<Item = Response<Body>, Error = BoxedError> + Send + 'static> {
return Box::new(r.or_else(|e| Ok(e)))
}
fn wrap_r(r: ResponseResult)
-> Box<Future<Item = Response<Body>, Error = BoxedError> + Send + 'static> {
return wrap(future::result(r))
}
let p = decode_path(req.uri().path());
let require_auth = self.0.require_auth && match p {
Path::NotFound | Path::Login | Path::Logout | Path::Static => false,
_ => true,
};
debug!("request on: {}: {:?}, require_auth={}", req.uri(), p, require_auth);
let session = match self.0.authenticated(&req) {
Ok(s) => s,
Err(e) => return Box::new(future::ok(internal_server_err(e))),
};
if require_auth && session.is_none() {
return Box::new(future::ok(
plain_response(StatusCode::UNAUTHORIZED, "unauthorized")));
}
match decode_path(req.uri().path()) {
Path::InitSegment(sha1) => wrap_r(self.0.init_segment(sha1, &req)),
Path::TopLevel => wrap_r(self.0.top_level(&req, session)),
Path::Camera(uuid) => wrap_r(self.0.camera(&req, uuid)),
Path::StreamRecordings(uuid, type_) => {
wrap_r(self.0.stream_recordings(&req, uuid, type_))
},
Path::StreamViewMp4(uuid, type_) => {
self.0.stream_view_mp4(&req, uuid, type_, mp4::Type::Normal)
wrap_r(self.0.stream_view_mp4(&req, uuid, type_, mp4::Type::Normal))
},
Path::StreamViewMp4Segment(uuid, type_) => {
self.0.stream_view_mp4(&req, uuid, type_, mp4::Type::MediaSegment)
wrap_r(self.0.stream_view_mp4(&req, uuid, type_, mp4::Type::MediaSegment))
},
Path::NotFound => self.0.not_found(),
Path::Static => self.0.static_file(&req),
};
if let Ok(ref mut resp) = res {
if let Some(ref o) = self.0.allow_origin {
resp.headers_mut().insert(header::ACCESS_CONTROL_ALLOW_ORIGIN, o.clone());
}
Path::NotFound => wrap(future::err(not_found("path not understood"))),
Path::Login => wrap(self.with_form_body(req).and_then({
let s = self.clone();
move |(req, b)| { s.0.login(&req, b) }
})),
Path::Logout => wrap(self.with_form_body(req).and_then({
let s = self.clone();
move |(req, b)| { s.0.logout(&req, b) }
})),
Path::Static => wrap_r(self.0.static_file(&req, req.uri().path())),
}
future::result(res.map_err(|e| wrap_error(e)))
}
}
#[cfg(test)]
mod tests {
use db::testutil;
extern crate reqwest;
use db;
use db::testutil::{self, TestDb};
use futures::Future;
use http::{self, header};
use std::collections::HashMap;
use std::error::Error as StdError;
use super::Segments;
struct Server {
db: TestDb<::base::clock::RealClocks>,
base_url: String,
//test_camera_uuid: Uuid,
handle: Option<::std::thread::JoinHandle<()>>,
shutdown_tx: Option<futures::sync::oneshot::Sender<()>>,
}
impl Server {
fn new() -> Server {
let db = TestDb::new(::base::clock::RealClocks {});
let (shutdown_tx, shutdown_rx) = futures::sync::oneshot::channel::<()>();
let addr = "127.0.0.1:0".parse().unwrap();
let require_auth = true;
let service = super::Service::new(db.db.clone(), None, require_auth,
"".to_owned()).unwrap();
let server = hyper::server::Server::bind(&addr)
.tcp_nodelay(true)
.serve(move || Ok::<_, Box<StdError + Send + Sync>>(service.clone()));
let addr = server.local_addr(); // resolve port 0 to a real ephemeral port number.
let handle = ::std::thread::spawn(move || {
::tokio::run(server.with_graceful_shutdown(shutdown_rx).map_err(|e| panic!(e)));
});
// Create a user.
let mut c = db::UserChange::add_user("slamb".to_owned());
c.set_password("hunter2".to_owned());
db.db.lock().apply_user_change(c).unwrap();
Server {
db,
base_url: format!("http://{}:{}", addr.ip(), addr.port()),
handle: Some(handle),
shutdown_tx: Some(shutdown_tx),
}
}
}
impl Drop for Server {
fn drop(&mut self) {
self.shutdown_tx.take().unwrap().send(()).unwrap();
self.handle.take().unwrap().join().unwrap()
}
}
#[derive(Clone, Debug, Default)]
struct SessionCookie(Option<String>);
impl SessionCookie {
pub fn new(headers: &http::HeaderMap) -> Self {
let mut c = SessionCookie::default();
c.update(headers);
c
}
pub fn update(&mut self, headers: &http::HeaderMap) {
for set_cookie in headers.get_all(header::SET_COOKIE) {
let mut set_cookie = set_cookie.to_str().unwrap().split("; ");
let c = set_cookie.next().unwrap();
let mut clear = false;
for attr in set_cookie {
if attr == "Max-Age=0" {
clear = true;
}
}
if !c.starts_with("s=") {
panic!("unrecognized cookie");
}
self.0 = if clear { None } else { Some(c.to_owned()) };
}
}
/// Produces a `Cookie` header value.
pub fn header(&self) -> String {
self.0.as_ref().map(|s| s.as_str()).unwrap_or("").to_owned()
}
}
#[test]
fn test_segments() {
testutil::init();
@@ -564,6 +894,103 @@ mod tests {
assert_eq!(Segments{ids: 1..6, open_id: None, start_time: 26, end_time: Some(42)},
Segments::parse("1-5.26-42").unwrap());
}
#[test]
fn unauthorized_without_cookie() {
testutil::init();
let s = Server::new();
let cli = reqwest::Client::new();
let resp = cli.get(&format!("{}/api/", &s.base_url)).send().unwrap();
assert_eq!(resp.status(), http::StatusCode::UNAUTHORIZED);
}
#[test]
fn login() {
testutil::init();
let s = Server::new();
let cli = reqwest::Client::new();
let login_url = format!("{}/api/login", &s.base_url);
let resp = cli.get(&login_url).send().unwrap();
assert_eq!(resp.status(), http::StatusCode::METHOD_NOT_ALLOWED);
let resp = cli.post(&login_url).send().unwrap();
assert_eq!(resp.status(), http::StatusCode::BAD_REQUEST);
let mut p = HashMap::new();
p.insert("username", "slamb");
p.insert("password", "asdf");
let resp = cli.post(&login_url).form(&p).send().unwrap();
assert_eq!(resp.status(), http::StatusCode::UNAUTHORIZED);
p.insert("password", "hunter2");
let resp = cli.post(&login_url).form(&p).send().unwrap();
assert_eq!(resp.status(), http::StatusCode::NO_CONTENT);
let cookie = SessionCookie::new(resp.headers());
info!("cookie: {:?}", cookie);
info!("header: {}", cookie.header());
let resp = cli.get(&format!("{}/api/", &s.base_url))
.header(header::COOKIE, cookie.header())
.send()
.unwrap();
assert_eq!(resp.status(), http::StatusCode::OK);
}
#[test]
fn logout() {
testutil::init();
let s = Server::new();
let cli = reqwest::Client::new();
let mut p = HashMap::new();
p.insert("username", "slamb");
p.insert("password", "hunter2");
let resp = cli.post(&format!("{}/api/login", &s.base_url)).form(&p).send().unwrap();
assert_eq!(resp.status(), http::StatusCode::NO_CONTENT);
let cookie = SessionCookie::new(resp.headers());
// A GET shouldn't work.
let resp = cli.get(&format!("{}/api/logout", &s.base_url))
.header(header::COOKIE, cookie.header())
.send()
.unwrap();
assert_eq!(resp.status(), http::StatusCode::METHOD_NOT_ALLOWED);
// Neither should a POST without a csrf token.
let resp = cli.post(&format!("{}/api/logout", &s.base_url))
.header(header::COOKIE, cookie.header())
.send()
.unwrap();
assert_eq!(resp.status(), http::StatusCode::BAD_REQUEST);
// But it should work with the csrf token.
// Retrieve that from the toplevel API request.
let toplevel: serde_json::Value = cli.post(&format!("{}/api/", &s.base_url))
.header(header::COOKIE, cookie.header())
.send().unwrap()
.json().unwrap();
let csrf = toplevel.get("session").unwrap().get("csrf").unwrap().as_str();
let mut p = HashMap::new();
p.insert("csrf", csrf);
let resp = cli.post(&format!("{}/api/logout", &s.base_url))
.header(header::COOKIE, cookie.header())
.form(&p)
.send()
.unwrap();
assert_eq!(resp.status(), http::StatusCode::NO_CONTENT);
let mut updated_cookie = cookie.clone();
updated_cookie.update(resp.headers());
// The cookie should be cleared client-side.
assert!(updated_cookie.0.is_none());
// It should also be invalidated server-side.
let resp = cli.get(&format!("{}/api/", &s.base_url))
.header(header::COOKIE, cookie.header())
.send()
.unwrap();
assert_eq!(resp.status(), http::StatusCode::UNAUTHORIZED);
}
}
#[cfg(all(test, feature="nightly"))]
@@ -588,18 +1015,17 @@ mod bench {
let db = TestDb::new(::base::clock::RealClocks {});
let test_camera_uuid = db.test_camera_uuid;
testutil::add_dummy_recordings_to_db(&db.db, 1440);
let (tx, rx) = ::std::sync::mpsc::channel();
let addr = "127.0.0.1:0".parse().unwrap();
let require_auth = false;
let service = super::Service::new(db.db.clone(), None, require_auth,
"".to_owned()).unwrap();
let server = hyper::server::Server::bind(&addr)
.tcp_nodelay(true)
.serve(move || Ok::<_, Box<StdError + Send + Sync>>(service.clone()));
let addr = server.local_addr(); // resolve port 0 to a real ephemeral port number.
::std::thread::spawn(move || {
let addr = "127.0.0.1:0".parse().unwrap();
let service = super::Service::new(db.db.clone(), None, None,
"".to_owned()).unwrap();
let server = hyper::server::Server::bind(&addr)
.tcp_nodelay(true)
.serve(move || Ok::<_, Box<StdError + Send + Sync>>(service.clone()));
tx.send(server.local_addr()).unwrap();
::tokio::run(server.map_err(|e| panic!(e)));
});
let addr = rx.recv().unwrap();
Server {
base_url: format!("http://{}:{}", addr.ip(), addr.port()),
test_camera_uuid,