mirror of
https://github.com/scottlamb/moonfire-nvr.git
synced 2025-01-26 14:13:17 -05:00
Construct HTTP responses incrementally.
This isn't as much of a speed-up as you might imagine; most of the large HTTP content was mmap()ed files which are relatively efficient. The big improvement here is that it's now possible to serve large files (4 GiB and up) on 32-bit machines. This actually works: I was just able to browse a 25-hour, 37 GiB .mp4 file on my Raspberry Pi 2 Model B. It takes about 400 ms to start serving each request, which isn't exactly zippy but might be forgivable for such a large file. I still intend for the common request from the web interface to be for much smaller fragmented .mp4 files. Speed could be improved later through caching. Right now my test code is creating a fresh VirtualFile from a database query on each request, even though it hasn't changed. The tricky part will be doing cache invalidation cleanly if it does change---new recordings are added to the requested time range, recordings are deleted, or existing recordings' timestamps are changed. The downside to the approach here is that it requires libevent 2.1 for evhttp_send_reply_chunk_with_cb. Unfortunately, Ubuntu 15.10 and Debian Jessie still bundle libevent 2.0. There are a few possible improvements here: 1. fall back to assuming chunks are added immediately, so that people with libevent 2.0 get the old bad behavior and people with libevent 2.1 get the better behavior. This is kind of lame, though; it's easy to go through the whole address space pretty fast, particularly when the browsers send out requests so quickly so there may be some unintentional concurrency. 2. alter the FileSlice interface to return a pointer/destructor rather than add something to the evbuffer. HttpServe would then add each chunk via evbuffer_add_reference, and it'd supply a cleanupfn that (in addition to calling the FileSlice-supplied destructor) notes that this chunk has been fully sent. For all the currently-used FileSlices, this shouldn't be too hard, and there are a few other reasons it might be beneficial: * RealFileSlice could call madvise() to control the OS buffering * RealFileSlice could track when file descriptors are open and thus FileManager's unlink() calls don't actually free up space * It feels dirty to expose libevent stuff through the otherwise-nice FileSlice interface. 3. support building libevent 2.1 statically in-tree if the OS-supplied libevent is unsuitable. I'm tempted to go with #2, but probably not right now. More urgent to commit support for writing the new format and the wrapper bits for viewing it.
This commit is contained in:
parent
6cd2d75846
commit
4c7eed293f
@ -59,7 +59,7 @@ find_library(PROFILER_LIBRARIES profiler)
|
||||
# https://cmake.org/cmake/help/v3.0/module/FindPkgConfig.html
|
||||
find_package(PkgConfig)
|
||||
pkg_check_modules(FFMPEG REQUIRED libavutil libavcodec libavformat)
|
||||
pkg_check_modules(LIBEVENT REQUIRED libevent)
|
||||
pkg_check_modules(LIBEVENT REQUIRED libevent>=2.1)
|
||||
pkg_check_modules(GLOG REQUIRED libglog)
|
||||
pkg_check_modules(OPENSSL REQUIRED libcrypto)
|
||||
pkg_check_modules(SQLITE REQUIRED sqlite3)
|
||||
|
18
README.md
18
README.md
@ -64,9 +64,11 @@ from source. It requires several packages to build:
|
||||
along with all versions of the competing project [libav](http://libav.org),
|
||||
does not support socket timeouts for RTSP. For reliable reconnections on
|
||||
error, it's strongly recommended to use ffmpeg >= 55.1.101.
|
||||
* [libevent](http://libevent.org/) 2.x, for the built-in HTTP server.
|
||||
* [libevent](http://libevent.org/) 2.1, for the built-in HTTP server.
|
||||
(This might be replaced with the more full-featured
|
||||
[nghttp2](https://github.com/tatsuhiro-t/nghttp2) in the future.)
|
||||
Unfortunately, the libevent 2.0 bundled with current Debian releases is
|
||||
unsuitable.
|
||||
* [protocol buffers](https://developers.google.com/protocol-buffers/),
|
||||
currently just for the configuration file.
|
||||
* [gflags](http://gflags.github.io/gflags/), for commandline flag parsing.
|
||||
@ -79,7 +81,7 @@ from source. It requires several packages to build:
|
||||
* libuuid from (util-linux)[https://en.wikipedia.org/wiki/Util-linux].
|
||||
* [SQLite3](https://www.sqlite.org/).
|
||||
|
||||
On Ubuntu 15.10 or Raspbian Jessie, the following command will install all
|
||||
On Ubuntu 15.10 or Raspbian Jessie, the following command will install most
|
||||
pre-requisites (see also the `Build-Depends` field in `debian/control`):
|
||||
|
||||
$ sudo apt-get install \
|
||||
@ -89,15 +91,18 @@ pre-requisites (see also the `Build-Depends` field in `debian/control`):
|
||||
libavcodec-dev \
|
||||
libavformat-dev \
|
||||
libavutil-dev \
|
||||
libevent-dev \
|
||||
libgflags-dev \
|
||||
libgoogle-glog-dev \
|
||||
libgoogle-perftools-dev \
|
||||
libre2-dev \
|
||||
libsqlite3-dev \
|
||||
libuuid-dev \
|
||||
pkgconf \
|
||||
protobuf-compiler
|
||||
protobuf-compiler \
|
||||
uuid-dev
|
||||
|
||||
libevent 2.1 will have to be installed from source. In the future, this
|
||||
dependency may be replaced or support may be added for automatically building
|
||||
libevent in-tree to avoid the inconvenience.
|
||||
|
||||
Once prerequisites are installed, Moonfire NVR can be built as follows:
|
||||
|
||||
@ -107,7 +112,8 @@ Once prerequisites are installed, Moonfire NVR can be built as follows:
|
||||
$ make
|
||||
$ sudo make install
|
||||
|
||||
Alternatively, you can prepare a `.deb` package:
|
||||
Alternatively, if you do have a sufficiently new apt-installed libevent
|
||||
installed, you may be able to prepare a `.deb` package:
|
||||
|
||||
$ sudo apt-get install devscripts dh-systemd
|
||||
$ debuild -us -uc
|
||||
|
2
debian/control
vendored
2
debian/control
vendored
@ -3,7 +3,7 @@ Maintainer: Scott Lamb <slamb@slamb.org>
|
||||
Section: video
|
||||
Priority: optional
|
||||
Standards-Version: 3.9.6.1
|
||||
Build-Depends: debhelper (>= 9), dh-systemd, cmake, libprotobuf-dev, libavcodec-dev, libavformat-dev, libevent-dev, libgflags-dev, libgoogle-glog-dev, libgoogle-perftools-dev, libre2-dev, pkgconf, protobuf-compiler, libuuid-dev, libsqlite3-dev
|
||||
Build-Depends: debhelper (>= 9), dh-systemd, cmake, libprotobuf-dev, libavcodec-dev, libavformat-dev, libevent-dev (>= 2.1), libgflags-dev, libgoogle-glog-dev, libgoogle-perftools-dev, libre2-dev, pkgconf, protobuf-compiler, uuid-dev, libsqlite3-dev
|
||||
|
||||
Package: moonfire-nvr
|
||||
Architecture: any
|
||||
|
@ -61,7 +61,7 @@ namespace {
|
||||
class MockFileSlice : public FileSlice {
|
||||
public:
|
||||
MOCK_CONST_METHOD0(size, int64_t());
|
||||
MOCK_CONST_METHOD3(AddRange, bool(ByteRange, EvBuffer *, std::string *));
|
||||
MOCK_CONST_METHOD3(AddRange, int64_t(ByteRange, EvBuffer *, std::string *));
|
||||
};
|
||||
|
||||
TEST(EvBufferTest, AddFileTest) {
|
||||
@ -99,18 +99,18 @@ TEST(EvBufferTest, AddFileTest) {
|
||||
|
||||
class FileSlicesTest : public testing::Test {
|
||||
protected:
|
||||
FileSlicesTest() {
|
||||
void Init(int flags) {
|
||||
EXPECT_CALL(a_, size()).Times(AnyNumber()).WillRepeatedly(Return(5));
|
||||
EXPECT_CALL(b_, size()).Times(AnyNumber()).WillRepeatedly(Return(13));
|
||||
EXPECT_CALL(c_, size()).Times(AnyNumber()).WillRepeatedly(Return(7));
|
||||
EXPECT_CALL(d_, size()).Times(AnyNumber()).WillRepeatedly(Return(17));
|
||||
EXPECT_CALL(e_, size()).Times(AnyNumber()).WillRepeatedly(Return(19));
|
||||
|
||||
slices_.Append(&a_);
|
||||
slices_.Append(&b_);
|
||||
slices_.Append(&c_);
|
||||
slices_.Append(&d_);
|
||||
slices_.Append(&e_);
|
||||
slices_.Append(&a_, flags);
|
||||
slices_.Append(&b_, flags);
|
||||
slices_.Append(&c_, flags);
|
||||
slices_.Append(&d_, flags);
|
||||
slices_.Append(&e_, flags);
|
||||
}
|
||||
|
||||
FileSlices slices_;
|
||||
@ -122,44 +122,79 @@ class FileSlicesTest : public testing::Test {
|
||||
};
|
||||
|
||||
TEST_F(FileSlicesTest, Size) {
|
||||
Init(0);
|
||||
EXPECT_EQ(5 + 13 + 7 + 17 + 19, slices_.size());
|
||||
}
|
||||
|
||||
TEST_F(FileSlicesTest, ExactSlice) {
|
||||
// Exactly slice b.
|
||||
Init(0);
|
||||
std::string error_message;
|
||||
EXPECT_CALL(b_, AddRange(ByteRange(0, 13), _, _)).WillOnce(Return(true));
|
||||
EXPECT_TRUE(slices_.AddRange(ByteRange(5, 18), nullptr, &error_message))
|
||||
EXPECT_CALL(b_, AddRange(ByteRange(0, 13), _, _)).WillOnce(Return(13));
|
||||
EXPECT_EQ(13, slices_.AddRange(ByteRange(5, 18), nullptr, &error_message))
|
||||
<< error_message;
|
||||
}
|
||||
|
||||
TEST_F(FileSlicesTest, Offset) {
|
||||
// Part of slice b, all of slice c, and part of slice d.
|
||||
Init(0);
|
||||
std::string error_message;
|
||||
EXPECT_CALL(b_, AddRange(ByteRange(12, 13), _, _)).WillOnce(Return(true));
|
||||
EXPECT_CALL(c_, AddRange(ByteRange(0, 7), _, _)).WillOnce(Return(true));
|
||||
EXPECT_CALL(d_, AddRange(ByteRange(0, 1), _, _)).WillOnce(Return(true));
|
||||
EXPECT_TRUE(slices_.AddRange(ByteRange(17, 26), nullptr, &error_message))
|
||||
EXPECT_CALL(b_, AddRange(ByteRange(12, 13), _, _)).WillOnce(Return(1));
|
||||
EXPECT_CALL(c_, AddRange(ByteRange(0, 7), _, _)).WillOnce(Return(7));
|
||||
EXPECT_CALL(d_, AddRange(ByteRange(0, 1), _, _)).WillOnce(Return(1));
|
||||
EXPECT_EQ(9, slices_.AddRange(ByteRange(17, 26), nullptr, &error_message))
|
||||
<< error_message;
|
||||
}
|
||||
|
||||
TEST_F(FileSlicesTest, Everything) {
|
||||
Init(0);
|
||||
std::string error_message;
|
||||
EXPECT_CALL(a_, AddRange(ByteRange(0, 5), _, _)).WillOnce(Return(true));
|
||||
EXPECT_CALL(b_, AddRange(ByteRange(0, 13), _, _)).WillOnce(Return(true));
|
||||
EXPECT_CALL(c_, AddRange(ByteRange(0, 7), _, _)).WillOnce(Return(true));
|
||||
EXPECT_CALL(d_, AddRange(ByteRange(0, 17), _, _)).WillOnce(Return(true));
|
||||
EXPECT_CALL(e_, AddRange(ByteRange(0, 19), _, _)).WillOnce(Return(true));
|
||||
EXPECT_TRUE(slices_.AddRange(ByteRange(0, 61), nullptr, &error_message))
|
||||
EXPECT_CALL(a_, AddRange(ByteRange(0, 5), _, _)).WillOnce(Return(5));
|
||||
EXPECT_CALL(b_, AddRange(ByteRange(0, 13), _, _)).WillOnce(Return(13));
|
||||
EXPECT_CALL(c_, AddRange(ByteRange(0, 7), _, _)).WillOnce(Return(7));
|
||||
EXPECT_CALL(d_, AddRange(ByteRange(0, 17), _, _)).WillOnce(Return(17));
|
||||
EXPECT_CALL(e_, AddRange(ByteRange(0, 19), _, _)).WillOnce(Return(19));
|
||||
EXPECT_EQ(61, slices_.AddRange(ByteRange(0, 61), nullptr, &error_message))
|
||||
<< error_message;
|
||||
}
|
||||
|
||||
TEST_F(FileSlicesTest, Lazy) {
|
||||
Init(FileSlices::kLazy);
|
||||
std::string error_message;
|
||||
EXPECT_CALL(a_, AddRange(ByteRange(0, 5), _, _)).WillOnce(Return(5));
|
||||
EXPECT_EQ(5, slices_.AddRange(ByteRange(0, 61), nullptr, &error_message))
|
||||
<< error_message;
|
||||
EXPECT_CALL(b_, AddRange(ByteRange(0, 13), _, _)).WillOnce(Return(13));
|
||||
EXPECT_EQ(13, slices_.AddRange(ByteRange(5, 61), nullptr, &error_message))
|
||||
<< error_message;
|
||||
EXPECT_CALL(c_, AddRange(ByteRange(0, 7), _, _)).WillOnce(Return(7));
|
||||
EXPECT_EQ(7, slices_.AddRange(ByteRange(18, 61), nullptr, &error_message))
|
||||
<< error_message;
|
||||
EXPECT_CALL(d_, AddRange(ByteRange(0, 17), _, _)).WillOnce(Return(17));
|
||||
EXPECT_EQ(17, slices_.AddRange(ByteRange(25, 61), nullptr, &error_message))
|
||||
<< error_message;
|
||||
EXPECT_CALL(e_, AddRange(ByteRange(0, 19), _, _)).WillOnce(Return(19));
|
||||
EXPECT_EQ(19, slices_.AddRange(ByteRange(42, 61), nullptr, &error_message))
|
||||
<< error_message;
|
||||
}
|
||||
|
||||
TEST_F(FileSlicesTest, SliceWithPartialReturn) {
|
||||
Init(0);
|
||||
std::string error_message;
|
||||
EXPECT_CALL(a_, AddRange(ByteRange(0, 5), _, _)).WillOnce(Return(5));
|
||||
EXPECT_CALL(b_, AddRange(ByteRange(0, 13), _, _)).WillOnce(Return(1));
|
||||
EXPECT_EQ(6, slices_.AddRange(ByteRange(0, 61), nullptr, &error_message))
|
||||
<< error_message;
|
||||
}
|
||||
|
||||
TEST_F(FileSlicesTest, PropagateError) {
|
||||
Init(0);
|
||||
std::string error_message;
|
||||
EXPECT_CALL(a_, AddRange(ByteRange(0, 5), _, _)).WillOnce(Return(true));
|
||||
EXPECT_CALL(a_, AddRange(ByteRange(0, 5), _, _)).WillOnce(Return(5));
|
||||
EXPECT_CALL(b_, AddRange(ByteRange(0, 13), _, _))
|
||||
.WillOnce(DoAll(SetArgPointee<2>("asdf"), Return(false)));
|
||||
EXPECT_FALSE(slices_.AddRange(ByteRange(0, 61), nullptr, &error_message));
|
||||
.WillRepeatedly(DoAll(SetArgPointee<2>("asdf"), Return(-1)));
|
||||
EXPECT_EQ(5, slices_.AddRange(ByteRange(0, 61), nullptr, &error_message));
|
||||
EXPECT_EQ(-1, slices_.AddRange(ByteRange(5, 61), nullptr, &error_message));
|
||||
EXPECT_EQ("asdf", error_message);
|
||||
}
|
||||
|
||||
@ -241,6 +276,13 @@ TEST(RangeHeaderTest, AbsentOrInvalid) {
|
||||
ParseRangeHeader("bytes=-", 10000, &ranges));
|
||||
}
|
||||
|
||||
// TODO: test HttpServe itself!
|
||||
// Currently the testing is manual. Three important cases:
|
||||
// * HTTP request succeeds
|
||||
// * client aborts (as in hitting ctrl-C in curl during a long request)
|
||||
// * the VirtualFile returns error (say, by chmod u-r on the file backing
|
||||
// a RealFileSlice)
|
||||
|
||||
} // namespace
|
||||
} // namespace moonfire_nvr
|
||||
|
||||
|
189
src/http.cc
189
src/http.cc
@ -40,7 +40,7 @@
|
||||
#include <sys/stat.h>
|
||||
#include <sys/types.h>
|
||||
|
||||
#include <memory>
|
||||
#include <algorithm>
|
||||
|
||||
#include <event2/buffer.h>
|
||||
#include <event2/event.h>
|
||||
@ -74,8 +74,8 @@ class RealFile : public VirtualFile {
|
||||
}
|
||||
|
||||
// Add the given range of the file to the buffer.
|
||||
bool AddRange(ByteRange range, EvBuffer *buf,
|
||||
std::string *error_message) const final {
|
||||
int64_t AddRange(ByteRange range, EvBuffer *buf,
|
||||
std::string *error_message) const final {
|
||||
return slice_.AddRange(range, buf, error_message);
|
||||
}
|
||||
|
||||
@ -85,6 +85,62 @@ class RealFile : public VirtualFile {
|
||||
const struct stat stat_;
|
||||
};
|
||||
|
||||
// An HttpServe call still in progress.
|
||||
struct ServeInProgress {
|
||||
ByteRange left;
|
||||
int64_t sent_bytes = 0;
|
||||
std::shared_ptr<VirtualFile> file;
|
||||
evhttp_request *req = nullptr;
|
||||
};
|
||||
|
||||
void ServeCloseCallback(evhttp_connection *con, void *arg) {
|
||||
std::unique_ptr<ServeInProgress> serve(
|
||||
reinterpret_cast<ServeInProgress *>(arg));
|
||||
LOG(INFO) << serve->req << ": received client abort after sending "
|
||||
<< serve->sent_bytes << " bytes; there were " << serve->left.size()
|
||||
<< " bytes left.";
|
||||
|
||||
// The call to cancel will guarantee ServeChunkCallback is not called again.
|
||||
evhttp_cancel_request(serve->req);
|
||||
}
|
||||
|
||||
void ServeChunkCallback(evhttp_connection *con, void *arg) {
|
||||
std::unique_ptr<ServeInProgress> serve(
|
||||
reinterpret_cast<ServeInProgress *>(arg));
|
||||
|
||||
if (serve->left.size() == 0) {
|
||||
LOG(INFO) << serve->req << ": done; sent " << serve->sent_bytes
|
||||
<< " bytes.";
|
||||
evhttp_connection_set_closecb(con, nullptr, nullptr);
|
||||
evhttp_send_reply_end(serve->req);
|
||||
return;
|
||||
}
|
||||
|
||||
// Serve more data.
|
||||
EvBuffer buf;
|
||||
std::string error_message;
|
||||
int64_t added = serve->file->AddRange(serve->left, &buf, &error_message);
|
||||
if (added < 0) {
|
||||
// Order is important here: evhttp_cancel_request immediately calls the
|
||||
// close callback, so remove it first to avoid double-freeing |serve|.
|
||||
evhttp_connection_set_closecb(con, nullptr, nullptr);
|
||||
evhttp_cancel_request(serve->req);
|
||||
LOG(ERROR) << serve->req << ": Failed to serve request after sending "
|
||||
<< serve->sent_bytes << " bytes (" << serve->left.size()
|
||||
<< " bytes left): " << error_message;
|
||||
return;
|
||||
}
|
||||
|
||||
serve->sent_bytes += added;
|
||||
serve->left.begin += added;
|
||||
VLOG(1) << serve->req << ": sending " << added << " bytes (more) data; still "
|
||||
<< serve->left.size() << " bytes left";
|
||||
evhttp_send_reply_chunk_with_cb(serve->req, buf.get(), &ServeChunkCallback,
|
||||
serve.get());
|
||||
evhttp_send_reply_chunk(serve->req, buf.get());
|
||||
serve.release();
|
||||
}
|
||||
|
||||
} // namespace
|
||||
|
||||
namespace internal {
|
||||
@ -196,76 +252,97 @@ void RealFileSlice::Init(re2::StringPiece filename, ByteRange range) {
|
||||
range_ = range;
|
||||
}
|
||||
|
||||
bool RealFileSlice::AddRange(ByteRange range, EvBuffer *buf,
|
||||
std::string *error_message) const {
|
||||
int64_t RealFileSlice::AddRange(ByteRange range, EvBuffer *buf,
|
||||
std::string *error_message) const {
|
||||
int fd = open(filename_.c_str(), O_RDONLY);
|
||||
if (fd < 0) {
|
||||
int err = errno;
|
||||
*error_message = StrCat("open: ", strerror(err));
|
||||
return false;
|
||||
*error_message = StrCat("open ", filename_, ": ", strerror(err));
|
||||
return -1;
|
||||
}
|
||||
if (!buf->AddFile(fd, range_.begin + range.begin, range.size(),
|
||||
error_message)) {
|
||||
close(fd);
|
||||
return false;
|
||||
return -1;
|
||||
}
|
||||
// |buf| now owns |fd|.
|
||||
return true;
|
||||
return range.size();
|
||||
}
|
||||
|
||||
bool FillerFileSlice::AddRange(ByteRange range, EvBuffer *buf,
|
||||
std::string *error_message) const {
|
||||
int64_t FillerFileSlice::AddRange(ByteRange range, EvBuffer *buf,
|
||||
std::string *error_message) const {
|
||||
std::unique_ptr<std::string> s(new std::string);
|
||||
s->reserve(size_);
|
||||
if (!fn_(s.get(), error_message)) {
|
||||
return false;
|
||||
return 0;
|
||||
}
|
||||
if (s->size() != size_) {
|
||||
*error_message = StrCat("Expected filled slice to be ", size_,
|
||||
" bytes; got ", s->size(), " bytes.");
|
||||
return false;
|
||||
return 0;
|
||||
}
|
||||
std::string *unowned_s = s.release();
|
||||
buf->AddReference(unowned_s->data() + range.begin,
|
||||
range.size(), [](const void *, size_t, void *s) {
|
||||
delete reinterpret_cast<std::string *>(s);
|
||||
}, unowned_s);
|
||||
return true;
|
||||
return range.size();
|
||||
}
|
||||
|
||||
bool StaticStringPieceSlice::AddRange(ByteRange range, EvBuffer *buf,
|
||||
std::string *error_message) const {
|
||||
int64_t StaticStringPieceSlice::AddRange(ByteRange range, EvBuffer *buf,
|
||||
std::string *error_message) const {
|
||||
buf->AddReference(piece_.data() + range.begin, range.size(), nullptr,
|
||||
nullptr);
|
||||
return true;
|
||||
return range.size();
|
||||
}
|
||||
|
||||
bool CopyingStringPieceSlice::AddRange(ByteRange range, EvBuffer *buf,
|
||||
std::string *error_message) const {
|
||||
int64_t CopyingStringPieceSlice::AddRange(ByteRange range, EvBuffer *buf,
|
||||
std::string *error_message) const {
|
||||
buf->Add(re2::StringPiece(piece_.data() + range.begin, range.size()));
|
||||
return true;
|
||||
return range.size();
|
||||
}
|
||||
|
||||
bool FileSlices::AddRange(ByteRange range, EvBuffer *buf,
|
||||
std::string *error_message) const {
|
||||
int64_t FileSlices::AddRange(ByteRange range, EvBuffer *buf,
|
||||
std::string *error_message) const {
|
||||
if (range.begin < 0 || range.begin > range.end || range.end > size_) {
|
||||
*error_message = StrCat("Range ", range.DebugString(),
|
||||
" not valid for file of size ", size_);
|
||||
return false;
|
||||
}
|
||||
int64_t total_bytes_added = 0;
|
||||
auto it = std::upper_bound(slices_.begin(), slices_.end(), range.begin,
|
||||
[](int64_t begin, const SliceInfo &info) {
|
||||
return begin < info.range.end;
|
||||
});
|
||||
for (; it != slices_.end() && range.end > it->range.begin; ++it) {
|
||||
if (total_bytes_added > 0 && (it->flags & kLazy) != 0) {
|
||||
VLOG(1) << "early return of " << total_bytes_added << "/" << range.size()
|
||||
<< " bytes from FileSlices " << this << " because slice "
|
||||
<< it->slice << " is lazy.";
|
||||
break;
|
||||
}
|
||||
ByteRange mapped(
|
||||
std::max(INT64_C(0), range.begin - it->range.begin),
|
||||
std::min(range.end - it->range.begin, it->range.end - it->range.begin));
|
||||
if (!it->slice->AddRange(mapped, buf, error_message)) {
|
||||
return false;
|
||||
int64_t slice_bytes_added = it->slice->AddRange(mapped, buf, error_message);
|
||||
total_bytes_added += slice_bytes_added > 0 ? slice_bytes_added : 0;
|
||||
if (slice_bytes_added < 0 && total_bytes_added == 0) {
|
||||
LOG(WARNING) << "early return of " << total_bytes_added << "/"
|
||||
<< range.size() << " bytes from FileSlices " << this
|
||||
<< " due to slice " << it->slice
|
||||
<< " returning error: " << *error_message;
|
||||
return -1;
|
||||
} else if (slice_bytes_added < mapped.size()) {
|
||||
LOG(INFO) << "early return of " << total_bytes_added << "/"
|
||||
<< range.size() << " bytes from FileSlices " << this
|
||||
<< " due to slice " << it->slice << " returning "
|
||||
<< slice_bytes_added << "/" << mapped.size()
|
||||
<< " bytes. error_message (maybe populated): "
|
||||
<< *error_message;
|
||||
break;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
return total_bytes_added;
|
||||
}
|
||||
|
||||
void HttpSendError(evhttp_request *req, int http_err, const std::string &prefix,
|
||||
@ -274,7 +351,7 @@ void HttpSendError(evhttp_request *req, int http_err, const std::string &prefix,
|
||||
EscapeHtml(prefix + strerror(posix_err)).c_str());
|
||||
}
|
||||
|
||||
void HttpServe(const VirtualFile &file, evhttp_request *req) {
|
||||
void HttpServe(const std::shared_ptr<VirtualFile> &file, evhttp_request *req) {
|
||||
// We could support HEAD, but there's probably no need.
|
||||
if (evhttp_request_get_command(req) != EVHTTP_REQ_GET) {
|
||||
return evhttp_send_error(req, HTTP_BADMETHOD, "only GET allowed");
|
||||
@ -284,7 +361,7 @@ void HttpServe(const VirtualFile &file, evhttp_request *req) {
|
||||
struct evkeyvalq *out_hdrs = evhttp_request_get_output_headers(req);
|
||||
|
||||
// Construct a Last-Modified: header.
|
||||
time_t last_modified = file.last_modified();
|
||||
time_t last_modified = file->last_modified();
|
||||
struct tm last_modified_tm;
|
||||
if (gmtime_r(&last_modified, &last_modified_tm) == 0) {
|
||||
return HttpSendError(req, HTTP_INTERNAL, "gmtime_r failed: ", errno);
|
||||
@ -294,73 +371,83 @@ void HttpServe(const VirtualFile &file, evhttp_request *req) {
|
||||
"%a, %d %b %Y %H:%M:%S GMT", &last_modified_tm) == 0) {
|
||||
return HttpSendError(req, HTTP_INTERNAL, "strftime failed: ", errno);
|
||||
}
|
||||
std::string etag = file.etag();
|
||||
std::string etag = file->etag();
|
||||
|
||||
// Ignore the "Range:" header if "If-Range:" specifies an incorrect etag.
|
||||
const char *if_range = evhttp_find_header(in_hdrs, "If-Range");
|
||||
const char *range_hdr = evhttp_find_header(in_hdrs, "Range");
|
||||
if (if_range != nullptr && etag != if_range) {
|
||||
LOG(INFO) << "Ignoring Range: because If-Range: is stale.";
|
||||
LOG(INFO) << req << ": Ignoring Range: because If-Range: is stale.";
|
||||
range_hdr = nullptr;
|
||||
}
|
||||
|
||||
EvBuffer buf;
|
||||
std::vector<ByteRange> ranges;
|
||||
auto range_type = internal::ParseRangeHeader(range_hdr, file.size(), &ranges);
|
||||
auto range_type =
|
||||
internal::ParseRangeHeader(range_hdr, file->size(), &ranges);
|
||||
std::string error_message;
|
||||
int http_status;
|
||||
const char *http_status_str;
|
||||
ByteRange left;
|
||||
switch (range_type) {
|
||||
case internal::RangeHeaderType::kNotSatisfiable: {
|
||||
std::string range_hdr = StrCat("bytes */", file.size());
|
||||
std::string range_hdr = StrCat("bytes */", file->size());
|
||||
evhttp_add_header(out_hdrs, "Content-Range", range_hdr.c_str());
|
||||
http_status = 416;
|
||||
http_status_str = "Range Not Satisfiable";
|
||||
LOG(INFO) << "Replying to non-satisfiable range request: " << range_hdr;
|
||||
LOG(INFO) << req
|
||||
<< ": Replying to non-satisfiable range request: " << range_hdr;
|
||||
break;
|
||||
}
|
||||
|
||||
case internal::RangeHeaderType::kSatisfiable:
|
||||
// We only support the simpler single-range case for now.
|
||||
// A multi-range request just serves the whole file via the fallthrough.
|
||||
if (ranges.size() == 1) {
|
||||
std::string range_hdr = StrCat("bytes ", ranges[0].begin, "-",
|
||||
ranges[0].end - 1, "/", file.size());
|
||||
if (!file.AddRange(ranges[0], &buf, &error_message)) {
|
||||
LOG(ERROR) << "Unable to serve range " << ranges[0] << ": "
|
||||
<< error_message;
|
||||
return evhttp_send_error(req, HTTP_INTERNAL,
|
||||
EscapeHtml(error_message).c_str());
|
||||
}
|
||||
ranges[0].end - 1, "/", file->size());
|
||||
left = ranges[0];
|
||||
evhttp_add_header(out_hdrs, "Content-Range", range_hdr.c_str());
|
||||
http_status = 206;
|
||||
http_status_str = "Partial Content";
|
||||
LOG(INFO) << "Replying to range request";
|
||||
LOG(INFO) << req << ": URI " << evhttp_request_get_uri(req)
|
||||
<< ": client requested byte range " << left
|
||||
<< " (total file size " << file->size() << ")";
|
||||
break;
|
||||
}
|
||||
// FALLTHROUGH
|
||||
|
||||
case internal::RangeHeaderType::kAbsentOrInvalid:
|
||||
if (!file.AddRange(ByteRange(0, file.size()), &buf, &error_message)) {
|
||||
LOG(ERROR) << "Unable to serve file: " << error_message;
|
||||
return evhttp_send_error(req, HTTP_INTERNAL,
|
||||
EscapeHtml(error_message).c_str());
|
||||
}
|
||||
LOG(INFO) << "Replying to whole-file request";
|
||||
case internal::RangeHeaderType::kAbsentOrInvalid: {
|
||||
left = ByteRange(0, file->size());
|
||||
LOG(INFO) << req << ": URI " << evhttp_request_get_uri(req)
|
||||
<< ": Client requested whole file of size " << file->size();
|
||||
http_status = HTTP_OK;
|
||||
http_status_str = "OK";
|
||||
}
|
||||
}
|
||||
|
||||
// Successful reply ready; add common headers and send.
|
||||
evhttp_add_header(out_hdrs, "Content-Type", file.mime_type().c_str());
|
||||
// Successful reply started; add common headers and send.
|
||||
evhttp_add_header(out_hdrs, "Content-Length", StrCat(left.size()).c_str());
|
||||
evhttp_add_header(out_hdrs, "Content-Type", file->mime_type().c_str());
|
||||
evhttp_add_header(out_hdrs, "Accept-Ranges", "bytes");
|
||||
evhttp_add_header(out_hdrs, "Last-Modified", last_modified_str);
|
||||
evhttp_add_header(out_hdrs, "ETag", etag.c_str());
|
||||
evhttp_send_reply(req, http_status, http_status_str, buf.get());
|
||||
evhttp_send_reply_start(req, http_status, http_status_str);
|
||||
|
||||
ServeInProgress *serve = new ServeInProgress;
|
||||
serve->file = file;
|
||||
serve->left = left;
|
||||
serve->req = req;
|
||||
evhttp_connection *con = evhttp_request_get_connection(req);
|
||||
evhttp_connection_set_closecb(con, &ServeCloseCallback, serve);
|
||||
return ServeChunkCallback(con, serve);
|
||||
}
|
||||
|
||||
void HttpServeFile(evhttp_request *req, const std::string &mime_type,
|
||||
const std::string &filename, const struct stat &statbuf) {
|
||||
return HttpServe(RealFile(mime_type, filename, statbuf), req);
|
||||
return HttpServe(
|
||||
std::shared_ptr<VirtualFile>(new RealFile(mime_type, filename, statbuf)),
|
||||
req);
|
||||
}
|
||||
|
||||
} // namespace moonfire_nvr
|
||||
|
56
src/http.h
56
src/http.h
@ -41,6 +41,7 @@
|
||||
|
||||
#include <functional>
|
||||
#include <iostream>
|
||||
#include <memory>
|
||||
#include <string>
|
||||
|
||||
#include <event2/buffer.h>
|
||||
@ -118,9 +119,17 @@ void HttpSendError(evhttp_request *req, int http_err, const std::string &prefix,
|
||||
class FileSlice {
|
||||
public:
|
||||
virtual ~FileSlice() {}
|
||||
|
||||
virtual int64_t size() const = 0;
|
||||
virtual bool AddRange(ByteRange range, EvBuffer *buf,
|
||||
std::string *error_message) const = 0;
|
||||
|
||||
// Add some to all of the given non-empty |range| to |buf|.
|
||||
// Returns the number of bytes added, or < 0 on error.
|
||||
// On error, |error_message| should be populated. (|error_message| may also be
|
||||
// populated if 0 <= return value < range.size(), such as if one of a
|
||||
// FileSlices object's failed. However, it's safe to simply retry such
|
||||
// partial failures later.)
|
||||
virtual int64_t AddRange(ByteRange range, EvBuffer *buf,
|
||||
std::string *error_message) const = 0;
|
||||
};
|
||||
|
||||
class VirtualFile : public FileSlice {
|
||||
@ -139,8 +148,8 @@ class RealFileSlice : public FileSlice {
|
||||
|
||||
int64_t size() const final { return range_.size(); }
|
||||
|
||||
bool AddRange(ByteRange range, EvBuffer *buf,
|
||||
std::string *error_message) const final;
|
||||
int64_t AddRange(ByteRange range, EvBuffer *buf,
|
||||
std::string *error_message) const final;
|
||||
|
||||
private:
|
||||
std::string filename_;
|
||||
@ -161,8 +170,8 @@ class FillerFileSlice : public FileSlice {
|
||||
|
||||
int64_t size() const final { return size_; }
|
||||
|
||||
bool AddRange(ByteRange range, EvBuffer *buf,
|
||||
std::string *error_message) const final;
|
||||
int64_t AddRange(ByteRange range, EvBuffer *buf,
|
||||
std::string *error_message) const final;
|
||||
|
||||
private:
|
||||
FillFunction fn_;
|
||||
@ -175,8 +184,8 @@ class StaticStringPieceSlice : public FileSlice {
|
||||
explicit StaticStringPieceSlice(re2::StringPiece piece) : piece_(piece) {}
|
||||
|
||||
int64_t size() const final { return piece_.size(); }
|
||||
bool AddRange(ByteRange range, EvBuffer *buf,
|
||||
std::string *error_message) const final;
|
||||
int64_t AddRange(ByteRange range, EvBuffer *buf,
|
||||
std::string *error_message) const final;
|
||||
|
||||
private:
|
||||
re2::StringPiece piece_;
|
||||
@ -188,8 +197,8 @@ class CopyingStringPieceSlice : public FileSlice {
|
||||
explicit CopyingStringPieceSlice(re2::StringPiece piece) : piece_(piece) {}
|
||||
|
||||
int64_t size() const final { return piece_.size(); }
|
||||
bool AddRange(ByteRange range, EvBuffer *buf,
|
||||
std::string *error_message) const final;
|
||||
int64_t AddRange(ByteRange range, EvBuffer *buf,
|
||||
std::string *error_message) const final;
|
||||
|
||||
private:
|
||||
re2::StringPiece piece_;
|
||||
@ -204,22 +213,35 @@ class FileSlices : public FileSlice {
|
||||
|
||||
// |slice| must outlive the FileSlices.
|
||||
// |slice->size()| should not change after this call.
|
||||
void Append(const FileSlice *slice) {
|
||||
// |flags| should be a bitmask of Flags values below.
|
||||
void Append(const FileSlice *slice, int flags = 0) {
|
||||
int64_t new_size = size_ + slice->size();
|
||||
slices_.emplace_back(ByteRange(size_, new_size), slice);
|
||||
slices_.emplace_back(ByteRange(size_, new_size), slice, flags);
|
||||
size_ = new_size;
|
||||
}
|
||||
|
||||
int64_t size() const final { return size_; }
|
||||
bool AddRange(ByteRange range, EvBuffer *buf,
|
||||
std::string *error_message) const final;
|
||||
int64_t AddRange(ByteRange range, EvBuffer *buf,
|
||||
std::string *error_message) const final;
|
||||
|
||||
enum Flags {
|
||||
// kLazy, as an argument to Append, instructs the FileSlices to append
|
||||
// this slice in AddRange only if it is the first slice in the requested
|
||||
// range. Otherwise it returns early, expecting HttpServe to call AddRange
|
||||
// again after the earlier ranges have been sent. This is useful if it is
|
||||
// expensive to have the given slice pending. In particular, it is useful
|
||||
// when serving many file slices on 32-bit machines to avoid exhausting
|
||||
// the address space with too many memory mappings.
|
||||
kLazy = 1
|
||||
};
|
||||
|
||||
private:
|
||||
struct SliceInfo {
|
||||
SliceInfo(ByteRange range, const FileSlice *slice)
|
||||
: range(range), slice(slice) {}
|
||||
SliceInfo(ByteRange range, const FileSlice *slice, int flags)
|
||||
: range(range), slice(slice), flags(flags) {}
|
||||
ByteRange range;
|
||||
const FileSlice *slice = nullptr;
|
||||
int flags;
|
||||
};
|
||||
int64_t size_ = 0;
|
||||
|
||||
@ -237,7 +259,7 @@ class FileSlices : public FileSlice {
|
||||
// problematic, this interface may change to take advantage of
|
||||
// evbuffer_add_cb, adding buffers incrementally, and some mechanism will be
|
||||
// added to guarantee VirtualFile objects outlive the HTTP requests they serve.
|
||||
void HttpServe(const VirtualFile &file, evhttp_request *req);
|
||||
void HttpServe(const std::shared_ptr<VirtualFile> &file, evhttp_request *req);
|
||||
|
||||
// Serve a file over HTTP. Expects the caller to supply a sanitized |filename|
|
||||
// (rather than taking it straight from the path specified in |req|).
|
||||
|
@ -70,9 +70,12 @@ std::string ToHex(const FileSlice *slice, bool pad) {
|
||||
std::string Digest(const FileSlice *slice) {
|
||||
EvBuffer buf;
|
||||
std::string error_message;
|
||||
size_t size = slice->size();
|
||||
CHECK(slice->AddRange(ByteRange(0, size), &buf, &error_message))
|
||||
<< error_message;
|
||||
ByteRange left(0, slice->size());
|
||||
while (left.size() > 0) {
|
||||
auto ret = slice->AddRange(left, &buf, &error_message);
|
||||
CHECK_GT(ret, 0) << error_message;
|
||||
left.begin += ret;
|
||||
}
|
||||
evbuffer_iovec vec;
|
||||
auto digest = Digest::SHA1();
|
||||
while (evbuffer_peek(buf.get(), -1, nullptr, &vec, 1) > 0) {
|
||||
@ -250,7 +253,7 @@ class IntegrationTest : public testing::Test {
|
||||
return recording;
|
||||
}
|
||||
|
||||
std::unique_ptr<VirtualFile> CreateMp4FromSingleRecording(
|
||||
std::shared_ptr<VirtualFile> CreateMp4FromSingleRecording(
|
||||
const Recording &recording) {
|
||||
Mp4FileBuilder builder;
|
||||
builder.SetSampleEntry(video_sample_entry_);
|
||||
@ -265,8 +268,12 @@ class IntegrationTest : public testing::Test {
|
||||
void WriteMp4(VirtualFile *f) {
|
||||
EvBuffer buf;
|
||||
std::string error_message;
|
||||
EXPECT_TRUE(f->AddRange(ByteRange(0, f->size()), &buf, &error_message))
|
||||
<< error_message;
|
||||
ByteRange left(0, f->size());
|
||||
while (left.size() > 0) {
|
||||
auto ret = f->AddRange(left, &buf, &error_message);
|
||||
ASSERT_GT(ret, 0) << error_message;
|
||||
left.begin += ret;
|
||||
}
|
||||
WriteFileOrDie(StrCat(tmpdir_path_, "/clip.new.mp4"), &buf);
|
||||
}
|
||||
|
||||
|
16
src/mp4.cc
16
src/mp4.cc
@ -392,7 +392,7 @@ class Mp4File : public VirtualFile {
|
||||
for (const auto &segment : segments_) {
|
||||
segment->sample_file_slice.Init(segment->recording.sample_file_path,
|
||||
segment->pieces.sample_pos());
|
||||
slices_.Append(&segment->sample_file_slice);
|
||||
slices_.Append(&segment->sample_file_slice, FileSlices::kLazy);
|
||||
}
|
||||
mdat_.header().largesize = ToNetworkU64(slices_.size() - size_before_mdat);
|
||||
|
||||
@ -414,8 +414,8 @@ class Mp4File : public VirtualFile {
|
||||
std::string etag() const final { return etag_; }
|
||||
std::string mime_type() const final { return "video/mp4"; }
|
||||
int64_t size() const final { return slices_.size(); }
|
||||
bool AddRange(ByteRange range, EvBuffer *buf,
|
||||
std::string *error_message) const final {
|
||||
int64_t AddRange(ByteRange range, EvBuffer *buf,
|
||||
std::string *error_message) const final {
|
||||
return slices_.AddRange(range, buf, error_message);
|
||||
}
|
||||
|
||||
@ -724,7 +724,7 @@ Mp4FileBuilder &Mp4FileBuilder::SetSampleEntry(const VideoSampleEntry &entry) {
|
||||
return *this;
|
||||
}
|
||||
|
||||
std::unique_ptr<VirtualFile> Mp4FileBuilder::Build(std::string *error_message) {
|
||||
std::shared_ptr<VirtualFile> Mp4FileBuilder::Build(std::string *error_message) {
|
||||
int32_t sample_offset = 1;
|
||||
for (auto &segment : segments_) {
|
||||
if (segment->recording.video_sample_entry_sha1 !=
|
||||
@ -733,24 +733,24 @@ std::unique_ptr<VirtualFile> Mp4FileBuilder::Build(std::string *error_message) {
|
||||
StrCat("inconsistent video sample entries. builder has: ",
|
||||
ToHex(video_sample_entry_.sha1), ", segment has: ",
|
||||
ToHex(segment->recording.video_sample_entry_sha1));
|
||||
return std::unique_ptr<VirtualFile>();
|
||||
return std::shared_ptr<VirtualFile>();
|
||||
}
|
||||
|
||||
if (!segment->pieces.Init(&segment->recording,
|
||||
1, // sample entry index
|
||||
sample_offset, segment->rel_start_90k,
|
||||
segment->rel_end_90k, error_message)) {
|
||||
return std::unique_ptr<VirtualFile>();
|
||||
return std::shared_ptr<VirtualFile>();
|
||||
}
|
||||
sample_offset += segment->pieces.samples();
|
||||
}
|
||||
|
||||
if (segments_.empty()) {
|
||||
*error_message = "Can't construct empty .mp4";
|
||||
return std::unique_ptr<VirtualFile>();
|
||||
return std::shared_ptr<VirtualFile>();
|
||||
}
|
||||
|
||||
return std::unique_ptr<VirtualFile>(
|
||||
return std::shared_ptr<VirtualFile>(
|
||||
new Mp4File(std::move(segments_), std::move(video_sample_entry_)));
|
||||
}
|
||||
|
||||
|
@ -156,7 +156,7 @@ class Mp4FileBuilder {
|
||||
// * Non-final segment has zero duration of last sample.
|
||||
// * Data error in one of the recording sample indexes.
|
||||
// * Invalid start/end.
|
||||
std::unique_ptr<VirtualFile> Build(std::string *error_message);
|
||||
std::shared_ptr<VirtualFile> Build(std::string *error_message);
|
||||
|
||||
private:
|
||||
std::vector<std::unique_ptr<internal::Mp4FileSegment>> segments_;
|
||||
|
@ -122,10 +122,11 @@ void WriteFileOrDie(const std::string &path, re2::StringPiece contents) {
|
||||
void WriteFileOrDie(const std::string &path, EvBuffer *buf) {
|
||||
int fd = open(path.c_str(), O_CREAT | O_WRONLY | O_TRUNC, 0600);
|
||||
PCHECK(fd >= 0) << "open: " << path;
|
||||
size_t buf_len = evbuffer_get_length(buf->get());
|
||||
int written = evbuffer_write(buf->get(), fd);
|
||||
PCHECK(written >= 0 && buf_len == static_cast<size_t>(written))
|
||||
<< "buf_len: " << buf_len << ", written: " << written;
|
||||
while (evbuffer_get_length(buf->get()) > 0) {
|
||||
size_t buf_len = evbuffer_get_length(buf->get());
|
||||
int written = evbuffer_write(buf->get(), fd);
|
||||
PCHECK(written >= 0) << "buf_len: " << buf_len << ", written: " << written;
|
||||
}
|
||||
PCHECK(close(fd) == 0) << "close";
|
||||
}
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user