first step toward object detection (#30)

When compiled with cargo build --features=analytics and enabled via
moonfire-nvr run --object-detection, this runs object detection on every
sub stream frame through an Edge TPU (a Coral USB accelerator) and logs
the result.

This is a very small step toward a working system. It doesn't actually
record the result in the database or send it out on the live stream yet.
It doesn't support running object detection at a lower frame rate than
the sub streams come in at either. To address those problems, I need to
do some refactoring. Currently moonfire_db::writer::Writer::Write is the
only place that knows the duration of the frame it's about to flush,
before it gets added to the index or sent out on the live stream. I
don't want to do the detection from there; I'd prefer the moonfire_nvr
crate. So I either need to introduce an analytics callback or move a
bunch of that logic to the other crate.

Once I do that, I need to add database support (although I have some
experiments for that in moonfire-playground) and API support, then some
kind of useful frontend.

Note edgetpu.tflite is taken from the Apache 2.0-licensed
https://github.com/google-coral/edgetpu,
test_data/mobilenet_ssd_v2_coco_quant_postprocess_edgetpu.tflite. The
following page says it's fine to include Apache 2.0 stuff in GPLv3
projects:
https://www.apache.org/licenses/GPL-compatibility.html
This commit is contained in:
Scott Lamb 2020-04-13 23:03:49 -07:00
parent ad13935ed6
commit 3ed397bacd
8 changed files with 342 additions and 9 deletions

13
Cargo.lock generated
View File

@ -1300,7 +1300,7 @@ dependencies = [
[[package]] [[package]]
name = "moonfire-ffmpeg" name = "moonfire-ffmpeg"
version = "0.0.1" version = "0.0.1"
source = "git+https://github.com/scottlamb/moonfire-ffmpeg#c517aa782867c8b882524a2d21361753ca845f06" source = "git+https://github.com/scottlamb/moonfire-ffmpeg#8082cc870334a6ef01420bf87b1d1ec6d14a9878"
dependencies = [ dependencies = [
"cc", "cc",
"libc", "libc",
@ -1335,6 +1335,7 @@ dependencies = [
"moonfire-base", "moonfire-base",
"moonfire-db", "moonfire-db",
"moonfire-ffmpeg", "moonfire-ffmpeg",
"moonfire-tflite",
"mylog", "mylog",
"nix", "nix",
"parking_lot", "parking_lot",
@ -1355,6 +1356,16 @@ dependencies = [
"uuid", "uuid",
] ]
[[package]]
name = "moonfire-tflite"
version = "0.0.1"
source = "git+https://github.com/scottlamb/moonfire-tflite#b86940ef5b3a5b2583e52c1d494477cae7dac7c8"
dependencies = [
"cc",
"libc",
"log",
]
[[package]] [[package]]
name = "mylog" name = "mylog"
version = "0.1.0" version = "0.1.0"

View File

@ -14,6 +14,8 @@ nightly = ["db/nightly", "parking_lot/nightly", "smallvec/union"]
# native libraries where possible. # native libraries where possible.
bundled = ["rusqlite/bundled"] bundled = ["rusqlite/bundled"]
analytics = ["moonfire-tflite", "ffmpeg/swscale"]
[workspace] [workspace]
members = ["base", "db"] members = ["base", "db"]
@ -51,6 +53,7 @@ rusqlite = "0.21.0"
serde = { version = "1.0", features = ["derive"] } serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0" serde_json = "1.0"
smallvec = "1.0" smallvec = "1.0"
moonfire-tflite = { git = "https://github.com/scottlamb/moonfire-tflite", features = ["edgetpu"], optional = true }
time = "0.1" time = "0.1"
tokio = { version = "0.2.0", features = ["blocking", "macros", "rt-threaded", "signal"] } tokio = { version = "0.2.0", features = ["blocking", "macros", "rt-threaded", "signal"] }
tokio-tungstenite = "0.10.1" tokio-tungstenite = "0.10.1"

254
src/analytics.rs Normal file
View File

@ -0,0 +1,254 @@
// This file is part of Moonfire NVR, a security camera network video recorder.
// Copyright (C) 2020 The Moonfire NVR Authors
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// In addition, as a special exception, the copyright holders give
// permission to link the code of portions of this program with the
// OpenSSL library under certain conditions as described in each
// individual source file, and distribute linked combinations including
// the two.
//
// You must obey the GNU General Public License in all respects for all
// of the code used other than OpenSSL. If you modify file(s) with this
// exception, you may extend this exception to your version of the
// file(s), but you are not obligated to do so. If you do not wish to do
// so, delete this exception statement from your version. If you delete
// this exception statement from all source files in the program, then
// also delete it here.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
//! Video analytics via TensorFlow Lite and an Edge TPU.
//!
//! Note this module is only compiled with `--features=analytics`. There's a stub implementation in
//! `src/main.rs` which is used otherwise.
//!
//! Currently results are only logged (rather spammily, on each frame), not persisted to the
//! database. This will change soon.
//!
//! Currently does object detection on every frame with a single hardcoded model: the 300x300
//! MobileNet SSD v2 (COCO) from https://coral.ai/models/. Eventually analytics might include:
//!
//! * an object detection model retrained on surveillance images and/or larger input sizes
//! for increased accuracy.
//! * multiple invocations per image to improve resolution with current model sizes
//! (either fixed, overlapping subsets of the image or zooming in on full-frame detections to
//! increase confidence).
//! * support for other hardware setups (GPUs, other brands of NPUs).
//! * a motion detection model.
//! * H.264/H.265 decoding on every frame but performing object detection at a minimum pts
//! interval to cut down on expense.
use cstr::*;
use failure::{Error, format_err};
use ffmpeg;
use log::info;
use std::sync::Arc;
static MODEL: &[u8] = include_bytes!("edgetpu.tflite");
//static MODEL_UUID: Uuid = Uuid::from_u128(0x02054a38_62cf_42ff_9ffa_04876a2970d0_u128);
pub static MODEL_LABELS: [Option<&str>; 90] = [
Some("person"),
Some("bicycle"),
Some("car"),
Some("motorcycle"),
Some("airplane"),
Some("bus"),
Some("train"),
Some("truck"),
Some("boat"),
Some("traffic light"),
Some("fire hydrant"),
None,
Some("stop sign"),
Some("parking meter"),
Some("bench"),
Some("bird"),
Some("cat"),
Some("dog"),
Some("horse"),
Some("sheep"),
Some("cow"),
Some("elephant"),
Some("bear"),
Some("zebra"),
Some("giraffe"),
None,
Some("backpack"),
Some("umbrella"),
None,
None,
Some("handbag"),
Some("tie"),
Some("suitcase"),
Some("frisbee"),
Some("skis"),
Some("snowboard"),
Some("sports ball"),
Some("kite"),
Some("baseball bat"),
Some("baseball glove"),
Some("skateboard"),
Some("surfboard"),
Some("tennis racket"),
Some("bottle"),
None,
Some("wine glass"),
Some("cup"),
Some("fork"),
Some("knife"),
Some("spoon"),
Some("bowl"),
Some("banana"),
Some("apple"),
Some("sandwich"),
Some("orange"),
Some("broccoli"),
Some("carrot"),
Some("hot dog"),
Some("pizza"),
Some("donut"),
Some("cake"),
Some("chair"),
Some("couch"),
Some("potted plant"),
Some("bed"),
None,
Some("dining table"),
None,
None,
Some("toilet"),
None,
Some("tv"),
Some("laptop"),
Some("mouse"),
Some("remote"),
Some("keyboard"),
Some("cell phone"),
Some("microwave"),
Some("oven"),
Some("toaster"),
Some("sink"),
Some("refrigerator"),
None,
Some("book"),
Some("clock"),
Some("vase"),
Some("scissors"),
Some("teddy bear"),
Some("hair drier"),
Some("toothbrush"),
];
pub struct ObjectDetector {
interpreter: parking_lot::Mutex<moonfire_tflite::Interpreter<'static>>,
width: i32,
height: i32,
}
impl ObjectDetector {
pub fn new(/*db: &db::LockedDatabase*/) -> Result<Arc<Self>, Error> {
let model = moonfire_tflite::Model::from_static(MODEL)
.map_err(|()| format_err!("TensorFlow Lite model initialization failed"))?;
let devices = moonfire_tflite::edgetpu::Devices::list();
let device = devices.first().ok_or_else(|| format_err!("No Edge TPU device available"))?;
info!("Using device {:?}/{:?} for object detection", device.type_(), device.path());
let mut builder = moonfire_tflite::Interpreter::builder();
builder.add_owned_delegate(device.create_delegate()
.map_err(|()| format_err!("Unable to create delegate for {:?}/{:?}",
device.type_(), device.path()))?);
let interpreter = builder.build(&model)
.map_err(|()| format_err!("TensorFlow Lite initialization failed"))?;
Ok(Arc::new(Self {
interpreter: parking_lot::Mutex::new(interpreter),
width: 300, // TODO
height: 300,
}))
}
}
pub struct ObjectDetectorStream {
decoder: ffmpeg::avcodec::DecodeContext,
frame: ffmpeg::avutil::VideoFrame,
scaler: ffmpeg::swscale::Scaler,
scaled: ffmpeg::avutil::VideoFrame,
}
/// Copies from a RGB24 VideoFrame to a 1xHxWx3 Tensor.
fn copy(from: &ffmpeg::avutil::VideoFrame, to: &mut moonfire_tflite::Tensor) {
let from = from.plane(0);
let to = to.bytes_mut();
let (w, h) = (from.width, from.height);
let mut from_i = 0;
let mut to_i = 0;
for _y in 0..h {
to[to_i..to_i+3*w].copy_from_slice(&from.data[from_i..from_i+3*w]);
from_i += from.linesize;
to_i += 3*w;
}
}
const SCORE_THRESHOLD: f32 = 0.5;
impl ObjectDetectorStream {
pub fn new(par: ffmpeg::avcodec::InputCodecParameters<'_>,
detector: &ObjectDetector) -> Result<Self, Error> {
let mut dopt = ffmpeg::avutil::Dictionary::new();
dopt.set(cstr!("refcounted_frames"), cstr!("0"))?;
let decoder = par.new_decoder(&mut dopt)?;
let scaled = ffmpeg::avutil::VideoFrame::owned(ffmpeg::avutil::ImageDimensions {
width: detector.width,
height: detector.height,
pix_fmt: ffmpeg::avutil::PixelFormat::rgb24(),
})?;
let frame = ffmpeg::avutil::VideoFrame::empty()?;
let scaler = ffmpeg::swscale::Scaler::new(par.dims(), scaled.dims())?;
Ok(Self {
decoder,
frame,
scaler,
scaled,
})
}
pub fn process_frame(&mut self, pkt: &ffmpeg::avcodec::Packet<'_>,
detector: &ObjectDetector) -> Result<(), Error> {
if !self.decoder.decode_video(pkt, &mut self.frame)? {
return Ok(());
}
self.scaler.scale(&self.frame, &mut self.scaled);
let mut interpreter = detector.interpreter.lock();
copy(&self.scaled, &mut interpreter.inputs()[0]);
interpreter.invoke().map_err(|()| format_err!("TFLite interpreter invocation failed"))?;
let outputs = interpreter.outputs();
let classes = outputs[1].f32s();
let scores = outputs[2].f32s();
for (i, &score) in scores.iter().enumerate() {
if score < SCORE_THRESHOLD {
continue;
}
let class = classes[i] as usize;
if class >= MODEL_LABELS.len() {
continue;
}
let label = match MODEL_LABELS[class] {
None => continue,
Some(l) => l,
};
info!("{}, score {}", label, score);
}
Ok(())
}
}

View File

@ -81,6 +81,8 @@ Options:
your proxy server is configured to set them and that your proxy server is configured to set them and that
no untrusted requests bypass the proxy server. no untrusted requests bypass the proxy server.
You may want to specify --http-addr=127.0.0.1:8080. You may want to specify --http-addr=127.0.0.1:8080.
--object-detection Perform object detection on SUB streams.
Note: requires compilation with --feature=analytics.
"#; "#;
#[derive(Debug, Deserialize)] #[derive(Debug, Deserialize)]
@ -91,6 +93,7 @@ struct Args {
flag_read_only: bool, flag_read_only: bool,
flag_allow_unauthenticated_permissions: Option<String>, flag_allow_unauthenticated_permissions: Option<String>,
flag_trust_forward_hdrs: bool, flag_trust_forward_hdrs: bool,
flag_object_detection: bool,
} }
fn trim_zoneinfo(p: &str) -> &str { fn trim_zoneinfo(p: &str) -> &str {
@ -176,6 +179,11 @@ pub async fn run() -> Result<(), Error> {
let db = Arc::new(db::Database::new(clocks.clone(), conn, !args.flag_read_only).unwrap()); let db = Arc::new(db::Database::new(clocks.clone(), conn, !args.flag_read_only).unwrap());
info!("Database is loaded."); info!("Database is loaded.");
let object_detector = match args.flag_object_detection {
false => None,
true => Some(crate::analytics::ObjectDetector::new()?),
};
{ {
let mut l = db.lock(); let mut l = db.lock();
let dirs_to_open: Vec<_> = let dirs_to_open: Vec<_> =
@ -252,10 +260,15 @@ pub async fn run() -> Result<(), Error> {
}; };
let rotate_offset_sec = streamer::ROTATE_INTERVAL_SEC * i as i64 / streams as i64; let rotate_offset_sec = streamer::ROTATE_INTERVAL_SEC * i as i64 / streams as i64;
let syncer = syncers.get(&sample_file_dir_id).unwrap(); let syncer = syncers.get(&sample_file_dir_id).unwrap();
let object_detector = match stream.type_ {
db::StreamType::SUB => object_detector.as_ref().map(|a| Arc::clone(a)),
_ => None,
};
let mut streamer = streamer::Streamer::new(&env, syncer.dir.clone(), let mut streamer = streamer::Streamer::new(&env, syncer.dir.clone(),
syncer.channel.clone(), *id, camera, stream, syncer.channel.clone(), *id, camera, stream,
rotate_offset_sec, rotate_offset_sec,
streamer::ROTATE_INTERVAL_SEC)?; streamer::ROTATE_INTERVAL_SEC,
object_detector)?;
info!("Starting streamer for {}", streamer.short_name()); info!("Starting streamer for {}", streamer.short_name());
let name = format!("s-{}", streamer.short_name()); let name = format!("s-{}", streamer.short_name());
streamers.push(thread::Builder::new().name(name).spawn(move|| { streamers.push(thread::Builder::new().name(name).spawn(move|| {

BIN
src/edgetpu.tflite Normal file

Binary file not shown.

View File

@ -33,6 +33,37 @@
use log::{error, info}; use log::{error, info};
use serde::Deserialize; use serde::Deserialize;
#[cfg(feature = "analytics")]
mod analytics;
/// Stub implementation of analytics module when not compiled with TensorFlow Lite.
#[cfg(not(feature = "analytics"))]
mod analytics {
use failure::{Error, bail};
pub struct ObjectDetector;
impl ObjectDetector {
pub fn new() -> Result<std::sync::Arc<ObjectDetector>, Error> {
bail!("Recompile with --features=analytics for object detection.");
}
}
pub struct ObjectDetectorStream;
impl ObjectDetectorStream {
pub fn new(_par: ffmpeg::avcodec::InputCodecParameters<'_>,
_detector: &ObjectDetector) -> Result<Self, Error> {
unimplemented!();
}
pub fn process_frame(&mut self, _pkt: &ffmpeg::avcodec::Packet<'_>,
_detector: &ObjectDetector) -> Result<(), Error> {
unimplemented!();
}
}
}
mod body; mod body;
mod cmds; mod cmds;
mod h264; mod h264;

View File

@ -61,6 +61,7 @@ pub trait Opener<S : Stream> : Sync {
} }
pub trait Stream { pub trait Stream {
fn get_video_codecpar(&self) -> ffmpeg::avcodec::InputCodecParameters<'_>;
fn get_extra_data(&self) -> Result<h264::ExtraData, Error>; fn get_extra_data(&self) -> Result<h264::ExtraData, Error>;
fn get_next<'p>(&'p mut self) -> Result<ffmpeg::avcodec::Packet<'p>, ffmpeg::Error>; fn get_next<'p>(&'p mut self) -> Result<ffmpeg::avcodec::Packet<'p>, ffmpeg::Error>;
} }
@ -147,11 +148,15 @@ impl Opener<FfmpegStream> for Ffmpeg {
} }
pub struct FfmpegStream { pub struct FfmpegStream {
input: ffmpeg::avformat::InputFormatContext, input: ffmpeg::avformat::InputFormatContext<'static>,
video_i: usize, video_i: usize,
} }
impl Stream for FfmpegStream { impl Stream for FfmpegStream {
fn get_video_codecpar(&self) -> ffmpeg::avcodec::InputCodecParameters {
self.input.streams().get(self.video_i).codecpar()
}
fn get_extra_data(&self) -> Result<h264::ExtraData, Error> { fn get_extra_data(&self) -> Result<h264::ExtraData, Error> {
let video = self.input.streams().get(self.video_i); let video = self.input.streams().get(self.video_i);
let tb = video.time_base(); let tb = video.time_base();

View File

@ -63,13 +63,16 @@ pub struct Streamer<'a, C, S> where C: Clocks + Clone, S: 'a + stream::Stream {
short_name: String, short_name: String,
url: Url, url: Url,
redacted_url: Url, redacted_url: Url,
detector: Option<Arc<crate::analytics::ObjectDetector>>,
} }
impl<'a, C, S> Streamer<'a, C, S> where C: 'a + Clocks + Clone, S: 'a + stream::Stream { impl<'a, C, S> Streamer<'a, C, S> where C: 'a + Clocks + Clone, S: 'a + stream::Stream {
pub fn new<'b>(env: &Environment<'a, 'b, C, S>, dir: Arc<dir::SampleFileDir>, pub fn new<'b>(env: &Environment<'a, 'b, C, S>, dir: Arc<dir::SampleFileDir>,
syncer_channel: writer::SyncerChannel<::std::fs::File>, syncer_channel: writer::SyncerChannel<::std::fs::File>,
stream_id: i32, c: &Camera, s: &Stream, rotate_offset_sec: i64, stream_id: i32, c: &Camera, s: &Stream, rotate_offset_sec: i64,
rotate_interval_sec: i64) -> Result<Self, Error> { rotate_interval_sec: i64,
detector: Option<Arc<crate::analytics::ObjectDetector>>)
-> Result<Self, Error> {
let mut url = Url::parse(&s.rtsp_url)?; let mut url = Url::parse(&s.rtsp_url)?;
let mut redacted_url = url.clone(); let mut redacted_url = url.clone();
if !c.username.is_empty() { if !c.username.is_empty() {
@ -80,16 +83,17 @@ impl<'a, C, S> Streamer<'a, C, S> where C: 'a + Clocks + Clone, S: 'a + stream::
} }
Ok(Streamer { Ok(Streamer {
shutdown: env.shutdown.clone(), shutdown: env.shutdown.clone(),
rotate_offset_sec: rotate_offset_sec, rotate_offset_sec,
rotate_interval_sec: rotate_interval_sec, rotate_interval_sec,
db: env.db.clone(), db: env.db.clone(),
dir, dir,
syncer_channel: syncer_channel, syncer_channel,
opener: env.opener, opener: env.opener,
stream_id: stream_id, stream_id,
short_name: format!("{}-{}", c.short_name, s.type_.as_str()), short_name: format!("{}-{}", c.short_name, s.type_.as_str()),
url, url,
redacted_url, redacted_url,
detector,
}) })
} }
@ -119,6 +123,11 @@ impl<'a, C, S> Streamer<'a, C, S> where C: 'a + Clocks + Clone, S: 'a + stream::
}; };
let realtime_offset = self.db.clocks().realtime() - clocks.monotonic(); let realtime_offset = self.db.clocks().realtime() - clocks.monotonic();
// TODO: verify width/height. // TODO: verify width/height.
let mut detector_stream = match self.detector.as_ref() {
None => None,
Some(od) => Some(crate::analytics::ObjectDetectorStream::new(
stream.get_video_codecpar(), &od)?),
};
let extra_data = stream.get_extra_data()?; let extra_data = stream.get_extra_data()?;
let video_sample_entry_id = { let video_sample_entry_id = {
let _t = TimerGuard::new(&clocks, || "inserting video sample entry"); let _t = TimerGuard::new(&clocks, || "inserting video sample entry");
@ -144,6 +153,9 @@ impl<'a, C, S> Streamer<'a, C, S> where C: 'a + Clocks + Clone, S: 'a + stream::
debug!("{}: have first key frame", self.short_name); debug!("{}: have first key frame", self.short_name);
seen_key_frame = true; seen_key_frame = true;
} }
if let (Some(a_s), Some(a)) = (detector_stream.as_mut(), self.detector.as_ref()) {
a_s.process_frame(&pkt, &a)?;
}
let frame_realtime = clocks.monotonic() + realtime_offset; let frame_realtime = clocks.monotonic() + realtime_offset;
let local_time = recording::Time::new(frame_realtime); let local_time = recording::Time::new(frame_realtime);
rotate = if let Some(r) = rotate { rotate = if let Some(r) = rotate {
@ -273,6 +285,10 @@ mod tests {
Ok(pkt) Ok(pkt)
} }
fn get_video_codecpar(&self) -> ffmpeg::avcodec::InputCodecParameters<'_> {
self.inner.get_video_codecpar()
}
fn get_extra_data(&self) -> Result<h264::ExtraData, Error> { self.inner.get_extra_data() } fn get_extra_data(&self) -> Result<h264::ExtraData, Error> { self.inner.get_extra_data() }
} }
@ -355,7 +371,7 @@ mod tests {
let s = l.streams_by_id().get(&testutil::TEST_STREAM_ID).unwrap(); let s = l.streams_by_id().get(&testutil::TEST_STREAM_ID).unwrap();
let dir = db.dirs_by_stream_id.get(&testutil::TEST_STREAM_ID).unwrap().clone(); let dir = db.dirs_by_stream_id.get(&testutil::TEST_STREAM_ID).unwrap().clone();
stream = super::Streamer::new(&env, dir, db.syncer_channel.clone(), stream = super::Streamer::new(&env, dir, db.syncer_channel.clone(),
testutil::TEST_STREAM_ID, camera, s, 0, 3).unwrap(); testutil::TEST_STREAM_ID, camera, s, 0, 3, None).unwrap();
} }
stream.run(); stream.run();
assert!(opener.streams.lock().is_empty()); assert!(opener.streams.lock().is_empty());