diff --git a/CMakeLists.txt b/CMakeLists.txt index 9b9f74d..c6386f6 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -46,8 +46,7 @@ set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -ggdb") # Dependencies. # -# https://cmake.org/cmake/help/v3.0/module/FindProtobuf.html -find_package(Protobuf REQUIRED) +find_package(Threads REQUIRED) # https://gflags.github.io/gflags/#cmake mentions a cmake module, but at # least on Ubuntu 15.10, libgflags-dev does not include it. There's no @@ -95,7 +94,6 @@ endif() enable_testing() # http://www.kaizou.org/2014/11/gtest-cmake/ -find_package(Threads REQUIRED) include(ExternalProject) ExternalProject_Add( GMockProject diff --git a/README.md b/README.md index 9a93502..9d1bb05 100644 --- a/README.md +++ b/README.md @@ -69,8 +69,6 @@ from source. It requires several packages to build: [nghttp2](https://github.com/tatsuhiro-t/nghttp2) in the future.) Unfortunately, the libevent 2.0 bundled with current Debian releases is unsuitable. -* [protocol buffers](https://developers.google.com/protocol-buffers/), - currently just for the configuration file. * [gflags](http://gflags.github.io/gflags/), for commandline flag parsing. * [glog](https://github.com/google/glog), for debug logging. * [gperftools](https://github.com/gperftools/gperftools), for debugging. @@ -87,7 +85,6 @@ pre-requisites (see also the `Build-Depends` field in `debian/control`): $ sudo apt-get install \ build-essential \ cmake \ - libprotobuf-dev \ libavcodec-dev \ libavformat-dev \ libavutil-dev \ @@ -97,7 +94,6 @@ pre-requisites (see also the `Build-Depends` field in `debian/control`): libre2-dev \ libsqlite3-dev \ pkgconf \ - protobuf-compiler \ uuid-dev libevent 2.1 will have to be installed from source. In the future, this @@ -118,82 +114,63 @@ installed, you may be able to prepare a `.deb` package: $ sudo apt-get install devscripts dh-systemd $ debuild -us -uc -# Configuration +# Installation -Moonfire NVR expects a configuration file `/etc/moonfire-nvr.conf` (overridable -with the `--config` argument). Currently this file should contain a -text-format `moonfire_nvr.Config` protocol buffer message; see -`src/config.protodevel` which describes the meaning of fields. The general -syntax is as in the example below: `field: value` for simple fields, or -(`field < ... >`) for "message" fields. It supports line-based comments -starting with #. +Moonfire NVR should be run under a dedicated user. It keeps two kinds of +state: - base_path: "/var/lib/moonfire_nvr" - rotate_sec: 600 - http_port: 8080 +* a SQLite3 database, typically <1 GiB. It should be stored on flash if + available. +* the "sample file directory", which holds the actual samples/frames of H.264 + video. This should be quite large and typically is stored on a hard drive. - camera < - short_name: "back_west" - host: "192.168.1.101:554" - user: "admin" - password: "12345" - main_rtsp_path: "/Streaming/Channels/1" - sub_rtsp_path: "/Streaming/Channels/2" - retain_bytes: 52428800 # 50 MiB - > - camera < - short_name: "back_east" - host: "192.168.1.102:554" - user: "admin" - password: "12345" - main_rtsp_path: "/Streaming/Channels/1" - sub_rtsp_path: "/Streaming/Channels/2" - retain_bytes: 52428800 # 50 MiB - > +Both are intended to be accessed only by Moonfire NVR itself. However, the +interface for adding new cameras is not yet written, so you will have to +manually create the database and insert cameras with the `sqlite3` commandline +tool prior to starting Moonfire NVR. -The example configuration above does the following: - -* streams the `main_rtsp_path` from both cameras, reconnecting on errors, and - writing 10-minute segments of video to subdirectories of - `/var/lib/surveillance/`. (The `sub_rtsp_path` field is not used yet.) -* deletes old files to stay within the 50 MiB limit for each camera, excluding - the video file currently being written. -* writes human-readable debug logs to `/tmp/moonfire_nvr.INFO`. -* runs an HTTP server on the port 8080 (try - [`http://localhost:8080/`](http://localhost:8080/) which allows streaming the - video. Note: Moonfire NVR does not yet support authentication or SSL, so - this webserver should not be directly exposed to the Internet. - -When configuring Moonfire NVR, it may be helpful to replicate its basic -functionality with the `ffmpeg` commandline tool. The command below is roughly -equivalent to the configuration for `back_west` above. +Before setting up a camera, it may be helpful to test settings with the +`ffmpeg` commandline tool: $ ffmpeg \ -i "rtsp://admin:12345@192.168.1.101:554/Streaming/Channels/1" \ -c copy \ -map 0:0 \ + -rtsp_transport tcp \ -flags:v +global_header \ - -bsf:v dump_extra \ - -f segment \ - -segment_time 600 \ - -use_strftime 1 \ - -segment_format mp4 \ - %Y%m%d%H%M%S-back_west.mp4 + test.mp4 -# Installation - -Moonfire NVR should be run under a dedicated user. This user should own the -`base_path` directory mentioned in the configuration file. Because video is -served through an HTTP interface, there's no need for any other user to access -the files. +Once you have a working `ffmpeg` commandline, set up Moonfire NVR as follows: $ sudo addgroup --system moonfire-nvr - $ sudo adduser --system moonfire-nvr --group moonfire-nvr - $ sudo mkdir /var/lib/moonfire_nvr - $ sudo chown moonfire-nvr:moonfire-nvr /var/lib/moonfire_nvr - $ sudo chmod 700 /var/lib/moonfire_nvr + $ sudo adduser --system moonfire-nvr --home /var/lib/moonfire-nvr + $ sudo mkdir /var/lib/moonfire-nvr + $ sudo -u moonfire-nvr -H mkdir db sample + $ uuidgen | sed -e 's/-//g' + b47f48706d91414591cd6c931bf836b4 + $ sudo -u moonfire-nvr sqlite3 db/db + sqlite3> .read path/to/schema.sql + sqlite3> insert into camera ( + ...> uuid, short_name, description, host, username, password, + ...> main_rtsp_path, sub_rtsp_path, retain_bytes) values ( + ...> X'b47f48706d91414591cd6c931bf836b4', 'driveway', + ...> 'Longer description of this camera', '192.168.1.101', + ...> 'admin', '12345', '/Streaming/Channels/1', + ...> '/Streaming/Channels/2', 104857600); + sqlite3> ^D -It can be run as a systemd service. Create +See the schema SQL file's comments for more information. Note that the sum of +`retain_bytes` for all cameras should be somewhat less than the available +bytes on the sample file directory's filesystem, as the currently-writing +sample files are not included in this sum. Be sure also to subtract out the +filesystem's reserve for root (typically 5%). + +If a dedicated hard drive is available, set up the mount point: + + $ sudo vim /etc/fstab + $ sudo mount /var/lib/moonfire-nvr/sample + +Moonfire NVR can be run as a systemd service. Create `/etc/systemd/system/moonfire-nvr.service`: [Unit] @@ -201,7 +178,10 @@ It can be run as a systemd service. Create After=network-online.target [Service] - ExecStart=/usr/local/bin/moonfire_nvr + ExecStart=/usr/local/bin/moonfire-nvr \ + --sample_file_dir=/var/lib/moonfire-nvr/sample \ + --db_dir=/var/lib/moonfire-nvr/db \ + --http_port=8080 Type=simple User=moonfire-nvr Nice=-20 @@ -213,6 +193,9 @@ It can be run as a systemd service. Create [Install] WantedBy=multi-user.target +Note that the HTTP port currently has no authentication; it should not be +directly exposed to the Internet. + Complete the installation through `systemctl` commands: $ sudo systemctl daemon-reload @@ -225,7 +208,7 @@ documentation for more information. The [manual pages](http://www.freedesktop.org/software/systemd/man/) for `systemd.service` and `systemctl` may be of particular interest. -While Moonfire NVR is running, logs will be written to `/tmp/moonfire_nvr.INFO`. +While Moonfire NVR is running, logs will be written to `/tmp/moonfire-nvr.INFO`. # Getting help and getting involved diff --git a/design/schema.md b/design/schema.md index cc66c12..e1eb31f 100644 --- a/design/schema.md +++ b/design/schema.md @@ -1,7 +1,7 @@ # Moonfire NVR Storage Schema -Status: **draft, planned**. The current schema is more basic: a bunch of -.mp4 files written through ffmpeg, named for the camera and start time. +Status: **current**. This is largely implemented; there is optimization and +testing work left to do. ## Objective @@ -139,8 +139,6 @@ Each recording is stored in two places: associated with the segment, including the sample-by-sample contents of the MPEG-4 `stbl` box. At 30 fps, a row is expected to require roughly 4 KB of storage (2 bytes per sample, plus some fixed overhead). - **TODO:** more efficient to split each row in two, putting the blob in a - separate table? not every access needs the blob. Putting the metadata on flash means metadata operations can be fast (sub-millisecond random access, with parallelism) and do not take precious @@ -231,11 +229,12 @@ Because a major part of the recording state is outside the SQL database, care must be taken to guarantee consistency and durability. Moonfire NVR maintains three invariants about sample files: -1. `recording` table rows in the `WRITTEN` state have sample files on disk +1. `recording` table rows have sample files on disk (named by the given UUID) with the indicated size and SHA-1 hash. -2. There are no sample files without a corresponding `recording` table row. -3. After an orderly shutdown of Moonfire NVR, all rows are in the `WRITTEN` - state, even if there have been previous crashes. +2. There are no sample files without a corresponding `recording` or + `reserved_sample_files` table row referencing their UUID. +3. After an orderly shutdown of Moonfire NVR, there are no + `reserved_sample_files` rows, even if there have been previous crashes. The first invariant provides certainty that a recording is properly stored. It would be prohibitively expensive to verify hashes on demand (when listing or @@ -261,38 +260,37 @@ instead. One file could be mistaken for another on database vs directory mismatch. With UUIDs, this is impossible: by design they can be assumed to be universally unique, so two distinct recordings will never share a UUID. -To maintain these invariants, a row in the `recording` table is in one of three -states: `WRITING`, `WRITTEN, and `DELETING`. These are updated through -the following procedures: +These invariants are updated through the following procedure: *Create a recording:* -1. Insert a `recording` row, in state `WRITING`. +1. Insert a `reserved_sample_files` row, in state `WRITING`. 2. Write the sample file, aborting if `open(..., O\_WRONLY|O\_CREATE|O\_EXCL)` fails with `EEXIST`. (This would indicate a non-unique UUID, a serious defect.) 3. `fsync()` the sample file. 4. `fsync()` the sample file directory. -5. Update the `recording` row from state `WRITING` to state `WRITTEN`, +5. Replace the `reserved_sample_files` row with a `recording` row, marking its size and SHA-1 hash in the process. *Delete a recording:* -1. Update the `recording` row from state `WRITTEN` to state `DELETING`. +1. Replace the `recording` row with a `reserved_sample_files` row in state + `DELETED`. 2. `unlink()` the sample file, warning on `ENOENT`. (This would indicate invariant #2 is false.) 3. `fsync()` the sample file directory. -4. Delete the `recording` row. +4. Delete the `reserved_sample_files` row. *Startup (crash recovery):* 1. Acquire a lock to guarantee this is the only Moonfire NVR process running against the given database. This lock is not released until program shutdown. -2. Query `recordings` table for rows with status `WRITING` or `DELETING`. +2. Query `reserved_sample_files` table. 3. `unlink()` all the sample files associated with rows returned by #2, ignoring `ENOENT`. 4. `fsync()` the samples directory. -5. Delete the rows returned by #2 from the `recordings` table. +5. Delete the rows returned by #2 from the `reserved_sample_files` table. The procedures can be batched: while for a given recording, the steps must be strictly ordered, multiple recordings can be proceeding through the steps @@ -310,14 +308,15 @@ such that the first directory in which a recording is found must have a complete copy (and subsequent directories' copies may be partial/corrupt). It'd also be possible to conserve some partial recordings. Moonfire NVR could, -as a recording is written, update its row to reflect the latest sample tables, -size, and hash fields while keeping status `WRITING`. On startup, the file -would be truncated to match and then status updated to `WRITTEN`. The file -would either have to be synced prior to each update (to guarantee it is at -least as new as the row) or multiple checkpoints would be kept, using the last -one with a correct hash (if any) on a best-effort basis. However, this may not -be worth the complexity; it's simpler to just keep recording time short enough -that losing partial recordings is not a problem. +as a recording is written, record the latest sample tables, +size, and hash fields without marking the recording as fully written. On +startup, the file would be truncated to match and then the recording marked +as fully written. The file would either have to be synced prior to each update +(to guarantee it is at least as new as the row) or multiple checkpoints would +be kept, using the last one with a correct hash (if any) on a best-effort +basis. However, this may not be worth the complexity; it's simpler to just +keep recording time short enough that losing partial recordings is not a +problem. ### Verifying invariants @@ -364,24 +363,25 @@ in the background at low priority. ### Recording table +The snippet below is a illustrative excerpt of the SQLite schema; see +`schema.sql` for the authoritative, up-to-date version. + -- A single, typically 60-second, recorded segment of video. create table recording ( id integer primary key, camera_id integer references camera (id) not null, - status integer not null, -- 0 (WRITING), 1 (WRITTEN), or 2 (DELETING) - sample_file_uuid blob unique not null, sample_file_sha1 blob, sample_file_size integer, - -- The starting and ending time of the recording, in 90 kHz units since + -- The starting time and duration of the recording, in 90 kHz units since -- 1970-01-01 00:00:00 UTC. start_time_90k integer not null, - end_time_90k integer, + duration_90k integer, video_samples integer, - video_sample_entry_sha1 blob references visual_sample_entry (sha1), + video_sample_entry_id blob references visual_sample_entry (id), video_index blob, ... @@ -400,7 +400,7 @@ in the background at low priority. -- A serialized SampleEntry box, including the leading length and box -- type (avcC in the case of H.264). - bytes blob + data blob ); As mentioned by the `start_time_90k` field above, recordings use a 90 kHz time @@ -507,7 +507,7 @@ to store metadata and the simple, consistent format of sample indexes. ### Copyright -This file is part of Moonfire NVR, a security camera digital video recorder. +This file is part of Moonfire NVR, a security camera network video recorder. Copyright (C) 2016 Scott Lamb This program is free software: you can redistribute it and/or modify @@ -546,6 +546,8 @@ along with this program. If not, see . [wdpurple]: http://www.wdc.com/en/products/products.aspx?id=1210 [wd20eurs]: http://www.wdc.com/wdproducts/library/SpecSheet/ENG/2879-701250.pdf [seeker]: http://www.linuxinsight.com/how_fast_is_your_disk.html +[rfc-3551]: https://www.ietf.org/rfc/rfc3551.txt +[hikvision-sr]: http://www.cctvforum.com/viewtopic.php?f=19&t=44534 [iso-14496-12]: http://www.iso.org/iso/home/store/catalogue_ics/catalogue_detail_ics.htm?csnumber=68960 [sqlite3]: https://www.sqlite.org/ [sqlite3-wal]: https://www.sqlite.org/wal.html diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index ccedc8a..bc3e811 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -28,16 +28,15 @@ # along with this program. If not, see . include_directories(${CMAKE_CURRENT_BINARY_DIR}) -PROTOBUF_GENERATE_CPP(PROTO_SRCS PROTO_HDRS config.protodevel) set(MOONFIRE_DEPS + ${CMAKE_THREAD_LIBS_INIT} ${FFMPEG_LIBRARIES} ${LIBEVENT_LIBRARIES} ${GFLAGS_LIBRARIES} ${GLOG_LIBRARIES} ${OPENSSL_LIBRARIES} ${PROFILER_LIBRARIES} - ${PROTOBUF_LIBRARIES} ${RE2_LIBRARIES} ${SQLITE_LIBRARIES} ${UUID_LIBRARIES}) diff --git a/src/config.protodevel b/src/config.protodevel deleted file mode 100644 index 2b423a2..0000000 --- a/src/config.protodevel +++ /dev/null @@ -1,93 +0,0 @@ -// This file is part of Moonfire NVR, a security camera digital video recorder. -// Copyright (C) 2016 Scott Lamb -// -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// In addition, as a special exception, the copyright holders give -// permission to link the code of portions of this program with the -// OpenSSL library under certain conditions as described in each -// individual source file, and distribute linked combinations including -// the two. -// -// You must obey the GNU General Public License in all respects for all -// of the code used other than OpenSSL. If you modify file(s) with this -// exception, you may extend this exception to your version of the -// file(s), but you are not obligated to do so. If you do not wish to do -// so, delete this exception statement from your version. If you delete -// this exception statement from all source files in the program, then -// also delete it here. -// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. -// -// You should have received a copy of the GNU General Public License -// along with this program. If not, see . -// -// config.protodevel: protocol buffer definition for configuration. -// Currently this is used only for an ASCII-format protocol buffer config -// file, so tag numbers are insignificant. Additionally, as moonfire-nvr has not -// yet reached 1.0, there is no promise of compatibility between versions--- -// names and structure are free to change as desired. -// -// Quick note to those unfamiliar with protocol buffers: the word "optional" -// below does *not* imply that a field may be omitted. If the comment does not -// say that a field may be omitted, supply the field. "optional" is -// just a word that proto2 syntax wants everywhere, like "please". See also: -// http://stackoverflow.com/questions/31801257/why-required-and-optional-is-removed-in-protocol-buffers-3 - -syntax = "proto2"; - -package moonfire_nvr; - -message Camera { - // A short name of the camera, used as a path component and in log messages. - optional string short_name = 1; - - // A short description of the camera. - optional string description = 2; - - // The host (or IP address) to use in rtsp:// URLs when accessing the camera. - optional string host = 3; - - // The username to use when accessing the camera. - // If empty, no username or password will be supplied. - optional string user = 4; - - // The password to use when accessing the camera. - optional string password = 5; - - // The path (starting with "/") to use in rtsp:// URLs to reference this - // camera's "main" (full-quality) video stream. - optional string main_rtsp_path = 6; - - // The path (starting with "/") to use in rtsp:// URLs to reference this - // camera's "main" (full-quality) video stream. Currently unused. - optional string sub_rtsp_path = 7; - - // The number of bytes of video to retain, excluding the currently-recording - // file. Older files will be deleted as necessary to stay within this limit. - optional uint64 retain_bytes = 8; -} - -message Config { - // The base path for recorded files. E.g., "/var/lib/surveillance". - // Each camera will record to a subdirectory of this path. - optional string base_path = 1; - - // Approximate number of seconds after which to close a video file and - // open a new one. E.g., 600 (meaning 10 minutes). All files will begin with - // a key frame, so video files will often be slightly over this limit. - optional uint32 rotate_sec = 2; - - // A port to use for a HTTP server. For example, "8080". - // This webserver currently allows unauthenticated access to video files, so - // it should not be exposed to the Internet. - optional uint32 http_port = 4; - - repeated Camera camera = 3; -} diff --git a/src/ffmpeg.cc b/src/ffmpeg.cc index 7b7b45d..31fc312 100644 --- a/src/ffmpeg.cc +++ b/src/ffmpeg.cc @@ -193,7 +193,7 @@ class RealInputVideoPacketStream : public InputVideoPacketStream { << "; only interested in " << stream_index_; continue; } - VLOG(2) << "Read packet with pts=" << pkt->pkt()->pts + VLOG(3) << "Read packet with pts=" << pkt->pkt()->pts << ", dts=" << pkt->pkt()->dts << ", key=" << pkt->is_key(); return true; } @@ -229,6 +229,8 @@ class RealVideoSource : public VideoSource { std::unique_ptr stream; Dictionary open_options; if (!open_options.Set("rtsp_transport", "tcp", error_message) || + // https://trac.ffmpeg.org/ticket/5018 workaround attempt. + !open_options.Set("probesize", "262144", error_message) || !open_options.Set("user-agent", "moonfire-nvr", error_message) || // 10-second socket timeout, in microseconds. !open_options.Set("stimeout", "10000000", error_message)) { @@ -306,121 +308,6 @@ class RealVideoSource : public VideoSource { } // namespace -bool OutputVideoPacketStream::OpenFile(const std::string &filename, - const InputVideoPacketStream &input, - std::string *error_message) { - CHECK(ctx_ == nullptr) << ": already open."; - CHECK(stream_ == nullptr); - - if ((ctx_ = avformat_alloc_context()) == nullptr) { - *error_message = "avformat_alloc_context failed."; - return false; - } - - ctx_->oformat = av_guess_format(nullptr, filename.c_str(), nullptr); - if (ctx_->oformat == nullptr) { - *error_message = - StrCat("Can't find output format for filename: ", filename.c_str()); - avformat_free_context(ctx_); - ctx_ = nullptr; - return false; - } - - int ret = avio_open2(&ctx_->pb, filename.c_str(), AVIO_FLAG_WRITE, nullptr, - nullptr); - if (ret < 0) { - avformat_free_context(ctx_); - ctx_ = nullptr; - *error_message = AvError2Str("avio_open2", ret); - return false; - } - stream_ = avformat_new_stream(ctx_, input.stream()->codec->codec); - if (stream_ == nullptr) { - avformat_free_context(ctx_); - ctx_ = nullptr; - unlink(filename.c_str()); - *error_message = AvError2Str("avformat_new_stream", ret); - return false; - } - stream_->time_base = input.stream()->time_base; - ret = avcodec_copy_context(stream_->codec, input.stream()->codec); - if (ret != 0) { - avformat_free_context(ctx_); - ctx_ = nullptr; - stream_ = nullptr; - unlink(filename.c_str()); - *error_message = AvError2Str("avcodec_copy_context", ret); - return false; - } - stream_->codec->codec_tag = 0; - if ((ctx_->oformat->flags & AVFMT_GLOBALHEADER) != 0) { - stream_->codec->flags |= CODEC_FLAG_GLOBAL_HEADER; - } - - ret = avformat_write_header(ctx_, nullptr); - if (ret != 0) { - avformat_free_context(ctx_); - ctx_ = nullptr; - stream_ = nullptr; - unlink(filename.c_str()); - *error_message = AvError2Str("avformat_write_header", ret); - return false; - } - frames_written_ = 0; - key_frames_written_ = 0; - return true; -} - -bool OutputVideoPacketStream::Write(VideoPacket *pkt, - std::string *error_message) { -#if 0 - if (pkt->pkt()->pts < min_next_pts_ || pkt->pkt()->dts < min_next_dts_) { - *error_message = StrCat("refusing to write non-increasing pts/dts, pts=", - pkt->pkt()->pts, " vs min ", min_next_pts_, " dts=", - pkt->pkt()->dts, " vs min ", min_next_dts_); - return false; - } - min_next_pts_ = pkt->pkt()->pts + 1; - min_next_dts_ = pkt->pkt()->dts + 1; -#endif - VLOG(2) << "Writing packet with pts=" << pkt->pkt()->pts - << " dts=" << pkt->pkt()->dts << ", key=" << pkt->is_key(); - int ret = av_write_frame(ctx_, pkt->pkt()); - if (ret < 0) { - *error_message = AvError2Str("av_write_frame", ret); - return false; - } - ++frames_written_; - if (pkt->is_key()) { - key_frames_written_++; - } - return true; -} - -void OutputVideoPacketStream::Close() { - if (ctx_ == nullptr) { - CHECK(stream_ == nullptr); - return; - } - - int ret = av_write_trailer(ctx_); - if (ret != 0) { - LOG(WARNING) << AvError2Str("av_write_trailer", ret); - } - - ret = avio_closep(&ctx_->pb); - if (ret != 0) { - LOG(WARNING) << AvError2Str("avio_closep", ret); - } - avformat_free_context(ctx_); - ctx_ = nullptr; - stream_ = nullptr; - frames_written_ = -1; - key_frames_written_ = -1; - min_next_pts_ = std::numeric_limits::min(); - min_next_dts_ = std::numeric_limits::min(); -} - VideoSource *GetRealVideoSource() { static auto *real_video_source = new RealVideoSource; // never deleted. return real_video_source; diff --git a/src/ffmpeg.h b/src/ffmpeg.h index e22a656..8528fbf 100644 --- a/src/ffmpeg.h +++ b/src/ffmpeg.h @@ -144,34 +144,6 @@ class VideoSource { // Returns a VideoSource for production use, which will never be deleted. VideoSource *GetRealVideoSource(); -class OutputVideoPacketStream { - public: - OutputVideoPacketStream() {} - OutputVideoPacketStream(const OutputVideoPacketStream &) = delete; - OutputVideoPacketStream &operator=(const OutputVideoPacketStream &) = delete; - - ~OutputVideoPacketStream() { Close(); } - - bool OpenFile(const std::string &filename, - const InputVideoPacketStream &input, - std::string *error_message); - - bool Write(VideoPacket *pkt, std::string *error_message); - - void Close(); - - bool is_open() const { return ctx_ != nullptr; } - AVRational time_base() const { return stream_->time_base; } - - private: - int64_t key_frames_written_ = -1; - int64_t frames_written_ = -1; - int64_t min_next_dts_ = std::numeric_limits::min(); - int64_t min_next_pts_ = std::numeric_limits::min(); - AVFormatContext *ctx_ = nullptr; // owned. - AVStream *stream_ = nullptr; // ctx_ owns. -}; - } // namespace moonfire_nvr #endif // MOONFIRE_NVR_FFMPEG_H diff --git a/src/filesystem.cc b/src/filesystem.cc index db7a34f..b1d18b5 100644 --- a/src/filesystem.cc +++ b/src/filesystem.cc @@ -125,6 +125,10 @@ class RealFile : public File { return (ftruncate(fd_, length) < 0) ? errno : 0; } + int Unlink(const char *pathname) { + return unlinkat(fd_, pathname, 0) != 0 ? errno : 0; + } + int Write(re2::StringPiece data, size_t *bytes_written) final { ssize_t ret; while ((ret = write(fd_, data.data(), data.size())) == -1 && errno == EINTR) diff --git a/src/filesystem.h b/src/filesystem.h index 1fb26ae..a5adaad 100644 --- a/src/filesystem.h +++ b/src/filesystem.h @@ -82,6 +82,9 @@ class File { // ftruncate(), returning 0 on success or errno>0 on failure. virtual int Truncate(off_t length) = 0; + // unlink() the specified file, returning 0 on success or errno>0 on failure. + virtual int Unlink(const char *path) = 0; + // Write to the file, returning 0 on success or errno>0 on failure. // On success, |bytes_written| will be updated. virtual int Write(re2::StringPiece data, size_t *bytes_written) = 0; @@ -116,8 +119,9 @@ class MockFile : public File { MOCK_METHOD3(Read, int(void *, size_t, size_t *)); MOCK_METHOD1(Stat, int(struct stat *)); MOCK_METHOD0(Sync, int()); - MOCK_METHOD2(Write, int(re2::StringPiece, size_t *)); MOCK_METHOD1(Truncate, int(off_t)); + MOCK_METHOD1(Unlink, int(const char *)); + MOCK_METHOD2(Write, int(re2::StringPiece, size_t *)); }; // Interface to the local filesystem. There's typically one per program, diff --git a/src/http.cc b/src/http.cc index b45f6c1..b6ef2c3 100644 --- a/src/http.cc +++ b/src/http.cc @@ -53,37 +53,6 @@ namespace moonfire_nvr { namespace { -class RealFile : public VirtualFile { - public: - RealFile(re2::StringPiece mime_type, File *dir, re2::StringPiece filename, - const struct stat &statbuf) - : mime_type_(mime_type.as_string()), stat_(statbuf) { - slice_.Init(dir, filename, ByteRange(0, statbuf.st_size)); - } - - ~RealFile() final {} - - int64_t size() const final { return stat_.st_size; } - time_t last_modified() const final { return stat_.st_mtime; } - std::string mime_type() const final { return mime_type_; } - - std::string etag() const final { - return StrCat("\"", stat_.st_ino, ":", stat_.st_size, ":", - stat_.st_mtim.tv_sec, ":", stat_.st_mtim.tv_nsec, "\""); - } - - // Add the given range of the file to the buffer. - int64_t AddRange(ByteRange range, EvBuffer *buf, - std::string *error_message) const final { - return slice_.AddRange(range, buf, error_message); - } - - private: - RealFileSlice slice_; - const std::string mime_type_; - const struct stat stat_; -}; - // An HttpServe call still in progress. struct ServeInProgress { ByteRange left; @@ -444,11 +413,4 @@ void HttpServe(const std::shared_ptr &file, evhttp_request *req) { return ServeChunkCallback(con, serve); } -void HttpServeFile(evhttp_request *req, const std::string &mime_type, File *dir, - const std::string &filename, const struct stat &statbuf) { - return HttpServe(std::shared_ptr( - new RealFile(mime_type, dir, filename, statbuf)), - req); -} - } // namespace moonfire_nvr diff --git a/src/http.h b/src/http.h index 1589a8f..6691980 100644 --- a/src/http.h +++ b/src/http.h @@ -290,11 +290,6 @@ class FileSlices : public FileSlice { // added to guarantee VirtualFile objects outlive the HTTP requests they serve. void HttpServe(const std::shared_ptr &file, evhttp_request *req); -// Serve a file over HTTP. Expects the caller to supply a sanitized |filename| -// (rather than taking it straight from the path specified in |req|). -void HttpServeFile(evhttp_request *req, const std::string &mime_type, File *dir, - const std::string &filename, const struct stat &statbuf); - namespace internal { // Value to represent result of parsing HTTP 1.1 "Range:" header. diff --git a/src/moonfire-db.cc b/src/moonfire-db.cc index 026ce7f..5d83d8a 100644 --- a/src/moonfire-db.cc +++ b/src/moonfire-db.cc @@ -239,11 +239,12 @@ bool MoonfireDatabase::Init(Database *db, std::string *error_message) { insert_recording_stmt_ = db_->Prepare( R"( insert into recording (camera_id, sample_file_bytes, start_time_90k, - duration_90k, video_samples, video_sync_samples, - video_sample_entry_id, sample_file_uuid, - sample_file_sha1, video_index) + duration_90k, local_time_delta_90k, video_samples, + video_sync_samples, video_sample_entry_id, + sample_file_uuid, sample_file_sha1, video_index) values (:camera_id, :sample_file_bytes, :start_time_90k, - :duration_90k, :video_samples, :video_sync_samples, + :duration_90k, :local_time_delta_90k, + :video_samples, :video_sync_samples, :video_sample_entry_id, :sample_file_uuid, :sample_file_sha1, :video_index); )", @@ -481,11 +482,10 @@ std::vector MoonfireDatabase::ReserveSampleFiles( if (n == 0) { return std::vector(); } - auto *gen = GetRealUuidGenerator(); std::vector uuids; uuids.reserve(n); for (int i = 0; i < n; ++i) { - uuids.push_back(gen->Generate()); + uuids.push_back(uuidgen_->Generate()); } DatabaseContext ctx(db_); if (!ctx.BeginTransaction(error_message)) { @@ -535,7 +535,10 @@ bool MoonfireDatabase::InsertVideoSampleEntry(VideoSampleEntry *entry, insert_run.BindInt64(":height", entry->height); insert_run.BindBlob(":data", entry->data); if (insert_run.Step() != SQLITE_DONE) { - *error_message = insert_run.error_message(); + *error_message = + StrCat("insert video sample entry: ", insert_run.error_message(), + ": sha1=", ToHex(entry->sha1), ", dimensions=", entry->width, + "x", entry->height, ", data=", ToHex(entry->data)); return false; } entry->id = ctx.last_insert_rowid(); @@ -550,10 +553,10 @@ bool MoonfireDatabase::InsertRecording(Recording *recording, *error_message = StrCat("recording already has id ", recording->id); return false; } - if (recording->end_time_90k <= recording->start_time_90k) { + if (recording->end_time_90k < recording->start_time_90k) { *error_message = - StrCat("end time ", recording->end_time_90k, - " must be greater than start time ", recording->start_time_90k); + StrCat("end time ", recording->end_time_90k, " must be >= start time ", + recording->start_time_90k); return false; } DatabaseContext ctx(db_); @@ -585,6 +588,8 @@ bool MoonfireDatabase::InsertRecording(Recording *recording, insert_run.BindInt64(":start_time_90k", recording->start_time_90k); insert_run.BindInt64(":duration_90k", recording->end_time_90k - recording->start_time_90k); + insert_run.BindInt64(":local_time_delta_90k", + recording->local_time_90k - recording->start_time_90k); insert_run.BindInt64(":video_samples", recording->video_samples); insert_run.BindInt64(":video_sync_samples", recording->video_sync_samples); insert_run.BindInt64(":video_sample_entry_id", @@ -594,8 +599,20 @@ bool MoonfireDatabase::InsertRecording(Recording *recording, insert_run.BindBlob(":sample_file_sha1", recording->sample_file_sha1); insert_run.BindBlob(":video_index", recording->video_index); if (insert_run.Step() != SQLITE_DONE) { - LOG(ERROR) << "insert_run failed: " << insert_run.error_message(); - *error_message = insert_run.error_message(); + *error_message = + StrCat("insert failed: ", insert_run.error_message(), ", camera_id=", + recording->camera_id, ", sample_file_bytes=", + recording->sample_file_bytes, ", start_time_90k=", + recording->start_time_90k, ", duration_90k=", + recording->end_time_90k - recording->start_time_90k, + ", local_time_delta_90k=", + recording->local_time_90k - recording->start_time_90k, + ", video_samples=", recording->video_samples, + ", video_sync_samples=", recording->video_sync_samples, + ", video_sample_entry_id=", recording->video_sample_entry_id, + ", sample_file_uuid=", recording->sample_file_uuid.UnparseText(), + ", sample_file_sha1=", ToHex(recording->sample_file_sha1), + ", video_index length ", recording->video_index.size()); ctx.RollbackTransaction(); return false; } diff --git a/src/moonfire-db.h b/src/moonfire-db.h index 16324ef..ce57132 100644 --- a/src/moonfire-db.h +++ b/src/moonfire-db.h @@ -201,6 +201,12 @@ class MoonfireDatabase { bool MarkSampleFilesDeleted(const std::vector &uuids, std::string *error_message); + // Replace the default real UUID generator with the supplied one. + // Exposed only for testing; not thread-safe. + void SetUuidGeneratorForTesting(UuidGenerator *uuidgen) { + uuidgen_ = uuidgen; + } + private: struct CameraData { // Cached values of the matching fields from the camera row. @@ -230,6 +236,7 @@ class MoonfireDatabase { std::string *error_message); Database *db_ = nullptr; + UuidGenerator *uuidgen_ = GetRealUuidGenerator(); Statement list_camera_recordings_stmt_; Statement build_mp4_stmt_; Statement insert_reservation_stmt_; diff --git a/src/moonfire-nvr-main.cc b/src/moonfire-nvr-main.cc index fd614f6..00c2451 100644 --- a/src/moonfire-nvr-main.cc +++ b/src/moonfire-nvr-main.cc @@ -44,16 +44,21 @@ #include #include #include -#include -#include #include -#include "config.pb.h" #include "ffmpeg.h" #include "profiler.h" +#include "moonfire-db.h" #include "moonfire-nvr.h" +#include "sqlite.h" +#include "string.h" +#include "web.h" -DEFINE_string(config, "/etc/moonfire-nvr.conf", "Path to configuration file."); +using moonfire_nvr::StrCat; + +DEFINE_int32(http_port, 0, ""); +DEFINE_string(db_dir, "", ""); +DEFINE_string(sample_file_dir, "", ""); namespace { @@ -81,22 +86,6 @@ void EventLogCallback(int severity, const char* msg) { google::LogMessage("libevent", 0, glog_level).stream() << msg; } -bool LoadConfiguration(const std::string& filename, - moonfire_nvr::Config* config) { - int fd = open(filename.c_str(), O_RDONLY); - if (fd == -1) { - PLOG(ERROR) << "can't open " << filename; - return false; - } - google::protobuf::io::FileInputStream file(fd); - file.SetCloseOnDelete(true); - // TODO(slamb): report more specific errors via an ErrorCollector. - if (!google::protobuf::TextFormat::Parse(&file, config)) { - LOG(ERROR) << "can't parse " << filename; - } - return true; -} - // Called on SIGTERM or SIGINT. void SignalCallback(evutil_socket_t, short, void*) { event_base_loopexit(base, nullptr); @@ -108,42 +97,80 @@ void FlushLogsCallback(evutil_socket_t, short, void* ev) { event_add(reinterpret_cast(ev), &kLogFlushInterval)); } -void HttpCallback(evhttp_request* req, void* arg) { - auto* nvr = reinterpret_cast(arg); - nvr->HttpCallback(req); -} - } // namespace +// Note that main never returns; it calls exit on either success or failure. +// This avoids the need to design an orderly shutdown for all dependencies, +// instead letting the OS clean up memory allocations en masse. State may be +// allocated in whatever way is most convenient: on the stack, in a unique_ptr +// (that may never go out of scope), or as a bare pointer that is never +// deleted. int main(int argc, char** argv) { - GOOGLE_PROTOBUF_VERIFY_VERSION; google::ParseCommandLineFlags(&argc, &argv, true); google::InitGoogleLogging(argv[0]); google::InstallFailureSignalHandler(); signal(SIGPIPE, SIG_IGN); - moonfire_nvr::Config config; - if (!LoadConfiguration(FLAGS_config, &config)) { + if (FLAGS_sample_file_dir.empty()) { + LOG(ERROR) << "--sample_file_dir must be specified; exiting."; exit(1); } + if (FLAGS_db_dir.empty()) { + LOG(ERROR) << "--db_dir must be specified; exiting."; + exit(1); + } + + if (FLAGS_http_port == 0) { + LOG(ERROR) << "--http_port must be specified; exiting."; + exit(1); + } + + moonfire_nvr::Environment env; + env.clock = moonfire_nvr::GetRealClock(); + env.video_source = moonfire_nvr::GetRealVideoSource(); + + std::unique_ptr sample_file_dir; + int ret = moonfire_nvr::GetRealFilesystem()->Open( + FLAGS_sample_file_dir.c_str(), O_DIRECTORY | O_RDONLY, &sample_file_dir); + if (ret != 0) { + LOG(ERROR) << "Unable to open --sample_file_dir=" << FLAGS_sample_file_dir + << ": " << strerror(ret) << "; exiting."; + exit(1); + } + env.sample_file_dir = sample_file_dir.release(); + + moonfire_nvr::Database db; + std::string error_msg; + std::string db_path = StrCat(FLAGS_db_dir, "/db"); + if (!db.Open(db_path.c_str(), SQLITE_OPEN_READWRITE, &error_msg)) { + LOG(ERROR) << error_msg << "; exiting."; + exit(1); + } + + moonfire_nvr::MoonfireDatabase mdb; + CHECK(mdb.Init(&db, &error_msg)) << error_msg; + env.mdb = &mdb; + + moonfire_nvr::WebInterface web(&env); + event_set_log_callback(&EventLogCallback); LOG(INFO) << "libevent: compiled with version " << LIBEVENT_VERSION << ", running with version " << event_get_version(); base = CHECK_NOTNULL(event_base_new()); - std::unique_ptr nvr(new moonfire_nvr::Nvr); - std::string error_msg; - if (!nvr->Init(config, &error_msg)) { - LOG(ERROR) << "Unable to initialize: " << error_msg; + std::unique_ptr nvr(new moonfire_nvr::Nvr(&env)); + if (!nvr->Init(&error_msg)) { + LOG(ERROR) << "Unable to initialize: " << error_msg << "; exiting."; exit(1); } evhttp* http = CHECK_NOTNULL(evhttp_new(base)); moonfire_nvr::RegisterProfiler(base, http); - evhttp_set_gencb(http, &HttpCallback, nvr.get()); - if (evhttp_bind_socket(http, "0.0.0.0", config.http_port()) != 0) { - LOG(ERROR) << "Unable to bind to port " << config.http_port(); + web.Register(http); + if (evhttp_bind_socket(http, "0.0.0.0", FLAGS_http_port) != 0) { + LOG(ERROR) << "Unable to bind to --http_port=" << FLAGS_http_port + << "; exiting."; exit(1); } diff --git a/src/moonfire-nvr-test.cc b/src/moonfire-nvr-test.cc index d4b9cdd..0f0a7ed 100644 --- a/src/moonfire-nvr-test.cc +++ b/src/moonfire-nvr-test.cc @@ -30,6 +30,10 @@ // // moonfire-nvr-test.cc: tests of the moonfire-nvr.cc interface. +#include +#include +#include + #include #include #include @@ -69,97 +73,58 @@ class MockVideoSource : public VideoSource { InputVideoPacketStream *(const std::string &, std::string *)); }; -class FileManagerTest : public testing::Test { - protected: - FileManagerTest() { - test_dir_ = PrepareTempDirOrDie("moonfire-nvr-file-manager"); - env_.fs = GetRealFilesystem(); - } - - std::vector GetFilenames(const FileManager &mgr) { - std::vector out; - mgr.ForEachFile([&out](const std::string &f, const struct stat &) { - out.push_back(f); - }); - return out; - } - - Environment env_; - std::string test_dir_; -}; - -TEST_F(FileManagerTest, InitWithNoDirectory) { - std::string subdir = test_dir_ + "/" + "subdir"; - FileManager manager("foo", subdir, 0, &env_); - - // Should succeed. - std::string error_message; - EXPECT_TRUE(manager.Init(&error_message)) << error_message; - - // Should create the directory. - struct stat buf; - ASSERT_EQ(0, lstat(subdir.c_str(), &buf)) << strerror(errno); - EXPECT_TRUE(S_ISDIR(buf.st_mode)); - - // Should report empty. - EXPECT_EQ(0, manager.total_bytes()); - EXPECT_THAT(GetFilenames(manager), testing::ElementsAre()); - - // Adding files: nonexistent, simple, out of order. - EXPECT_FALSE(manager.AddFile("nonexistent.mp4", &error_message)); - WriteFileOrDie(subdir + "/1.mp4", "1"); - WriteFileOrDie(subdir + "/2.mp4", "123"); - EXPECT_TRUE(manager.AddFile("2.mp4", &error_message)) << error_message; - EXPECT_EQ(3, manager.total_bytes()); - EXPECT_THAT(GetFilenames(manager), testing::ElementsAre("2.mp4")); - EXPECT_TRUE(manager.AddFile("1.mp4", &error_message)) << error_message; - EXPECT_EQ(4, manager.total_bytes()); - EXPECT_THAT(GetFilenames(manager), testing::ElementsAre("1.mp4", "2.mp4")); - - EXPECT_TRUE(manager.Rotate(&error_message)) << error_message; - EXPECT_EQ(0, manager.total_bytes()); - EXPECT_THAT(GetFilenames(manager), testing::ElementsAre()); -} - -TEST_F(FileManagerTest, InitAndRotateWithExistingFiles) { - WriteFileOrDie(test_dir_ + "/1.mp4", "1"); - WriteFileOrDie(test_dir_ + "/2.mp4", "123"); - WriteFileOrDie(test_dir_ + "/3.mp4", "12345"); - WriteFileOrDie(test_dir_ + "/other", "1234567"); - FileManager manager("foo", test_dir_, 8, &env_); - - // Should succeed. - std::string error_message; - EXPECT_TRUE(manager.Init(&error_message)) << error_message; - - EXPECT_THAT(GetFilenames(manager), - testing::ElementsAre("1.mp4", "2.mp4", "3.mp4")); - EXPECT_EQ(1 + 3 + 5, manager.total_bytes()); - - EXPECT_TRUE(manager.Rotate(&error_message)) << error_message; - EXPECT_THAT(GetFilenames(manager), testing::ElementsAre("2.mp4", "3.mp4")); - EXPECT_EQ(8, manager.total_bytes()); -} - class StreamTest : public testing::Test { public: StreamTest() { + std::string error_message; test_dir_ = PrepareTempDirOrDie("moonfire-nvr-stream-copier"); env_.clock = &clock_; env_.video_source = &video_source_; - env_.fs = GetRealFilesystem(); + int ret = moonfire_nvr::GetRealFilesystem()->Open( + test_dir_.c_str(), O_DIRECTORY | O_RDONLY, &sample_file_dir_); + CHECK_EQ(0, ret) << "open: " << strerror(ret); + env_.sample_file_dir = sample_file_dir_.get(); + + CHECK(db_.Open(StrCat(test_dir_, "/db").c_str(), + SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE, &error_message)) + << error_message; + std::string create_sql = ReadFileOrDie("../src/schema.sql"); + { + DatabaseContext ctx(&db_); + CHECK(RunStatements(&ctx, create_sql, &error_message)) << error_message; + auto run = ctx.UseOnce( + R"( + insert into camera (uuid, short_name, host, username, password, + main_rtsp_path, sub_rtsp_path, retain_bytes) + values (:uuid, :short_name, :host, :username, :password, + :main_rtsp_path, :sub_rtsp_path, :retain_bytes); + )"); + run.BindBlob(":uuid", GetRealUuidGenerator()->Generate().binary_view()); + run.BindText(":short_name", "test"); + run.BindText(":host", "test-camera"); + run.BindText(":username", "foo"); + run.BindText(":password", "bar"); + run.BindText(":main_rtsp_path", "/main"); + run.BindText(":sub_rtsp_path", "/sub"); + run.BindInt64(":retain_bytes", 1000000); + CHECK_EQ(SQLITE_DONE, run.Step()) << run.error_message(); + } + mdb_.SetUuidGeneratorForTesting(&uuidgen_); + CHECK(mdb_.Init(&db_, &error_message)) << error_message; + env_.mdb = &mdb_; + + ListCamerasRow row; + int n_rows = 0; + mdb_.ListCameras([&row, &n_rows](const ListCamerasRow &some_row) { + ++n_rows; + row = some_row; + return IterationControl::kContinue; + }); + CHECK_EQ(1, n_rows); + clock_.Sleep({1430006400, 0}); // 2015-04-26 00:00:00 UTC - config_.set_base_path(test_dir_); - config_.set_rotate_sec(5); - auto *camera = config_.add_camera(); - camera->set_short_name("test"); - camera->set_host("test-camera"); - camera->set_user("foo"); - camera->set_password("bar"); - camera->set_main_rtsp_path("/main"); - camera->set_sub_rtsp_path("/sub"); - camera->set_retain_bytes(1000000); + stream_.reset(new Stream(&signal_, &env_, row, 0, 5)); } // A function to use in OpenRtspRaw invocations which shuts down the stream @@ -188,6 +153,7 @@ class StreamTest : public testing::Test { } }; +#if 0 std::vector GetFrames(const std::string &path) { std::vector frames; std::string error_message; @@ -204,13 +170,39 @@ class StreamTest : public testing::Test { EXPECT_EQ("", error_message); return frames; } +#else + std::vector GetFrames(const re2::StringPiece uuid_text) { + std::vector frames; + Uuid uuid; + if (!uuid.ParseText(uuid_text)) { + ADD_FAILURE() << "unparseable: " << uuid_text; + return frames; + } + DatabaseContext ctx(&db_); + auto run = ctx.UseOnce( + "select video_index from recording where sample_file_uuid = :uuid;"); + run.BindBlob(":uuid", uuid.binary_view()); + if (run.Step() != SQLITE_ROW) { + ADD_FAILURE() << run.error_message(); + return frames; + } + for (SampleIndexIterator it(run.ColumnBlob(0)); !it.done(); it.Next()) { + frames.push_back(Frame(it.is_key(), it.start_90k(), it.duration_90k())); + } + return frames; + } +#endif + MockUuidGenerator uuidgen_; ShutdownSignal signal_; - Config config_; SimulatedClock clock_; testing::StrictMock video_source_; + Database db_; + MoonfireDatabase mdb_; + std::unique_ptr sample_file_dir_; Environment env_; std::string test_dir_; + std::unique_ptr stream_; }; class ProxyingInputVideoPacketStream : public InputVideoPacketStream { @@ -266,9 +258,7 @@ class ProxyingInputVideoPacketStream : public InputVideoPacketStream { }; TEST_F(StreamTest, Basic) { - Stream stream(&signal_, config_, &env_, config_.camera(0)); std::string error_message; - ASSERT_TRUE(stream.Init(&error_message)) << error_message; // This is a ~1 fps test video with a timebase of 90 kHz. auto in_stream = GetRealVideoSource()->OpenFile("../src/testdata/clip.mp4", @@ -280,19 +270,23 @@ TEST_F(StreamTest, Basic) { // The starting pts of the input should be irrelevant. proxy_stream->set_ts_offset(180000, std::numeric_limits::max()); + Uuid uuid1; + ASSERT_TRUE(uuid1.ParseText("00000000-0000-0000-0000-000000000001")); + Uuid uuid2; + ASSERT_TRUE(uuid2.ParseText("00000000-0000-0000-0000-000000000002")); + EXPECT_CALL(uuidgen_, Generate()) + .WillOnce(Return(uuid1)) + .WillOnce(Return(uuid2)); + EXPECT_CALL(video_source_, OpenRtspRaw("rtsp://foo:bar@test-camera/main", _)) .WillOnce(Return(proxy_stream)) .WillOnce(Invoke(this, &StreamTest::Shutdown)); - stream.Run(); - + stream_->Run(); // Compare frame-by-frame. // Note below that while the rotation is scheduled to happen near 5-second // boundaries (such as 2016-04-26 00:00:05), it gets deferred until the next // key frame, which in this case is 00:00:07. - EXPECT_THAT(stream.GetFilesForTesting(), - testing::ElementsAre("20150426000000_test.mp4", - "20150426000007_test.mp4")); - EXPECT_THAT(GetFrames("20150426000000_test.mp4"), + EXPECT_THAT(GetFrames("00000000-0000-0000-0000-000000000001"), testing::ElementsAre( Frame(true, 0, 90379), Frame(false, 90379, 89884), Frame(false, 180263, 89749), Frame(false, 270012, 89981), @@ -300,22 +294,14 @@ TEST_F(StreamTest, Basic) { Frame(false, 450048, 89967), // pts_time 5.000533, past rotation time. Frame(false, 540015, 90021), - Frame(false, 630036, - 90000))); // XXX: duration=89958 would be better! + Frame(false, 630036, 89958))); EXPECT_THAT( - GetFrames("20150426000007_test.mp4"), - testing::ElementsAre(Frame(true, 0, 90011), Frame(false, 90011, 90000))); - // Note that the final "90000" duration is ffmpeg's estimate based on frame - // rate. For non-final packets, the correct duration gets written based on - // the next packet's timestamp. The same currently applies to the first - // written segment---it uses an estimated time, not the real time until the - // next packet. This probably should be fixed... + GetFrames("00000000-0000-0000-0000-000000000002"), + testing::ElementsAre(Frame(true, 0, 90011), Frame(false, 90011, 0))); } TEST_F(StreamTest, NonIncreasingTimestamp) { - Stream stream(&signal_, config_, &env_, config_.camera(0)); std::string error_message; - ASSERT_TRUE(stream.Init(&error_message)) << error_message; auto in_stream = GetRealVideoSource()->OpenFile("../src/testdata/clip.mp4", &error_message); ASSERT_TRUE(in_stream != nullptr) << error_message; @@ -326,29 +312,28 @@ TEST_F(StreamTest, NonIncreasingTimestamp) { .WillOnce(Return(proxy_stream)) .WillOnce(Invoke(this, &StreamTest::Shutdown)); + Uuid uuid1; + ASSERT_TRUE(uuid1.ParseText("00000000-0000-0000-0000-000000000001")); + EXPECT_CALL(uuidgen_, Generate()).WillOnce(Return(uuid1)); + { ScopedMockLog log; EXPECT_CALL(log, Log(_, _, _)).Times(AnyNumber()); EXPECT_CALL(log, Log(_, _, HasSubstr("Rejecting non-increasing pts=90379"))); log.Start(); - stream.Run(); + stream_->Run(); } // The output file should still be added to the file manager, with the one - // packet that made it. - EXPECT_THAT(stream.GetFilesForTesting(), - testing::ElementsAre("20150426000000_test.mp4")); - EXPECT_THAT( - GetFrames("20150426000000_test.mp4"), - testing::ElementsAre(Frame(true, 0, 90000))); // estimated duration. + // packet that made it. The final packet on input error will have 0 + // duration. + EXPECT_THAT(GetFrames("00000000-0000-0000-0000-000000000001"), + testing::ElementsAre(Frame(true, 0, 0))); } TEST_F(StreamTest, RetryOnInputError) { - Stream stream(&signal_, config_, &env_, config_.camera(0)); std::string error_message; - ASSERT_TRUE(stream.Init(&error_message)) << error_message; - auto in_stream_1 = GetRealVideoSource()->OpenFile("../src/testdata/clip.mp4", &error_message); ASSERT_TRUE(in_stream_1 != nullptr) << error_message; @@ -367,22 +352,25 @@ TEST_F(StreamTest, RetryOnInputError) { .WillOnce(Return(proxy_stream_1)) .WillOnce(Return(proxy_stream_2)) .WillOnce(Invoke(this, &StreamTest::Shutdown)); - stream.Run(); + + Uuid uuid1; + ASSERT_TRUE(uuid1.ParseText("00000000-0000-0000-0000-000000000001")); + Uuid uuid2; + ASSERT_TRUE(uuid2.ParseText("00000000-0000-0000-0000-000000000002")); + EXPECT_CALL(uuidgen_, Generate()) + .WillOnce(Return(uuid1)) + .WillOnce(Return(uuid2)); + stream_->Run(); // Each attempt should have resulted in a file with one packet. - EXPECT_THAT(stream.GetFilesForTesting(), - testing::ElementsAre("20150426000000_test.mp4", - "20150426000001_test.mp4")); - EXPECT_THAT(GetFrames("20150426000000_test.mp4"), - testing::ElementsAre(Frame(true, 0, 90000))); - EXPECT_THAT(GetFrames("20150426000001_test.mp4"), - testing::ElementsAre(Frame(true, 0, 90000))); + EXPECT_THAT(GetFrames("00000000-0000-0000-0000-000000000001"), + testing::ElementsAre(Frame(true, 0, 0))); + EXPECT_THAT(GetFrames("00000000-0000-0000-0000-000000000002"), + testing::ElementsAre(Frame(true, 0, 0))); } TEST_F(StreamTest, DiscardInitialNonKeyFrames) { - Stream stream(&signal_, config_, &env_, config_.camera(0)); std::string error_message; - ASSERT_TRUE(stream.Init(&error_message)) << error_message; auto in_stream = GetRealVideoSource()->OpenFile("../src/testdata/clip.mp4", &error_message); ASSERT_TRUE(in_stream != nullptr) << error_message; @@ -396,28 +384,32 @@ TEST_F(StreamTest, DiscardInitialNonKeyFrames) { EXPECT_CALL(video_source_, OpenRtspRaw("rtsp://foo:bar@test-camera/main", _)) .WillOnce(Return(proxy_stream)) .WillOnce(Invoke(this, &StreamTest::Shutdown)); - stream.Run(); + + Uuid uuid1; + ASSERT_TRUE(uuid1.ParseText("00000000-0000-0000-0000-000000000001")); + Uuid uuid2; + ASSERT_TRUE(uuid2.ParseText("00000000-0000-0000-0000-000000000002")); + EXPECT_CALL(uuidgen_, Generate()) + .WillOnce(Return(uuid1)) + .WillOnce(Return(uuid2)); + stream_->Run(); // Skipped: initial key frame packet (duration 90379) // Ignored: duration 89884, 89749, 89981 (total pts time: 2.99571... sec) // Thus, the first output file should start at 00:00:02. - EXPECT_THAT(stream.GetFilesForTesting(), - testing::ElementsAre("20150426000002_test.mp4", - "20150426000006_test.mp4")); EXPECT_THAT( - GetFrames("20150426000002_test.mp4"), + GetFrames("00000000-0000-0000-0000-000000000001"), testing::ElementsAre( Frame(true, 0, 90055), Frame(false, 90055, 89967), // pts_time 5.000533, past rotation time. - Frame(false, 180022, 90021), - Frame(false, 270043, - 90000))); // XXX: duration=89958 would be better! + Frame(false, 180022, 90021), Frame(false, 270043, 89958))); EXPECT_THAT( - GetFrames("20150426000006_test.mp4"), - testing::ElementsAre(Frame(true, 0, 90011), Frame(false, 90011, 90000))); + GetFrames("00000000-0000-0000-0000-000000000002"), + testing::ElementsAre(Frame(true, 0, 90011), Frame(false, 90011, 0))); } // TODO: test output stream error (on open, writing packet, closing). +// TODO: test rotation! } // namespace } // namespace moonfire_nvr diff --git a/src/moonfire-nvr.cc b/src/moonfire-nvr.cc index 4a3d888..0ef7cbc 100644 --- a/src/moonfire-nvr.cc +++ b/src/moonfire-nvr.cc @@ -1,4 +1,4 @@ -// This file is part of Moonfire NVR, a security camera digital video recorder. +// This file is part of Moonfire NVR, a security camera network video recorder. // Copyright (C) 2016 Scott Lamb // // This program is free software: you can redistribute it and/or modify @@ -29,6 +29,29 @@ // along with this program. If not, see . // // moonfire-nvr.cc: implementation of moonfire-nvr.h. +// +// Caveats: +// +// Currently the recording thread blocks while a just-finished recording +// is synced to disk and written to the database, which can be 250+ ms. +// Likewise when recordings are being deleted. It would be better to hand +// off to a separate syncer thread, only blocking the recording when there +// would otherwise be insufficient disk space. +// +// This also commits to the SQLite database potentially several times per +// minute per camera: +// +// 1. (rarely) to get a new video sample entry id +// 2. to reserve a new uuid +// 3. to move uuids planned for deletion from "recording" to +// "reserved_sample_Files" +// 4. to mark those uuids as deleted +// 5. to insert the new recording +// +// These could be combined into a single batch per minute per camera or even +// per minute by doing some operations sooner (such as reserving the next +// minute's uuid when inserting the previous minute's recording) and some +// later (such as marking uuids as deleted). #define _BSD_SOURCE // for timegm(3). @@ -48,177 +71,21 @@ #include #include "filesystem.h" +#include "h264.h" #include "http.h" +#include "recording.h" #include "string.h" #include "time.h" using std::string; namespace moonfire_nvr { + namespace { -const char kFilenameSuffix[] = ".mp4"; +const int kRotateIntervalSec = 60; } // namespace -FileManager::FileManager(const std::string &short_name, const std::string &path, - uint64_t byte_limit, Environment *env) - : short_name_(short_name), - path_(path), - byte_limit_(byte_limit), - env_(env) {} - -bool FileManager::Init(std::string *error_message) { - // Create the directory if it doesn't exist. - // If the path exists, assume it is a valid directory. - int ret = env_->fs->Mkdir(path_.c_str(), 0700); - if (ret != 0 && ret != EEXIST) { - *error_message = StrCat("Unable to create ", path_, ": ", strerror(ret)); - return false; - } - - bool ok = true; - - auto file_fn = [this, &ok, error_message](const dirent *ent) { - string filename(ent->d_name); - if (ent->d_type != DT_REG) { - VLOG(1) << short_name_ << ": Ignoring non-plain file " << filename; - return IterationControl::kContinue; - } - if (!re2::StringPiece(filename).ends_with(kFilenameSuffix)) { - VLOG(1) << short_name_ << ": Ignoring non-matching file " << filename - << " of size " << ent->d_reclen; - return IterationControl::kContinue; - } - - if (!AddFile(filename, error_message)) { - ok = false; - return IterationControl::kBreak; // no point in doing more. - } - return IterationControl::kContinue; - }; - - if (!env_->fs->DirForEach(path_.c_str(), file_fn, error_message)) { - return false; - } - - return ok; -} - -bool FileManager::Rotate(std::string *error_message) { - mu_.lock(); - while (total_bytes_ > byte_limit_) { - CHECK(!files_.empty()) << "total_bytes_=" << total_bytes_ - << " vs retain=" << byte_limit_; - auto it = files_.begin(); - const string filename = it->first; - int64_t size = it->second.st_size; - - // Release the lock while doing (potentially slow) I/O. - // Don't mark the file as deleted yet, so that a simultaneous Rotate() call - // won't return prematurely. - mu_.unlock(); - string fpath = StrCat(path_, "/", filename); - int ret = env_->fs->Unlink(fpath.c_str()); - if (ret == 0) { - LOG(INFO) << short_name_ << ": Deleted " << filename << " to reclaim " - << size << " bytes."; - } else if (ret == ENOENT) { - // This may have happened due to a racing Rotate() call. - // In any case, the file is gone, so proceed to mark it as such. - LOG(INFO) << short_name_ << ": File " << filename - << " was already deleted."; - } else { - *error_message = - StrCat("unlink failed on ", filename, ": ", strerror(ret)); - - return false; - } - - // Note that the file has been deleted. - mu_.lock(); - if (!files_.empty()) { - it = files_.begin(); - if (it->first == filename) { - size = it->second.st_size; - files_.erase(it); - CHECK_GE(total_bytes_, size); - total_bytes_ -= size; - } - } - } - int64_t total_bytes_copy = total_bytes_; - mu_.unlock(); - LOG(INFO) << short_name_ << ": Path " << path_ << " total size is " - << total_bytes_copy << ", within limit of " << byte_limit_; - return true; -} - -bool FileManager::AddFile(const std::string &filename, - std::string *error_message) { - struct stat buf; - string fpath = StrCat(path_, "/", filename); - int ret = env_->fs->Stat(fpath.c_str(), &buf); - if (ret != 0) { - *error_message = StrCat("stat on ", fpath, " failed: ", strerror(ret)); - return false; - } - VLOG(1) << short_name_ << ": adding file " << filename << " size " - << buf.st_size; - std::lock_guard lock(mu_); - CHECK_GE(buf.st_size, 0) << fpath; - uint64_t size = buf.st_size; - if (!files_.emplace(filename, std::move(buf)).second) { - *error_message = StrCat("filename ", filename, " already present."); - return false; - } - total_bytes_ += size; - return true; -} - -void FileManager::ForEachFile(FileManager::FileCallback fn) const { - std::lock_guard lock(mu_); - for (const auto &f : files_) { - fn(f.first, f.second); - } -} - -bool FileManager::Lookup(const std::string &filename, - struct stat *statbuf) const { - std::lock_guard lock(mu_); - const auto it = files_.find(filename); - if (it != files_.end()) { - *statbuf = it->second; - return true; - } - return false; -} - -bool Stream::Init(std::string *error_message) { - // Validate configuration. - if (!IsWord(camera_.short_name())) { - *error_message = StrCat("Camera name ", camera_.short_name(), " invalid."); - return false; - } - if (rotate_interval_ <= 0) { - *error_message = StrCat("Rotate interval for ", camera_.short_name(), - " must be positive."); - return false; - } - - if (!manager_.Init(error_message)) { - return false; - } - - int ret = env_->fs->Open(camera_path_.c_str(), O_RDONLY | O_DIRECTORY, - &camera_dir_); - if (ret != 0) { - *error_message = - StrCat("Unable to open ", camera_path_, ": ", strerror(ret)); - return false; - } - - return true; -} // Call from dedicated thread. Runs until shutdown requested. void Stream::Run() { @@ -226,47 +93,48 @@ void Stream::Run() { // Do an initial rotation so that if retain_bytes has been reduced, the // bulk deletion happens now, rather than while an input stream is open. - if (!manager_.Rotate(&error_message)) { - LOG(WARNING) << short_name() + if (!RotateFiles(&error_message)) { + LOG(WARNING) << row_.short_name << ": initial rotation failed: " << error_message; } while (!signal_->ShouldShutdown()) { if (in_ == nullptr && !OpenInput(&error_message)) { - LOG(WARNING) << short_name() + LOG(WARNING) << row_.short_name << ": Failed to open input; sleeping before retrying: " << error_message; env_->clock->Sleep({1, 0}); continue; } - LOG(INFO) << short_name() << ": Calling ProcessPackets."; + LOG(INFO) << row_.short_name << ": Calling ProcessPackets."; ProcessPacketsResult res = ProcessPackets(&error_message); if (res == kInputError) { - CloseOutput(); + CloseOutput(-1); in_.reset(); - LOG(WARNING) << short_name() + start_localtime_90k_ = -1; + LOG(WARNING) << row_.short_name << ": Input error; sleeping before retrying: " << error_message; env_->clock->Sleep({1, 0}); continue; } else if (res == kOutputError) { - CloseOutput(); - LOG(WARNING) << short_name() + CloseOutput(-1); + LOG(WARNING) << row_.short_name << ": Output error; sleeping before retrying: " << error_message; env_->clock->Sleep({1, 0}); continue; } } - CloseOutput(); + CloseOutput(-1); } Stream::ProcessPacketsResult Stream::ProcessPackets( std::string *error_message) { moonfire_nvr::VideoPacket pkt; CHECK(in_ != nullptr); - CHECK(!out_.is_open()); + CHECK(!writer_.is_open()); while (!signal_->ShouldShutdown()) { if (!in_->GetNext(&pkt, error_message)) { if (error_message->empty()) { @@ -300,19 +168,13 @@ Stream::ProcessPacketsResult Stream::ProcessPackets( frame_realtime_ = env_->clock->Now(); - if (out_.is_open() && frame_realtime_.tv_sec >= rotate_time_ && + if (writer_.is_open() && frame_realtime_.tv_sec >= rotate_time_ && pkt.is_key()) { - LOG(INFO) << short_name() << ": Reached rotation time; closing " - << out_file_ << "."; - VLOG(2) << short_name() << ": (Rotation time=" << rotate_time_ - << " vs current time=" << frame_realtime_.tv_sec << ")"; - out_.Close(); - - if (!manager_.AddFile(out_file_, error_message)) { - return kOutputError; - } - } else if (out_.is_open()) { - VLOG(2) << short_name() << ": Rotation time=" << rotate_time_ + LOG(INFO) << row_.short_name << ": Reached rotation time; closing " + << recording_.sample_file_uuid.UnparseText() << "."; + CloseOutput(pkt.pkt()->pts - start_pts_); + } else if (writer_.is_open()) { + VLOG(3) << row_.short_name << ": Rotation time=" << rotate_time_ << " vs current time=" << frame_realtime_.tv_sec; } @@ -323,165 +185,242 @@ Stream::ProcessPacketsResult Stream::ProcessPackets( seen_key_frame_ = true; } - if (!out_.is_open()) { + if (!writer_.is_open()) { start_pts_ = pkt.pts(); if (!OpenOutput(error_message)) { return kOutputError; } rotate_time_ = frame_realtime_.tv_sec - - (frame_realtime_.tv_sec % rotate_interval_) + - rotate_interval_; + (frame_realtime_.tv_sec % rotate_interval_sec_) + + rotate_offset_sec_; + if (rotate_time_ <= frame_realtime_.tv_sec) { + rotate_time_ += rotate_interval_sec_; + } } - // In the output stream, the pts and dts should start at 0. - pkt.pkt()->pts -= start_pts_; - pkt.pkt()->dts -= start_pts_; - - // The input's byte position and stream index aren't relevant to the - // output. - pkt.pkt()->pos = -1; - pkt.pkt()->stream_index = 0; - - if (!out_.Write(&pkt, error_message)) { + auto start_time_90k = pkt.pkt()->pts - start_pts_; + if (prev_pkt_start_time_90k_ != -1) { + index_.AddSample(start_time_90k - prev_pkt_start_time_90k_, + prev_pkt_bytes_, prev_pkt_key_); + } + re2::StringPiece data = pkt.data(); + if (need_transform_) { + if (!TransformSampleData(data, &transform_tmp_, error_message)) { + return kInputError; + } + data = transform_tmp_; + } + if (!writer_.Write(data, error_message)) { return kOutputError; } + prev_pkt_start_time_90k_ = start_time_90k; + prev_pkt_bytes_ = data.size(); + prev_pkt_key_ = pkt.is_key(); } return kStopped; } bool Stream::OpenInput(std::string *error_message) { CHECK(in_ == nullptr); - string url = StrCat("rtsp://", camera_.user(), ":", camera_.password(), "@", - camera_.host(), camera_.main_rtsp_path()); - string redacted_url = StrCat("rtsp://", camera_.user(), ":redacted@", - camera_.host(), camera_.main_rtsp_path()); - LOG(INFO) << short_name() << ": Opening input: " << redacted_url; + string url = StrCat("rtsp://", row_.username, ":", row_.password, "@", + row_.host, row_.main_rtsp_path); + string redacted_url = StrCat("rtsp://", row_.username, ":redacted@", + row_.host, row_.main_rtsp_path); + LOG(INFO) << row_.short_name << ": Opening input: " << redacted_url; in_ = env_->video_source->OpenRtsp(url, error_message); min_next_pts_ = std::numeric_limits::min(); seen_key_frame_ = false; - return in_ != nullptr; -} - -void Stream::CloseOutput() { - out_.Close(); - // TODO: should know if the file was written or not. - std::string error_message; - if (!manager_.AddFile(out_file_, &error_message)) { - VLOG(1) << short_name() << ": AddFile on recently closed output file " - << out_file_ << "failed; the file may never have been written: " - << error_message; - } -} - -std::string Stream::MakeOutputFilename() { - const size_t kTimeBufLen = sizeof("YYYYmmDDHHMMSS"); - char formatted_time[kTimeBufLen]; - struct tm mytm; - gmtime_r(&frame_realtime_.tv_sec, &mytm); - strftime(formatted_time, kTimeBufLen, "%Y%m%d%H%M%S", &mytm); - return StrCat(formatted_time, "_", camera_.short_name(), kFilenameSuffix); -} - -bool Stream::OpenOutput(std::string *error_message) { - if (!manager_.Rotate(error_message)) { + if (in_ == nullptr) { return false; } - CHECK(!out_.is_open()); - string filename = MakeOutputFilename(); - if (!out_.OpenFile(StrCat(camera_path_, "/", filename), *in_, - error_message)) { + + // The time base should match the 90kHz frequency specified in RFC 3551 + // section 5. + if (in_->stream()->time_base.num != 1 || + in_->stream()->time_base.den != kTimeUnitsPerSecond) { + *error_message = + StrCat("unexpected time base ", in_->stream()->time_base.num, "/", + in_->stream()->time_base.den); + return false; + } + + // width and height must fix into 16-bit ints for MP4 encoding. + int max_dimension = std::numeric_limits::max(); + if (in_->stream()->codec->width > max_dimension || + in_->stream()->codec->height > max_dimension) { + *error_message = + StrCat("input dimensions ", in_->stream()->codec->width, "x", + in_->stream()->codec->height, " are too large."); + return false; + } + entry_.id = -1; + entry_.width = in_->stream()->codec->width; + entry_.height = in_->stream()->codec->height; + re2::StringPiece extradata = in_->extradata(); + if (!ParseExtraData(extradata, entry_.width, entry_.height, &entry_.data, + &need_transform_, error_message)) { + in_.reset(); + return false; + } + auto sha1 = Digest::SHA1(); + sha1->Update(entry_.data); + entry_.sha1 = sha1->Finalize(); + if (!env_->mdb->InsertVideoSampleEntry(&entry_, error_message)) { + in_.reset(); return false; } - LOG(INFO) << short_name() << ": Opened output " << filename - << ", using start_pts=" << start_pts_ - << ", input timebase=" << in_->stream()->time_base.num << "/" - << in_->stream()->time_base.den - << ", output timebase=" << out_.time_base().num << "/" - << out_.time_base().den; - out_file_ = std::move(filename); return true; } -void Stream::HttpCallbackForDirectory(evhttp_request *req) { - EvBuffer buf; - buf.AddPrintf( - "\n" - "\n" - "\n" - "%s camera recordings\n" - "\n" - "\n" - "\n" - "

%s camera recordings

\n" - "

%s

\n" - "\n" - "\n", - // short_name passed IsWord(); there's no need to escape it. - camera_.short_name().c_str(), camera_.short_name().c_str(), - EscapeHtml(camera_.description()).c_str()); - manager_.ForEachFile( - [&buf](const std::string &filename, const struct stat &statbuf) { - // Attempt to make a pretty version of the timestamp embedded in the - // filename: with separators and in the local time zone. If this fails, - // just leave it blank. - string pretty_start_time; - struct tm mytm; - memset(&mytm, 0, sizeof(mytm)); - const size_t kTimeBufLen = 50; - char tmbuf[kTimeBufLen]; - static const RE2 kFilenameRe( - // YYYY mm DD HH MM SS - "^([0-9]{4})([0-9]{2})([0-9]{2})([0-9]{2})([0-9]{2})([0-9]{2})_"); - if (RE2::PartialMatch(filename, kFilenameRe, &mytm.tm_year, - &mytm.tm_mon, &mytm.tm_mday, &mytm.tm_hour, - &mytm.tm_min, &mytm.tm_sec)) { - mytm.tm_year -= 1900; - mytm.tm_mon--; - time_t start = timegm(&mytm); - localtime_r(&start, &mytm); - strftime(tmbuf, kTimeBufLen, "%a, %d %b %Y %H:%M:%S %Z", &mytm); - pretty_start_time = tmbuf; - } - string pretty_end_time; - localtime_r(&statbuf.st_mtime, &mytm); - strftime(tmbuf, kTimeBufLen, "%a, %d %b %Y %H:%M:%S %Z", &mytm); - pretty_end_time = tmbuf; - - buf.AddPrintf( - "" - "\n", - filename.c_str(), filename.c_str(), - EscapeHtml(pretty_start_time).c_str(), - EscapeHtml(pretty_end_time).c_str()); - }); - buf.AddPrintf("
FilenameStartEnd
%s%s%s
\n\n"); - evhttp_send_reply(req, HTTP_OK, "OK", buf.get()); -} - -std::vector Stream::GetFilesForTesting() { - std::vector files; - manager_.ForEachFile( - [&files](const std::string &filename, const struct stat &statbuf) { - files.push_back(filename); - }); - return files; -} - -void Stream::HttpCallbackForFile(evhttp_request *req, const string &filename) { - struct stat s; - if (!manager_.Lookup(filename, &s)) { - return evhttp_send_error(req, HTTP_NOTFOUND, "File not found."); +void Stream::CloseOutput(int64_t pts) { + if (!writer_.is_open()) { + return; } - HttpServeFile(req, "video/mp4", camera_dir_.get(), filename, s); + std::string error_message; + if (prev_pkt_start_time_90k_ != -1) { + int64_t duration_90k = pts - prev_pkt_start_time_90k_; + index_.AddSample(duration_90k > 0 ? duration_90k : 0, prev_pkt_bytes_, + prev_pkt_key_); + } + if (!writer_.Close(&recording_.sample_file_sha1, &error_message)) { + LOG(ERROR) << row_.short_name << ": Closing output " + << recording_.sample_file_uuid.UnparseText() + << " failed with error: " << error_message; + uuids_to_unlink_.push_back(recording_.sample_file_uuid); + TryUnlink(); + return; + } + int ret = env_->sample_file_dir->Sync(); + if (ret != 0) { + LOG(ERROR) << row_.short_name + << ": Unable to sync sample file dir after writing " + << recording_.sample_file_uuid.UnparseText() << ": " + << strerror(ret); + uuids_to_unlink_.push_back(recording_.sample_file_uuid); + TryUnlink(); + return; + } + if (!env_->mdb->InsertRecording(&recording_, &error_message)) { + LOG(ERROR) << row_.short_name << ": Unable to insert recording " + << recording_.sample_file_uuid.UnparseText() << ": " + << error_message; + uuids_to_unlink_.push_back(recording_.sample_file_uuid); + TryUnlink(); + return; + } + row_.total_sample_file_bytes += recording_.sample_file_bytes; + VLOG(1) << row_.short_name << ": ...wrote " + << recording_.sample_file_uuid.UnparseText() << "; usage now " + << HumanizeWithBinaryPrefix(row_.total_sample_file_bytes, "B"); } -Nvr::Nvr() { - env_.clock = GetRealClock(); - env_.video_source = GetRealVideoSource(); - env_.fs = GetRealFilesystem(); +void Stream::TryUnlink() { + std::vector still_not_unlinked; + for (const auto &uuid : uuids_to_unlink_) { + std::string text = uuid.UnparseText(); + int ret = env_->sample_file_dir->Unlink(text.c_str()); + if (ret == ENOENT) { + LOG(WARNING) << row_.short_name << ": Sample file " << text + << " already deleted!"; + } else if (ret != 0) { + LOG(WARNING) << row_.short_name << ": Unable to unlink " << text << ": " + << strerror(ret); + still_not_unlinked.push_back(uuid); + continue; + } + uuids_to_mark_deleted_.push_back(uuid); + } + uuids_to_unlink_ = std::move(still_not_unlinked); +} + +bool Stream::OpenOutput(std::string *error_message) { + int64_t frame_localtime_90k = To90k(frame_realtime_); + if (start_localtime_90k_ == -1) { + start_localtime_90k_ = frame_localtime_90k - start_pts_; + } + if (!RotateFiles(error_message)) { + return false; + } + std::vector reserved = env_->mdb->ReserveSampleFiles(1, error_message); + if (reserved.size() != 1) { + return false; + } + CHECK(!writer_.is_open()); + string filename = reserved[0].UnparseText(); + recording_.id = -1; + recording_.camera_id = row_.id; + recording_.sample_file_uuid = reserved[0]; + recording_.video_sample_entry_id = entry_.id; + recording_.local_time_90k = frame_localtime_90k; + index_.Init(&recording_, start_localtime_90k_ + start_pts_); + if (!writer_.Open(filename.c_str(), error_message)) { + return false; + } + prev_pkt_start_time_90k_ = -1; + prev_pkt_bytes_ = -1; + prev_pkt_key_ = false; + LOG(INFO) << row_.short_name << ": Opened output " << filename + << ", using start_pts=" << start_pts_ + << ", input timebase=" << in_->stream()->time_base.num << "/" + << in_->stream()->time_base.den; + return true; +} + +bool Stream::RotateFiles(std::string *error_message) { + int64_t bytes_needed = row_.total_sample_file_bytes - row_.retain_bytes; + int64_t bytes_to_delete = 0; + if (bytes_needed <= 0) { + VLOG(1) << row_.short_name << ": have remaining quota of " + << HumanizeWithBinaryPrefix(-bytes_needed, "B"); + return true; + } + LOG(INFO) << row_.short_name << ": need to delete " + << HumanizeWithBinaryPrefix(bytes_needed, "B"); + std::vector to_delete; + auto row_cb = [&](const ListOldestSampleFilesRow &row) { + bytes_needed -= row.sample_file_bytes; + bytes_to_delete += row.sample_file_bytes; + to_delete.push_back(row); + return bytes_needed < 0 ? IterationControl::kBreak + : IterationControl::kContinue; + }; + if (!env_->mdb->ListOldestSampleFiles(row_.uuid, row_cb, error_message)) { + return false; + } + if (bytes_needed > 0) { + *error_message = + StrCat("couldn't find enough files to delete; ", + HumanizeWithBinaryPrefix(bytes_needed, "B"), " left."); + return false; + } + if (!env_->mdb->DeleteRecordings(to_delete, error_message)) { + return false; + } + for (const auto &to_delete_row : to_delete) { + uuids_to_unlink_.push_back(to_delete_row.sample_file_uuid); + } + row_.total_sample_file_bytes -= bytes_to_delete; + TryUnlink(); + if (!uuids_to_unlink_.empty()) { + *error_message = + StrCat("failed to unlink ", uuids_to_unlink_.size(), " files."); + return false; + } + int ret = env_->sample_file_dir->Sync(); + if (ret != 0) { + *error_message = StrCat("fsync sample directory: ", strerror(ret)); + return false; + } + if (!env_->mdb->MarkSampleFilesDeleted(uuids_to_mark_deleted_, + error_message)) { + *error_message = StrCat("unable to mark ", uuids_to_mark_deleted_.size(), + " sample files as deleted"); + return false; + } + uuids_to_mark_deleted_.clear(); + VLOG(1) << row_.short_name << ": ...deleted successfully; usage now " + << HumanizeWithBinaryPrefix(row_.total_sample_file_bytes, "B"); + return true; } Nvr::~Nvr() { @@ -489,79 +428,35 @@ Nvr::~Nvr() { for (auto &thread : stream_threads_) { thread.join(); } + // TODO: cleanup reservations? } -bool Nvr::Init(const moonfire_nvr::Config &config, std::string *error_msg) { - if (config.base_path().empty()) { - *error_msg = "base_path must be configured."; +bool Nvr::Init(std::string *error_msg) { + std::vector all_reserved; + if (!env_->mdb->ListReservedSampleFiles(&all_reserved, error_msg)) { return false; } - - for (const auto &camera : config.camera()) { - streams_.emplace_back(new Stream(&signal_, config, &env_, camera)); - if (!streams_.back()->Init(error_msg)) { - return false; + for (const auto &reserved : all_reserved) { + int ret = env_->sample_file_dir->Unlink(reserved.UnparseText().c_str()); + if (ret != 0 && ret != ENOENT) { + LOG(WARNING) << "Unable to remove reserved sample file: " + << reserved.UnparseText(); } } - for (auto &stream : streams_) { - stream_threads_.emplace_back([&stream]() { stream->Run(); }); - } + + std::vector cameras; + env_->mdb->ListCameras([&](const ListCamerasRow &row) { + cameras.push_back(row); + return IterationControl::kContinue; + }); + for (size_t i = 0; i < cameras.size(); ++i) { + int rotate_offset_sec = kRotateIntervalSec * i / cameras.size(); + auto *stream = new Stream(&signal_, env_, cameras[i], rotate_offset_sec, + kRotateIntervalSec); + streams_.emplace_back(stream); + stream_threads_.emplace_back([stream]() { stream->Run(); }); + }; return true; } -void Nvr::HttpCallback(evhttp_request *req) { - if (evhttp_request_get_command(req) != EVHTTP_REQ_GET) { - return evhttp_send_error(req, HTTP_BADMETHOD, "only GET allowed"); - } - - evhttp_uri *uri = evhttp_uri_parse(evhttp_request_get_uri(req)); - if (uri == nullptr || evhttp_uri_get_path(uri) == nullptr) { - return evhttp_send_error(req, HTTP_INTERNAL, "Failed to parse URI."); - } - - std::string uri_path = evhttp_uri_get_path(uri); - evhttp_uri_free(uri); - uri = nullptr; - - if (uri_path == "/") { - return HttpCallbackForTopLevel(req); - } else if (!re2::StringPiece(uri_path).starts_with("/c/")) { - return evhttp_send_error(req, HTTP_NOTFOUND, "Not found."); - } - size_t camera_name_start = strlen("/c/"); - size_t next_slash = uri_path.find('/', camera_name_start); - if (next_slash == std::string::npos) { - CHECK_EQ(0, evhttp_add_header(evhttp_request_get_output_headers(req), - "Location", StrCat(uri_path, "/").c_str())); - return evhttp_send_reply(req, HTTP_MOVEPERM, "OK", EvBuffer().get()); - } - re2::StringPiece camera_name = - uri_path.substr(camera_name_start, next_slash - camera_name_start); - for (const auto &stream : streams_) { - if (stream->camera_name() == camera_name) { - if (uri_path.size() == next_slash + 1) { - return stream->HttpCallbackForDirectory(req); - } else { - return stream->HttpCallbackForFile(req, - uri_path.substr(next_slash + 1)); - } - } - } - return evhttp_send_error(req, HTTP_NOTFOUND, "No such camera."); -} - -void Nvr::HttpCallbackForTopLevel(evhttp_request *req) { - EvBuffer buf; - buf.Add("
    \n"); - for (const auto &stream : streams_) { - // Camera name passed IsWord; there's no need to escape it. - const string &name = stream->camera_name(); - string escaped_description = EscapeHtml(stream->camera_description()); - buf.AddPrintf("
  • %s: %s
  • \n", name.c_str(), - name.c_str(), escaped_description.c_str()); - } - buf.Add("
\n"); - return evhttp_send_reply(req, HTTP_OK, "OK", buf.get()); -} - } // namespace moonfire_nvr diff --git a/src/moonfire-nvr.h b/src/moonfire-nvr.h index f79a792..7dbcca4 100644 --- a/src/moonfire-nvr.h +++ b/src/moonfire-nvr.h @@ -45,8 +45,8 @@ #include -#include "config.pb.h" #include "filesystem.h" +#include "moonfire-db.h" #include "ffmpeg.h" #include "time.h" @@ -69,124 +69,53 @@ class ShutdownSignal { std::atomic_bool shutdown_{false}; }; -// Environment for streams to use. This is supplied for testability. +// The Nvr's environment. This is supplied for testability. struct Environment { WallClock *clock = nullptr; VideoSource *video_source = nullptr; - Filesystem *fs = nullptr; -}; - -// Delete old ".mp4" files within a specified directory, keeping them within a -// byte limit. In particular, "old" means "lexographically smaller filename". -// Thread-safe. -// -// On startup, FileManager reads the directory and stats every matching file. -// Afterward, it assumes that (1) it is informed of every added file and (2) -// files are deleted only through calls to Rotate. -class FileManager { - public: - using FileCallback = std::function; - - // |short_name| will be prepended to log messages. - FileManager(const std::string &short_name, const std::string &path, - uint64_t byte_limit, Environment *env); - FileManager(const FileManager &) = delete; - FileManager &operator=(const FileManager &) = delete; - - // Initialize the FileManager by examining existing directory contents. - // Create the directory if necessary. - bool Init(std::string *error_message); - - // Delete files to go back within the byte limit if necessary. - bool Rotate(std::string *error_message); - - // Note that a file has been added. This may bring the FileManager over the - // byte limit; no files will be deleted immediately. - bool AddFile(const std::string &filename, std::string *error_message); - - // Call |fn| for each file, while holding the lock. - void ForEachFile(FileCallback) const; - - // Look up a file. - // If |filename| is known to the manager, returns true and fills |statbuf|. - // Otherwise returns false. - bool Lookup(const std::string &filename, struct stat *statbuf) const; - - int64_t total_bytes() const { - std::lock_guard lock(mu_); - return total_bytes_; - } - - private: - const std::string short_name_; - const std::string path_; - const uint64_t byte_limit_; - Environment *const env_; - - mutable std::mutex mu_; - std::map files_; - uint64_t total_bytes_ = 0; // total bytes of all |files_|. + File *sample_file_dir = nullptr; + MoonfireDatabase *mdb = nullptr; }; // A single video stream, currently always a camera's "main" (as opposed to // "sub") stream. Methods are thread-compatible rather than thread-safe; the -// Nvr should call Init + Run in a dedicated thread. +// Nvr should call Run in a dedicated thread. class Stream { public: - Stream(const ShutdownSignal *signal, const moonfire_nvr::Config &config, - Environment *const env, const moonfire_nvr::Camera &camera) + Stream(const ShutdownSignal *signal, Environment *const env, + const moonfire_nvr::ListCamerasRow &row, int rotate_offset_sec, + int rotate_interval_sec) : signal_(signal), env_(env), - camera_path_(config.base_path() + "/" + camera.short_name()), - rotate_interval_(config.rotate_sec()), - camera_(camera), - manager_(camera_.short_name(), camera_path_, camera.retain_bytes(), - env) {} + row_(row), + rotate_offset_sec_(rotate_offset_sec), + rotate_interval_sec_(rotate_interval_sec), + writer_(env->sample_file_dir) {} Stream(const Stream &) = delete; Stream &operator=(const Stream &) = delete; - // Call once on startup, before Run(). - bool Init(std::string *error_message); - - const std::string &camera_name() const { return camera_.short_name(); } - const std::string &camera_description() const { - return camera_.description(); - } - // Call from dedicated thread. Runs until shutdown requested. void Run(); - // Handle HTTP requests which have been pre-determined to be for the - // directory view of this stream or a particular file, respectively. - // Thread-safe. - void HttpCallbackForDirectory(evhttp_request *req); - void HttpCallbackForFile(evhttp_request *req, const std::string &filename); - - std::vector GetFilesForTesting(); - private: enum ProcessPacketsResult { kInputError, kOutputError, kStopped }; - const std::string &short_name() const { return camera_.short_name(); } - ProcessPacketsResult ProcessPackets(std::string *error_message); bool OpenInput(std::string *error_message); - void CloseOutput(); - std::string MakeOutputFilename(); + + // |pts| should be the relative pts within this output segment if closing + // due to normal rotation, or -1 if closing abruptly. + void CloseOutput(int64_t pts); + bool OpenOutput(std::string *error_message); - bool RotateFiles(); - bool Stat(const std::string &filename, struct stat *file, - std::string *error_message); + bool RotateFiles(std::string *error_message); + void TryUnlink(); const ShutdownSignal *signal_; const Environment *env_; - const std::string camera_path_; - const int32_t rotate_interval_; - const moonfire_nvr::Camera camera_; - - FileManager manager_; // thread-safe. - std::unique_ptr camera_dir_; // thread-safe. + ListCamerasRow row_; + const int rotate_offset_sec_; + const int rotate_interval_sec_; // // State below is used only by the thread in Run(). @@ -196,20 +125,41 @@ class Stream { int64_t min_next_pts_ = std::numeric_limits::min(); bool seen_key_frame_ = false; + // need_transform_ indicates if TransformSampleData will need to be called + // on each video sample. + bool need_transform_ = false; + + VideoSampleEntry entry_; + std::string transform_tmp_; + std::vector uuids_to_unlink_; + std::vector uuids_to_mark_deleted_; + // Current output segment. - moonfire_nvr::OutputVideoPacketStream out_; + Recording recording_; + moonfire_nvr::SampleFileWriter writer_; + SampleIndexEncoder index_; time_t rotate_time_ = 0; // rotate when frame_realtime_ >= rotate_time_. - std::string out_file_; // current output filename. + + // start_pts_ is the pts of the first frame included in the current output. int64_t start_pts_ = -1; - // Packet-to-packet state. + // start_localtime_90k_ is the local system's time since epoch (in 90k units) + // to match start_pts_. + int64_t start_localtime_90k_ = -1; + + // These fields describe a packet which has been written to the + // sample file but (because the duration is not yet known) has not been + // added to the index. + int32_t prev_pkt_start_time_90k_ = -1; + int32_t prev_pkt_bytes_ = -1; + bool prev_pkt_key_ = false; struct timespec frame_realtime_ = {0, 0}; }; // The main network video recorder, which manages a collection of streams. class Nvr { public: - Nvr(); + explicit Nvr(Environment *env) : env_(env) {} Nvr(const Nvr &) = delete; Nvr &operator=(const Nvr &) = delete; @@ -221,16 +171,12 @@ class Nvr { // Initialize the NVR. Call before any other operation. // Verifies configuration and starts background threads to capture/rotate // streams. - bool Init(const moonfire_nvr::Config &config, std::string *error_msg); - - // Handle an HTTP request. - void HttpCallback(evhttp_request *req); + bool Init(std::string *error_msg); private: void HttpCallbackForTopLevel(evhttp_request *req); - Environment env_; - moonfire_nvr::Config config_; + Environment *const env_; std::vector> streams_; std::vector stream_threads_; ShutdownSignal signal_; diff --git a/src/recording.h b/src/recording.h index c15a3c8..620fe27 100644 --- a/src/recording.h +++ b/src/recording.h @@ -65,8 +65,10 @@ struct Recording { int64_t id = -1; int64_t camera_id = -1; std::string sample_file_sha1; + std::string sample_file_path; Uuid sample_file_uuid; int64_t video_sample_entry_id = -1; + int64_t local_time_90k = -1; // Fields populated by SampleIndexEncoder. int64_t start_time_90k = -1; @@ -206,6 +208,11 @@ struct VideoSampleEntry { std::string PrettyTimestamp(int64_t ts_90k); +inline int64_t To90k(const struct timespec &ts) { + return (ts.tv_sec * kTimeUnitsPerSecond) + + (ts.tv_nsec * kTimeUnitsPerSecond / 1000000000); +} + } // namespace moonfire_nvr #endif // MOONFIRE_NVR_RECORDING_H diff --git a/src/schema.sql b/src/schema.sql index 241cf3f..93b660c 100644 --- a/src/schema.sql +++ b/src/schema.sql @@ -75,13 +75,22 @@ create table recording ( sample_file_bytes integer not null check (sample_file_bytes > 0), -- The starting time of the recording, in 90 kHz units since - -- 1970-01-01 00:00:00 UTC. + -- 1970-01-01 00:00:00 UTC. Currently on initial connection, this is taken + -- from the local system time; on subsequent recordings, it exactly + -- matches the previous recording's end time. start_time_90k integer not null check (start_time_90k > 0), -- The duration of the recording, in 90 kHz units. duration_90k integer not null check (duration_90k >= 0 and duration_90k < 5*60*90000), + -- The number of 90 kHz units the local system time is ahead of the + -- recording; negative numbers indicate the local system time is behind + -- the recording. Large values would indicate that the local time has jumped + -- during recording or that the local time and camera time frequencies do + -- not match. + local_time_delta_90k integer not null, + video_samples integer not null check (video_samples > 0), video_sync_samples integer not null check (video_samples > 0), video_sample_entry_id integer references video_sample_entry (id), diff --git a/src/string-test.cc b/src/string-test.cc index a781ed9..bf62ecb 100644 --- a/src/string-test.cc +++ b/src/string-test.cc @@ -68,24 +68,6 @@ TEST(JoinTest, Simple) { Join(std::initializer_list({"a", "b", "c"}), ",")); } -TEST(IsWordTest, Simple) { - EXPECT_TRUE(IsWord("")); - EXPECT_TRUE(IsWord("0123456789")); - EXPECT_TRUE(IsWord("abcdefghijklmnopqrstuvwxyz")); - EXPECT_TRUE(IsWord("ABCDEFGHIJKLMNOPQRSTUVWXYZ")); - EXPECT_TRUE(IsWord("_")); - - EXPECT_TRUE(IsWord("4bJ_")); - - EXPECT_FALSE(IsWord("/")); - EXPECT_FALSE(IsWord("abc/")); - EXPECT_FALSE(IsWord(" ")); - EXPECT_FALSE(IsWord("@")); - EXPECT_FALSE(IsWord("[")); - EXPECT_FALSE(IsWord("`")); - EXPECT_FALSE(IsWord("{")); -} - TEST(EscapeTest, Simple) { EXPECT_EQ("", moonfire_nvr::EscapeHtml("")); EXPECT_EQ("no special chars", moonfire_nvr::EscapeHtml("no special chars")); diff --git a/src/string.cc b/src/string.cc index c8a2d1a..506f061 100644 --- a/src/string.cc +++ b/src/string.cc @@ -90,16 +90,6 @@ StrCatPiece::StrCatPiece(int64_t p) { } // namespace internal -bool IsWord(const std::string &str) { - for (char c : str) { - if (!(('0' <= c && c <= '9') || ('A' <= c && c <= 'Z') || - ('a' <= c && c <= 'z') || c == '_')) { - return false; - } - } - return true; -} - std::string EscapeHtml(const std::string &input) { std::string output; output.reserve(input.size()); diff --git a/src/string.h b/src/string.h index 8002f1c..b5b2b47 100644 --- a/src/string.h +++ b/src/string.h @@ -112,9 +112,6 @@ std::string Join(const Container &pieces, re2::StringPiece separator) { return out; } -// Return true if every character in |str| is in [A-Za-z0-9_]. -bool IsWord(const std::string &str); - // HTML-escape the given UTF-8-encoded string. std::string EscapeHtml(const std::string &input); diff --git a/src/uuid.h b/src/uuid.h index 4c3f255..de6865a 100644 --- a/src/uuid.h +++ b/src/uuid.h @@ -34,6 +34,7 @@ #ifndef MOONFIRE_NVR_UUID_H #define MOONFIRE_NVR_UUID_H +#include #include #include @@ -74,6 +75,11 @@ class UuidGenerator { virtual Uuid Generate() = 0; }; +class MockUuidGenerator : public UuidGenerator { + public: + MOCK_METHOD0(Generate, Uuid()); +}; + UuidGenerator *GetRealUuidGenerator(); } // namespace moonfire_nvr diff --git a/src/web.cc b/src/web.cc index 99472bc..3ffa765 100644 --- a/src/web.cc +++ b/src/web.cc @@ -53,6 +53,7 @@ void WebInterface::HandleCameraList(evhttp_request *req, void *arg) { "\n" "\n" "Camera list\n" + "\n" "