diff --git a/CMakeLists.txt b/CMakeLists.txt index 7387669..9b9f74d 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -59,7 +59,7 @@ find_library(PROFILER_LIBRARIES profiler) # https://cmake.org/cmake/help/v3.0/module/FindPkgConfig.html find_package(PkgConfig) pkg_check_modules(FFMPEG REQUIRED libavutil libavcodec libavformat) -pkg_check_modules(LIBEVENT REQUIRED libevent) +pkg_check_modules(LIBEVENT REQUIRED libevent>=2.1) pkg_check_modules(GLOG REQUIRED libglog) pkg_check_modules(OPENSSL REQUIRED libcrypto) pkg_check_modules(SQLITE REQUIRED sqlite3) diff --git a/README.md b/README.md index e382612..9a93502 100644 --- a/README.md +++ b/README.md @@ -64,9 +64,11 @@ from source. It requires several packages to build: along with all versions of the competing project [libav](http://libav.org), does not support socket timeouts for RTSP. For reliable reconnections on error, it's strongly recommended to use ffmpeg >= 55.1.101. -* [libevent](http://libevent.org/) 2.x, for the built-in HTTP server. +* [libevent](http://libevent.org/) 2.1, for the built-in HTTP server. (This might be replaced with the more full-featured [nghttp2](https://github.com/tatsuhiro-t/nghttp2) in the future.) + Unfortunately, the libevent 2.0 bundled with current Debian releases is + unsuitable. * [protocol buffers](https://developers.google.com/protocol-buffers/), currently just for the configuration file. * [gflags](http://gflags.github.io/gflags/), for commandline flag parsing. @@ -79,7 +81,7 @@ from source. It requires several packages to build: * libuuid from (util-linux)[https://en.wikipedia.org/wiki/Util-linux]. * [SQLite3](https://www.sqlite.org/). -On Ubuntu 15.10 or Raspbian Jessie, the following command will install all +On Ubuntu 15.10 or Raspbian Jessie, the following command will install most pre-requisites (see also the `Build-Depends` field in `debian/control`): $ sudo apt-get install \ @@ -89,15 +91,18 @@ pre-requisites (see also the `Build-Depends` field in `debian/control`): libavcodec-dev \ libavformat-dev \ libavutil-dev \ - libevent-dev \ libgflags-dev \ libgoogle-glog-dev \ libgoogle-perftools-dev \ libre2-dev \ libsqlite3-dev \ - libuuid-dev \ pkgconf \ - protobuf-compiler + protobuf-compiler \ + uuid-dev + +libevent 2.1 will have to be installed from source. In the future, this +dependency may be replaced or support may be added for automatically building +libevent in-tree to avoid the inconvenience. Once prerequisites are installed, Moonfire NVR can be built as follows: @@ -107,7 +112,8 @@ Once prerequisites are installed, Moonfire NVR can be built as follows: $ make $ sudo make install -Alternatively, you can prepare a `.deb` package: +Alternatively, if you do have a sufficiently new apt-installed libevent +installed, you may be able to prepare a `.deb` package: $ sudo apt-get install devscripts dh-systemd $ debuild -us -uc diff --git a/debian/control b/debian/control index 06b60e8..174c6ae 100644 --- a/debian/control +++ b/debian/control @@ -3,7 +3,7 @@ Maintainer: Scott Lamb Section: video Priority: optional Standards-Version: 3.9.6.1 -Build-Depends: debhelper (>= 9), dh-systemd, cmake, libprotobuf-dev, libavcodec-dev, libavformat-dev, libevent-dev, libgflags-dev, libgoogle-glog-dev, libgoogle-perftools-dev, libre2-dev, pkgconf, protobuf-compiler, libuuid-dev, libsqlite3-dev +Build-Depends: debhelper (>= 9), dh-systemd, cmake, libprotobuf-dev, libavcodec-dev, libavformat-dev, libevent-dev (>= 2.1), libgflags-dev, libgoogle-glog-dev, libgoogle-perftools-dev, libre2-dev, pkgconf, protobuf-compiler, uuid-dev, libsqlite3-dev Package: moonfire-nvr Architecture: any diff --git a/src/http-test.cc b/src/http-test.cc index 1eb8987..41bf0d3 100644 --- a/src/http-test.cc +++ b/src/http-test.cc @@ -61,7 +61,7 @@ namespace { class MockFileSlice : public FileSlice { public: MOCK_CONST_METHOD0(size, int64_t()); - MOCK_CONST_METHOD3(AddRange, bool(ByteRange, EvBuffer *, std::string *)); + MOCK_CONST_METHOD3(AddRange, int64_t(ByteRange, EvBuffer *, std::string *)); }; TEST(EvBufferTest, AddFileTest) { @@ -99,18 +99,18 @@ TEST(EvBufferTest, AddFileTest) { class FileSlicesTest : public testing::Test { protected: - FileSlicesTest() { + void Init(int flags) { EXPECT_CALL(a_, size()).Times(AnyNumber()).WillRepeatedly(Return(5)); EXPECT_CALL(b_, size()).Times(AnyNumber()).WillRepeatedly(Return(13)); EXPECT_CALL(c_, size()).Times(AnyNumber()).WillRepeatedly(Return(7)); EXPECT_CALL(d_, size()).Times(AnyNumber()).WillRepeatedly(Return(17)); EXPECT_CALL(e_, size()).Times(AnyNumber()).WillRepeatedly(Return(19)); - slices_.Append(&a_); - slices_.Append(&b_); - slices_.Append(&c_); - slices_.Append(&d_); - slices_.Append(&e_); + slices_.Append(&a_, flags); + slices_.Append(&b_, flags); + slices_.Append(&c_, flags); + slices_.Append(&d_, flags); + slices_.Append(&e_, flags); } FileSlices slices_; @@ -122,44 +122,79 @@ class FileSlicesTest : public testing::Test { }; TEST_F(FileSlicesTest, Size) { + Init(0); EXPECT_EQ(5 + 13 + 7 + 17 + 19, slices_.size()); } TEST_F(FileSlicesTest, ExactSlice) { // Exactly slice b. + Init(0); std::string error_message; - EXPECT_CALL(b_, AddRange(ByteRange(0, 13), _, _)).WillOnce(Return(true)); - EXPECT_TRUE(slices_.AddRange(ByteRange(5, 18), nullptr, &error_message)) + EXPECT_CALL(b_, AddRange(ByteRange(0, 13), _, _)).WillOnce(Return(13)); + EXPECT_EQ(13, slices_.AddRange(ByteRange(5, 18), nullptr, &error_message)) << error_message; } TEST_F(FileSlicesTest, Offset) { // Part of slice b, all of slice c, and part of slice d. + Init(0); std::string error_message; - EXPECT_CALL(b_, AddRange(ByteRange(12, 13), _, _)).WillOnce(Return(true)); - EXPECT_CALL(c_, AddRange(ByteRange(0, 7), _, _)).WillOnce(Return(true)); - EXPECT_CALL(d_, AddRange(ByteRange(0, 1), _, _)).WillOnce(Return(true)); - EXPECT_TRUE(slices_.AddRange(ByteRange(17, 26), nullptr, &error_message)) + EXPECT_CALL(b_, AddRange(ByteRange(12, 13), _, _)).WillOnce(Return(1)); + EXPECT_CALL(c_, AddRange(ByteRange(0, 7), _, _)).WillOnce(Return(7)); + EXPECT_CALL(d_, AddRange(ByteRange(0, 1), _, _)).WillOnce(Return(1)); + EXPECT_EQ(9, slices_.AddRange(ByteRange(17, 26), nullptr, &error_message)) << error_message; } TEST_F(FileSlicesTest, Everything) { + Init(0); std::string error_message; - EXPECT_CALL(a_, AddRange(ByteRange(0, 5), _, _)).WillOnce(Return(true)); - EXPECT_CALL(b_, AddRange(ByteRange(0, 13), _, _)).WillOnce(Return(true)); - EXPECT_CALL(c_, AddRange(ByteRange(0, 7), _, _)).WillOnce(Return(true)); - EXPECT_CALL(d_, AddRange(ByteRange(0, 17), _, _)).WillOnce(Return(true)); - EXPECT_CALL(e_, AddRange(ByteRange(0, 19), _, _)).WillOnce(Return(true)); - EXPECT_TRUE(slices_.AddRange(ByteRange(0, 61), nullptr, &error_message)) + EXPECT_CALL(a_, AddRange(ByteRange(0, 5), _, _)).WillOnce(Return(5)); + EXPECT_CALL(b_, AddRange(ByteRange(0, 13), _, _)).WillOnce(Return(13)); + EXPECT_CALL(c_, AddRange(ByteRange(0, 7), _, _)).WillOnce(Return(7)); + EXPECT_CALL(d_, AddRange(ByteRange(0, 17), _, _)).WillOnce(Return(17)); + EXPECT_CALL(e_, AddRange(ByteRange(0, 19), _, _)).WillOnce(Return(19)); + EXPECT_EQ(61, slices_.AddRange(ByteRange(0, 61), nullptr, &error_message)) + << error_message; +} + +TEST_F(FileSlicesTest, Lazy) { + Init(FileSlices::kLazy); + std::string error_message; + EXPECT_CALL(a_, AddRange(ByteRange(0, 5), _, _)).WillOnce(Return(5)); + EXPECT_EQ(5, slices_.AddRange(ByteRange(0, 61), nullptr, &error_message)) + << error_message; + EXPECT_CALL(b_, AddRange(ByteRange(0, 13), _, _)).WillOnce(Return(13)); + EXPECT_EQ(13, slices_.AddRange(ByteRange(5, 61), nullptr, &error_message)) + << error_message; + EXPECT_CALL(c_, AddRange(ByteRange(0, 7), _, _)).WillOnce(Return(7)); + EXPECT_EQ(7, slices_.AddRange(ByteRange(18, 61), nullptr, &error_message)) + << error_message; + EXPECT_CALL(d_, AddRange(ByteRange(0, 17), _, _)).WillOnce(Return(17)); + EXPECT_EQ(17, slices_.AddRange(ByteRange(25, 61), nullptr, &error_message)) + << error_message; + EXPECT_CALL(e_, AddRange(ByteRange(0, 19), _, _)).WillOnce(Return(19)); + EXPECT_EQ(19, slices_.AddRange(ByteRange(42, 61), nullptr, &error_message)) + << error_message; +} + +TEST_F(FileSlicesTest, SliceWithPartialReturn) { + Init(0); + std::string error_message; + EXPECT_CALL(a_, AddRange(ByteRange(0, 5), _, _)).WillOnce(Return(5)); + EXPECT_CALL(b_, AddRange(ByteRange(0, 13), _, _)).WillOnce(Return(1)); + EXPECT_EQ(6, slices_.AddRange(ByteRange(0, 61), nullptr, &error_message)) << error_message; } TEST_F(FileSlicesTest, PropagateError) { + Init(0); std::string error_message; - EXPECT_CALL(a_, AddRange(ByteRange(0, 5), _, _)).WillOnce(Return(true)); + EXPECT_CALL(a_, AddRange(ByteRange(0, 5), _, _)).WillOnce(Return(5)); EXPECT_CALL(b_, AddRange(ByteRange(0, 13), _, _)) - .WillOnce(DoAll(SetArgPointee<2>("asdf"), Return(false))); - EXPECT_FALSE(slices_.AddRange(ByteRange(0, 61), nullptr, &error_message)); + .WillRepeatedly(DoAll(SetArgPointee<2>("asdf"), Return(-1))); + EXPECT_EQ(5, slices_.AddRange(ByteRange(0, 61), nullptr, &error_message)); + EXPECT_EQ(-1, slices_.AddRange(ByteRange(5, 61), nullptr, &error_message)); EXPECT_EQ("asdf", error_message); } @@ -241,6 +276,13 @@ TEST(RangeHeaderTest, AbsentOrInvalid) { ParseRangeHeader("bytes=-", 10000, &ranges)); } +// TODO: test HttpServe itself! +// Currently the testing is manual. Three important cases: +// * HTTP request succeeds +// * client aborts (as in hitting ctrl-C in curl during a long request) +// * the VirtualFile returns error (say, by chmod u-r on the file backing +// a RealFileSlice) + } // namespace } // namespace moonfire_nvr diff --git a/src/http.cc b/src/http.cc index 1a6b553..1364805 100644 --- a/src/http.cc +++ b/src/http.cc @@ -40,7 +40,7 @@ #include #include -#include +#include #include #include @@ -74,8 +74,8 @@ class RealFile : public VirtualFile { } // Add the given range of the file to the buffer. - bool AddRange(ByteRange range, EvBuffer *buf, - std::string *error_message) const final { + int64_t AddRange(ByteRange range, EvBuffer *buf, + std::string *error_message) const final { return slice_.AddRange(range, buf, error_message); } @@ -85,6 +85,62 @@ class RealFile : public VirtualFile { const struct stat stat_; }; +// An HttpServe call still in progress. +struct ServeInProgress { + ByteRange left; + int64_t sent_bytes = 0; + std::shared_ptr file; + evhttp_request *req = nullptr; +}; + +void ServeCloseCallback(evhttp_connection *con, void *arg) { + std::unique_ptr serve( + reinterpret_cast(arg)); + LOG(INFO) << serve->req << ": received client abort after sending " + << serve->sent_bytes << " bytes; there were " << serve->left.size() + << " bytes left."; + + // The call to cancel will guarantee ServeChunkCallback is not called again. + evhttp_cancel_request(serve->req); +} + +void ServeChunkCallback(evhttp_connection *con, void *arg) { + std::unique_ptr serve( + reinterpret_cast(arg)); + + if (serve->left.size() == 0) { + LOG(INFO) << serve->req << ": done; sent " << serve->sent_bytes + << " bytes."; + evhttp_connection_set_closecb(con, nullptr, nullptr); + evhttp_send_reply_end(serve->req); + return; + } + + // Serve more data. + EvBuffer buf; + std::string error_message; + int64_t added = serve->file->AddRange(serve->left, &buf, &error_message); + if (added < 0) { + // Order is important here: evhttp_cancel_request immediately calls the + // close callback, so remove it first to avoid double-freeing |serve|. + evhttp_connection_set_closecb(con, nullptr, nullptr); + evhttp_cancel_request(serve->req); + LOG(ERROR) << serve->req << ": Failed to serve request after sending " + << serve->sent_bytes << " bytes (" << serve->left.size() + << " bytes left): " << error_message; + return; + } + + serve->sent_bytes += added; + serve->left.begin += added; + VLOG(1) << serve->req << ": sending " << added << " bytes (more) data; still " + << serve->left.size() << " bytes left"; + evhttp_send_reply_chunk_with_cb(serve->req, buf.get(), &ServeChunkCallback, + serve.get()); + evhttp_send_reply_chunk(serve->req, buf.get()); + serve.release(); +} + } // namespace namespace internal { @@ -196,76 +252,97 @@ void RealFileSlice::Init(re2::StringPiece filename, ByteRange range) { range_ = range; } -bool RealFileSlice::AddRange(ByteRange range, EvBuffer *buf, - std::string *error_message) const { +int64_t RealFileSlice::AddRange(ByteRange range, EvBuffer *buf, + std::string *error_message) const { int fd = open(filename_.c_str(), O_RDONLY); if (fd < 0) { int err = errno; - *error_message = StrCat("open: ", strerror(err)); - return false; + *error_message = StrCat("open ", filename_, ": ", strerror(err)); + return -1; } if (!buf->AddFile(fd, range_.begin + range.begin, range.size(), error_message)) { close(fd); - return false; + return -1; } // |buf| now owns |fd|. - return true; + return range.size(); } -bool FillerFileSlice::AddRange(ByteRange range, EvBuffer *buf, - std::string *error_message) const { +int64_t FillerFileSlice::AddRange(ByteRange range, EvBuffer *buf, + std::string *error_message) const { std::unique_ptr s(new std::string); s->reserve(size_); if (!fn_(s.get(), error_message)) { - return false; + return 0; } if (s->size() != size_) { *error_message = StrCat("Expected filled slice to be ", size_, " bytes; got ", s->size(), " bytes."); - return false; + return 0; } std::string *unowned_s = s.release(); buf->AddReference(unowned_s->data() + range.begin, range.size(), [](const void *, size_t, void *s) { delete reinterpret_cast(s); }, unowned_s); - return true; + return range.size(); } -bool StaticStringPieceSlice::AddRange(ByteRange range, EvBuffer *buf, - std::string *error_message) const { +int64_t StaticStringPieceSlice::AddRange(ByteRange range, EvBuffer *buf, + std::string *error_message) const { buf->AddReference(piece_.data() + range.begin, range.size(), nullptr, nullptr); - return true; + return range.size(); } -bool CopyingStringPieceSlice::AddRange(ByteRange range, EvBuffer *buf, - std::string *error_message) const { +int64_t CopyingStringPieceSlice::AddRange(ByteRange range, EvBuffer *buf, + std::string *error_message) const { buf->Add(re2::StringPiece(piece_.data() + range.begin, range.size())); - return true; + return range.size(); } -bool FileSlices::AddRange(ByteRange range, EvBuffer *buf, - std::string *error_message) const { +int64_t FileSlices::AddRange(ByteRange range, EvBuffer *buf, + std::string *error_message) const { if (range.begin < 0 || range.begin > range.end || range.end > size_) { *error_message = StrCat("Range ", range.DebugString(), " not valid for file of size ", size_); return false; } + int64_t total_bytes_added = 0; auto it = std::upper_bound(slices_.begin(), slices_.end(), range.begin, [](int64_t begin, const SliceInfo &info) { return begin < info.range.end; }); for (; it != slices_.end() && range.end > it->range.begin; ++it) { + if (total_bytes_added > 0 && (it->flags & kLazy) != 0) { + VLOG(1) << "early return of " << total_bytes_added << "/" << range.size() + << " bytes from FileSlices " << this << " because slice " + << it->slice << " is lazy."; + break; + } ByteRange mapped( std::max(INT64_C(0), range.begin - it->range.begin), std::min(range.end - it->range.begin, it->range.end - it->range.begin)); - if (!it->slice->AddRange(mapped, buf, error_message)) { - return false; + int64_t slice_bytes_added = it->slice->AddRange(mapped, buf, error_message); + total_bytes_added += slice_bytes_added > 0 ? slice_bytes_added : 0; + if (slice_bytes_added < 0 && total_bytes_added == 0) { + LOG(WARNING) << "early return of " << total_bytes_added << "/" + << range.size() << " bytes from FileSlices " << this + << " due to slice " << it->slice + << " returning error: " << *error_message; + return -1; + } else if (slice_bytes_added < mapped.size()) { + LOG(INFO) << "early return of " << total_bytes_added << "/" + << range.size() << " bytes from FileSlices " << this + << " due to slice " << it->slice << " returning " + << slice_bytes_added << "/" << mapped.size() + << " bytes. error_message (maybe populated): " + << *error_message; + break; } } - return true; + return total_bytes_added; } void HttpSendError(evhttp_request *req, int http_err, const std::string &prefix, @@ -274,7 +351,7 @@ void HttpSendError(evhttp_request *req, int http_err, const std::string &prefix, EscapeHtml(prefix + strerror(posix_err)).c_str()); } -void HttpServe(const VirtualFile &file, evhttp_request *req) { +void HttpServe(const std::shared_ptr &file, evhttp_request *req) { // We could support HEAD, but there's probably no need. if (evhttp_request_get_command(req) != EVHTTP_REQ_GET) { return evhttp_send_error(req, HTTP_BADMETHOD, "only GET allowed"); @@ -284,7 +361,7 @@ void HttpServe(const VirtualFile &file, evhttp_request *req) { struct evkeyvalq *out_hdrs = evhttp_request_get_output_headers(req); // Construct a Last-Modified: header. - time_t last_modified = file.last_modified(); + time_t last_modified = file->last_modified(); struct tm last_modified_tm; if (gmtime_r(&last_modified, &last_modified_tm) == 0) { return HttpSendError(req, HTTP_INTERNAL, "gmtime_r failed: ", errno); @@ -294,73 +371,83 @@ void HttpServe(const VirtualFile &file, evhttp_request *req) { "%a, %d %b %Y %H:%M:%S GMT", &last_modified_tm) == 0) { return HttpSendError(req, HTTP_INTERNAL, "strftime failed: ", errno); } - std::string etag = file.etag(); + std::string etag = file->etag(); // Ignore the "Range:" header if "If-Range:" specifies an incorrect etag. const char *if_range = evhttp_find_header(in_hdrs, "If-Range"); const char *range_hdr = evhttp_find_header(in_hdrs, "Range"); if (if_range != nullptr && etag != if_range) { - LOG(INFO) << "Ignoring Range: because If-Range: is stale."; + LOG(INFO) << req << ": Ignoring Range: because If-Range: is stale."; range_hdr = nullptr; } EvBuffer buf; std::vector ranges; - auto range_type = internal::ParseRangeHeader(range_hdr, file.size(), &ranges); + auto range_type = + internal::ParseRangeHeader(range_hdr, file->size(), &ranges); std::string error_message; int http_status; const char *http_status_str; + ByteRange left; switch (range_type) { case internal::RangeHeaderType::kNotSatisfiable: { - std::string range_hdr = StrCat("bytes */", file.size()); + std::string range_hdr = StrCat("bytes */", file->size()); evhttp_add_header(out_hdrs, "Content-Range", range_hdr.c_str()); http_status = 416; http_status_str = "Range Not Satisfiable"; - LOG(INFO) << "Replying to non-satisfiable range request: " << range_hdr; + LOG(INFO) << req + << ": Replying to non-satisfiable range request: " << range_hdr; break; } case internal::RangeHeaderType::kSatisfiable: // We only support the simpler single-range case for now. + // A multi-range request just serves the whole file via the fallthrough. if (ranges.size() == 1) { std::string range_hdr = StrCat("bytes ", ranges[0].begin, "-", - ranges[0].end - 1, "/", file.size()); - if (!file.AddRange(ranges[0], &buf, &error_message)) { - LOG(ERROR) << "Unable to serve range " << ranges[0] << ": " - << error_message; - return evhttp_send_error(req, HTTP_INTERNAL, - EscapeHtml(error_message).c_str()); - } + ranges[0].end - 1, "/", file->size()); + left = ranges[0]; evhttp_add_header(out_hdrs, "Content-Range", range_hdr.c_str()); http_status = 206; http_status_str = "Partial Content"; - LOG(INFO) << "Replying to range request"; + LOG(INFO) << req << ": URI " << evhttp_request_get_uri(req) + << ": client requested byte range " << left + << " (total file size " << file->size() << ")"; break; } // FALLTHROUGH - case internal::RangeHeaderType::kAbsentOrInvalid: - if (!file.AddRange(ByteRange(0, file.size()), &buf, &error_message)) { - LOG(ERROR) << "Unable to serve file: " << error_message; - return evhttp_send_error(req, HTTP_INTERNAL, - EscapeHtml(error_message).c_str()); - } - LOG(INFO) << "Replying to whole-file request"; + case internal::RangeHeaderType::kAbsentOrInvalid: { + left = ByteRange(0, file->size()); + LOG(INFO) << req << ": URI " << evhttp_request_get_uri(req) + << ": Client requested whole file of size " << file->size(); http_status = HTTP_OK; http_status_str = "OK"; + } } - // Successful reply ready; add common headers and send. - evhttp_add_header(out_hdrs, "Content-Type", file.mime_type().c_str()); + // Successful reply started; add common headers and send. + evhttp_add_header(out_hdrs, "Content-Length", StrCat(left.size()).c_str()); + evhttp_add_header(out_hdrs, "Content-Type", file->mime_type().c_str()); evhttp_add_header(out_hdrs, "Accept-Ranges", "bytes"); evhttp_add_header(out_hdrs, "Last-Modified", last_modified_str); evhttp_add_header(out_hdrs, "ETag", etag.c_str()); - evhttp_send_reply(req, http_status, http_status_str, buf.get()); + evhttp_send_reply_start(req, http_status, http_status_str); + + ServeInProgress *serve = new ServeInProgress; + serve->file = file; + serve->left = left; + serve->req = req; + evhttp_connection *con = evhttp_request_get_connection(req); + evhttp_connection_set_closecb(con, &ServeCloseCallback, serve); + return ServeChunkCallback(con, serve); } void HttpServeFile(evhttp_request *req, const std::string &mime_type, const std::string &filename, const struct stat &statbuf) { - return HttpServe(RealFile(mime_type, filename, statbuf), req); + return HttpServe( + std::shared_ptr(new RealFile(mime_type, filename, statbuf)), + req); } } // namespace moonfire_nvr diff --git a/src/http.h b/src/http.h index 9736a77..db6e97d 100644 --- a/src/http.h +++ b/src/http.h @@ -41,6 +41,7 @@ #include #include +#include #include #include @@ -118,9 +119,17 @@ void HttpSendError(evhttp_request *req, int http_err, const std::string &prefix, class FileSlice { public: virtual ~FileSlice() {} + virtual int64_t size() const = 0; - virtual bool AddRange(ByteRange range, EvBuffer *buf, - std::string *error_message) const = 0; + + // Add some to all of the given non-empty |range| to |buf|. + // Returns the number of bytes added, or < 0 on error. + // On error, |error_message| should be populated. (|error_message| may also be + // populated if 0 <= return value < range.size(), such as if one of a + // FileSlices object's failed. However, it's safe to simply retry such + // partial failures later.) + virtual int64_t AddRange(ByteRange range, EvBuffer *buf, + std::string *error_message) const = 0; }; class VirtualFile : public FileSlice { @@ -139,8 +148,8 @@ class RealFileSlice : public FileSlice { int64_t size() const final { return range_.size(); } - bool AddRange(ByteRange range, EvBuffer *buf, - std::string *error_message) const final; + int64_t AddRange(ByteRange range, EvBuffer *buf, + std::string *error_message) const final; private: std::string filename_; @@ -161,8 +170,8 @@ class FillerFileSlice : public FileSlice { int64_t size() const final { return size_; } - bool AddRange(ByteRange range, EvBuffer *buf, - std::string *error_message) const final; + int64_t AddRange(ByteRange range, EvBuffer *buf, + std::string *error_message) const final; private: FillFunction fn_; @@ -175,8 +184,8 @@ class StaticStringPieceSlice : public FileSlice { explicit StaticStringPieceSlice(re2::StringPiece piece) : piece_(piece) {} int64_t size() const final { return piece_.size(); } - bool AddRange(ByteRange range, EvBuffer *buf, - std::string *error_message) const final; + int64_t AddRange(ByteRange range, EvBuffer *buf, + std::string *error_message) const final; private: re2::StringPiece piece_; @@ -188,8 +197,8 @@ class CopyingStringPieceSlice : public FileSlice { explicit CopyingStringPieceSlice(re2::StringPiece piece) : piece_(piece) {} int64_t size() const final { return piece_.size(); } - bool AddRange(ByteRange range, EvBuffer *buf, - std::string *error_message) const final; + int64_t AddRange(ByteRange range, EvBuffer *buf, + std::string *error_message) const final; private: re2::StringPiece piece_; @@ -204,22 +213,35 @@ class FileSlices : public FileSlice { // |slice| must outlive the FileSlices. // |slice->size()| should not change after this call. - void Append(const FileSlice *slice) { + // |flags| should be a bitmask of Flags values below. + void Append(const FileSlice *slice, int flags = 0) { int64_t new_size = size_ + slice->size(); - slices_.emplace_back(ByteRange(size_, new_size), slice); + slices_.emplace_back(ByteRange(size_, new_size), slice, flags); size_ = new_size; } int64_t size() const final { return size_; } - bool AddRange(ByteRange range, EvBuffer *buf, - std::string *error_message) const final; + int64_t AddRange(ByteRange range, EvBuffer *buf, + std::string *error_message) const final; + + enum Flags { + // kLazy, as an argument to Append, instructs the FileSlices to append + // this slice in AddRange only if it is the first slice in the requested + // range. Otherwise it returns early, expecting HttpServe to call AddRange + // again after the earlier ranges have been sent. This is useful if it is + // expensive to have the given slice pending. In particular, it is useful + // when serving many file slices on 32-bit machines to avoid exhausting + // the address space with too many memory mappings. + kLazy = 1 + }; private: struct SliceInfo { - SliceInfo(ByteRange range, const FileSlice *slice) - : range(range), slice(slice) {} + SliceInfo(ByteRange range, const FileSlice *slice, int flags) + : range(range), slice(slice), flags(flags) {} ByteRange range; const FileSlice *slice = nullptr; + int flags; }; int64_t size_ = 0; @@ -237,7 +259,7 @@ class FileSlices : public FileSlice { // problematic, this interface may change to take advantage of // evbuffer_add_cb, adding buffers incrementally, and some mechanism will be // added to guarantee VirtualFile objects outlive the HTTP requests they serve. -void HttpServe(const VirtualFile &file, evhttp_request *req); +void HttpServe(const std::shared_ptr &file, evhttp_request *req); // Serve a file over HTTP. Expects the caller to supply a sanitized |filename| // (rather than taking it straight from the path specified in |req|). diff --git a/src/mp4-test.cc b/src/mp4-test.cc index 3678427..27a9aa8 100644 --- a/src/mp4-test.cc +++ b/src/mp4-test.cc @@ -70,9 +70,12 @@ std::string ToHex(const FileSlice *slice, bool pad) { std::string Digest(const FileSlice *slice) { EvBuffer buf; std::string error_message; - size_t size = slice->size(); - CHECK(slice->AddRange(ByteRange(0, size), &buf, &error_message)) - << error_message; + ByteRange left(0, slice->size()); + while (left.size() > 0) { + auto ret = slice->AddRange(left, &buf, &error_message); + CHECK_GT(ret, 0) << error_message; + left.begin += ret; + } evbuffer_iovec vec; auto digest = Digest::SHA1(); while (evbuffer_peek(buf.get(), -1, nullptr, &vec, 1) > 0) { @@ -250,7 +253,7 @@ class IntegrationTest : public testing::Test { return recording; } - std::unique_ptr CreateMp4FromSingleRecording( + std::shared_ptr CreateMp4FromSingleRecording( const Recording &recording) { Mp4FileBuilder builder; builder.SetSampleEntry(video_sample_entry_); @@ -265,8 +268,12 @@ class IntegrationTest : public testing::Test { void WriteMp4(VirtualFile *f) { EvBuffer buf; std::string error_message; - EXPECT_TRUE(f->AddRange(ByteRange(0, f->size()), &buf, &error_message)) - << error_message; + ByteRange left(0, f->size()); + while (left.size() > 0) { + auto ret = f->AddRange(left, &buf, &error_message); + ASSERT_GT(ret, 0) << error_message; + left.begin += ret; + } WriteFileOrDie(StrCat(tmpdir_path_, "/clip.new.mp4"), &buf); } diff --git a/src/mp4.cc b/src/mp4.cc index 8d6f05e..34261b7 100644 --- a/src/mp4.cc +++ b/src/mp4.cc @@ -392,7 +392,7 @@ class Mp4File : public VirtualFile { for (const auto &segment : segments_) { segment->sample_file_slice.Init(segment->recording.sample_file_path, segment->pieces.sample_pos()); - slices_.Append(&segment->sample_file_slice); + slices_.Append(&segment->sample_file_slice, FileSlices::kLazy); } mdat_.header().largesize = ToNetworkU64(slices_.size() - size_before_mdat); @@ -414,8 +414,8 @@ class Mp4File : public VirtualFile { std::string etag() const final { return etag_; } std::string mime_type() const final { return "video/mp4"; } int64_t size() const final { return slices_.size(); } - bool AddRange(ByteRange range, EvBuffer *buf, - std::string *error_message) const final { + int64_t AddRange(ByteRange range, EvBuffer *buf, + std::string *error_message) const final { return slices_.AddRange(range, buf, error_message); } @@ -724,7 +724,7 @@ Mp4FileBuilder &Mp4FileBuilder::SetSampleEntry(const VideoSampleEntry &entry) { return *this; } -std::unique_ptr Mp4FileBuilder::Build(std::string *error_message) { +std::shared_ptr Mp4FileBuilder::Build(std::string *error_message) { int32_t sample_offset = 1; for (auto &segment : segments_) { if (segment->recording.video_sample_entry_sha1 != @@ -733,24 +733,24 @@ std::unique_ptr Mp4FileBuilder::Build(std::string *error_message) { StrCat("inconsistent video sample entries. builder has: ", ToHex(video_sample_entry_.sha1), ", segment has: ", ToHex(segment->recording.video_sample_entry_sha1)); - return std::unique_ptr(); + return std::shared_ptr(); } if (!segment->pieces.Init(&segment->recording, 1, // sample entry index sample_offset, segment->rel_start_90k, segment->rel_end_90k, error_message)) { - return std::unique_ptr(); + return std::shared_ptr(); } sample_offset += segment->pieces.samples(); } if (segments_.empty()) { *error_message = "Can't construct empty .mp4"; - return std::unique_ptr(); + return std::shared_ptr(); } - return std::unique_ptr( + return std::shared_ptr( new Mp4File(std::move(segments_), std::move(video_sample_entry_))); } diff --git a/src/mp4.h b/src/mp4.h index 63d3464..ddbb253 100644 --- a/src/mp4.h +++ b/src/mp4.h @@ -156,7 +156,7 @@ class Mp4FileBuilder { // * Non-final segment has zero duration of last sample. // * Data error in one of the recording sample indexes. // * Invalid start/end. - std::unique_ptr Build(std::string *error_message); + std::shared_ptr Build(std::string *error_message); private: std::vector> segments_; diff --git a/src/testutil.cc b/src/testutil.cc index ce28e79..f4e7e45 100644 --- a/src/testutil.cc +++ b/src/testutil.cc @@ -122,10 +122,11 @@ void WriteFileOrDie(const std::string &path, re2::StringPiece contents) { void WriteFileOrDie(const std::string &path, EvBuffer *buf) { int fd = open(path.c_str(), O_CREAT | O_WRONLY | O_TRUNC, 0600); PCHECK(fd >= 0) << "open: " << path; - size_t buf_len = evbuffer_get_length(buf->get()); - int written = evbuffer_write(buf->get(), fd); - PCHECK(written >= 0 && buf_len == static_cast(written)) - << "buf_len: " << buf_len << ", written: " << written; + while (evbuffer_get_length(buf->get()) > 0) { + size_t buf_len = evbuffer_get_length(buf->get()); + int written = evbuffer_write(buf->get(), fd); + PCHECK(written >= 0) << "buf_len: " << buf_len << ", written: " << written; + } PCHECK(close(fd) == 0) << "close"; }