From 48d0473a4ceacfa9fecd0137a4863a822e6aed76 Mon Sep 17 00:00:00 2001 From: Scott Lamb Date: Sat, 9 Jan 2016 17:16:55 -0800 Subject: [PATCH] Small helper for writing sample files safely. Handles partial writes + checksumming. --- src/filesystem.cc | 14 +++++ src/filesystem.h | 49 +++++++++++++++-- src/recording-test.cc | 125 ++++++++++++++++++++++++++++++++++++++++++ src/recording.cc | 82 +++++++++++++++++++++++++++ src/recording.h | 42 ++++++++++++++ 5 files changed, 306 insertions(+), 6 deletions(-) diff --git a/src/filesystem.cc b/src/filesystem.cc index 089184f..6ba0d1a 100644 --- a/src/filesystem.cc +++ b/src/filesystem.cc @@ -79,6 +79,20 @@ class RealFile : public File { return 0; } + int Open(const char *path, int flags, std::unique_ptr *f) final { + return Open(path, flags, 0, f); + } + + int Open(const char *path, int flags, mode_t mode, + std::unique_ptr *f) final { + int ret = openat(fd_, path, flags, mode); + if (ret < 0) { + return errno; + } + f->reset(new RealFile(ret)); + return 0; + } + int Read(void *buf, size_t size, size_t *bytes_read) final { ssize_t ret; while ((ret = read(fd_, buf, size)) == -1 && errno == EINTR) diff --git a/src/filesystem.h b/src/filesystem.h index 2455f88..9810386 100644 --- a/src/filesystem.h +++ b/src/filesystem.h @@ -45,13 +45,14 @@ #include #include #include +#include #include #include "common.h" namespace moonfire_nvr { -// Represents an open file. All methods but Close() are thread-safe. +// Represents an open file descriptor. All methods but Close() are thread-safe. class File { public: // Close the file, ignoring the result. @@ -61,11 +62,10 @@ class File { // Already closed is considered a success. virtual int Close() = 0; - // fsync(), returning 0 on success or errno>0 on failure. - virtual int Sync() = 0; - - // ftruncate(), returning 0 on success or errno>0 on failure. - virtual int Truncate(off_t length) = 0; + // openat(), returning 0 on success or errno>0 on failure. + virtual int Open(const char *path, int flags, std::unique_ptr *f) = 0; + virtual int Open(const char *path, int flags, mode_t mode, + std::unique_ptr *f) = 0; // read(), returning 0 on success or errno>0 on failure. // On success, |bytes_read| will be updated. @@ -74,11 +74,48 @@ class File { // fstat(), returning 0 on success or errno>0 on failure. virtual int Stat(struct stat *buf) = 0; + // fsync(), returning 0 on success or errno>0 on failure. + virtual int Sync() = 0; + + // ftruncate(), returning 0 on success or errno>0 on failure. + virtual int Truncate(off_t length) = 0; + // Write to the file, returning 0 on success or errno>0 on failure. // On success, |bytes_written| will be updated. virtual int Write(re2::StringPiece data, size_t *bytes_written) = 0; }; +class MockFile : public File { + public: + MOCK_METHOD0(Close, int()); + + // Open is wrapped here because gmock's SetArgPointee doesn't work well with + // std::unique_ptr. + + int Open(const char *path, int flags, std::unique_ptr *f) final { + File *f_tmp = nullptr; + int ret = OpenRaw(path, flags, &f_tmp); + f->reset(f_tmp); + return ret; + } + + int Open(const char *path, int flags, mode_t mode, + std::unique_ptr *f) final { + File *f_tmp = nullptr; + int ret = OpenRaw(path, flags, mode, &f_tmp); + f->reset(f_tmp); + return ret; + } + + MOCK_METHOD3(OpenRaw, int(const char *, int, File **)); + MOCK_METHOD4(OpenRaw, int(const char *, int, mode_t, File **)); + MOCK_METHOD3(Read, int(void *, size_t, size_t *)); + MOCK_METHOD1(Stat, int(struct stat *)); + MOCK_METHOD0(Sync, int()); + MOCK_METHOD2(Write, int(re2::StringPiece, size_t *)); + MOCK_METHOD1(Truncate, int(off_t)); +}; + // Interface to the local filesystem. There's typically one per program, // but it's an abstract class for testability. Thread-safe. class Filesystem { diff --git a/src/recording-test.cc b/src/recording-test.cc index b812630..f8f5cfe 100644 --- a/src/recording-test.cc +++ b/src/recording-test.cc @@ -30,6 +30,10 @@ // // recording-test.cc: tests of the recording.h interface. +#include +#include +#include + #include #include #include @@ -39,7 +43,11 @@ DECLARE_bool(alsologtostderr); +using testing::_; using testing::HasSubstr; +using testing::DoAll; +using testing::Return; +using testing::SetArgPointee; namespace moonfire_nvr { namespace { @@ -119,6 +127,123 @@ TEST(SampleIndexTest, IteratorErrors) { EXPECT_THAT(it.error(), HasSubstr("non-positive bytes")); } +TEST(SampleFileWriterTest, Simple) { + testing::StrictMock parent; + auto *f = new testing::StrictMock; + + re2::StringPiece write_1("write 1"); + re2::StringPiece write_2("write 2"); + + EXPECT_CALL(parent, OpenRaw("foo", O_WRONLY | O_EXCL | O_CREAT, 0600, _)) + .WillOnce(DoAll(SetArgPointee<3>(f), Return(0))); + EXPECT_CALL(*f, Write(write_1, _)) + .WillOnce(DoAll(SetArgPointee<1>(7), Return(0))); + EXPECT_CALL(*f, Write(write_2, _)) + .WillOnce(DoAll(SetArgPointee<1>(7), Return(0))); + EXPECT_CALL(*f, Sync()).WillOnce(Return(0)); + EXPECT_CALL(*f, Close()).WillOnce(Return(0)); + + SampleFileWriter writer(&parent); + std::string error_message; + std::string sha1; + ASSERT_TRUE(writer.Open("foo", &error_message)) << error_message; + EXPECT_TRUE(writer.Write(write_1, &error_message)) << error_message; + EXPECT_TRUE(writer.Write(write_2, &error_message)) << error_message; + EXPECT_TRUE(writer.Close(&sha1, &error_message)) << error_message; + EXPECT_EQ("6b c3 73 25 b3 6f b5 fd 20 5e 57 28 44 29 e7 57 64 33 86 18", + ToHex(sha1)); +} + +TEST(SampleFileWriterTest, PartialWriteIsRetried) { + testing::StrictMock parent; + auto *f = new testing::StrictMock; + + re2::StringPiece write_1("write 1"); + re2::StringPiece write_2("write 2"); + re2::StringPiece write_2b(write_2); + write_2b.remove_prefix(3); + + EXPECT_CALL(parent, OpenRaw("foo", O_WRONLY | O_EXCL | O_CREAT, 0600, _)) + .WillOnce(DoAll(SetArgPointee<3>(f), Return(0))); + EXPECT_CALL(*f, Write(write_1, _)) + .WillOnce(DoAll(SetArgPointee<1>(7), Return(0))); + EXPECT_CALL(*f, Write(write_2, _)) + .WillOnce(DoAll(SetArgPointee<1>(3), Return(0))); + EXPECT_CALL(*f, Write(write_2b, _)) + .WillOnce(DoAll(SetArgPointee<1>(4), Return(0))); + EXPECT_CALL(*f, Sync()).WillOnce(Return(0)); + EXPECT_CALL(*f, Close()).WillOnce(Return(0)); + + SampleFileWriter writer(&parent); + std::string error_message; + std::string sha1; + ASSERT_TRUE(writer.Open("foo", &error_message)) << error_message; + EXPECT_TRUE(writer.Write(write_1, &error_message)) << error_message; + EXPECT_TRUE(writer.Write(write_2, &error_message)) << error_message; + EXPECT_TRUE(writer.Close(&sha1, &error_message)) << error_message; + EXPECT_EQ("6b c3 73 25 b3 6f b5 fd 20 5e 57 28 44 29 e7 57 64 33 86 18", + ToHex(sha1)); +} + +TEST(SampleFileWriterTest, PartialWriteIsTruncated) { + testing::StrictMock parent; + auto *f = new testing::StrictMock; + + re2::StringPiece write_1("write 1"); + re2::StringPiece write_2("write 2"); + re2::StringPiece write_2b(write_2); + write_2b.remove_prefix(3); + + EXPECT_CALL(parent, OpenRaw("foo", O_WRONLY | O_EXCL | O_CREAT, 0600, _)) + .WillOnce(DoAll(SetArgPointee<3>(f), Return(0))); + EXPECT_CALL(*f, Write(write_1, _)) + .WillOnce(DoAll(SetArgPointee<1>(7), Return(0))); + EXPECT_CALL(*f, Write(write_2, _)) + .WillOnce(DoAll(SetArgPointee<1>(3), Return(0))); + EXPECT_CALL(*f, Write(write_2b, _)).WillOnce(Return(ENOSPC)); + EXPECT_CALL(*f, Truncate(7)).WillOnce(Return(0)); + EXPECT_CALL(*f, Sync()).WillOnce(Return(0)); + EXPECT_CALL(*f, Close()).WillOnce(Return(0)); + + SampleFileWriter writer(&parent); + std::string error_message; + std::string sha1; + ASSERT_TRUE(writer.Open("foo", &error_message)) << error_message; + EXPECT_TRUE(writer.Write(write_1, &error_message)) << error_message; + EXPECT_FALSE(writer.Write(write_2, &error_message)) << error_message; + EXPECT_TRUE(writer.Close(&sha1, &error_message)) << error_message; + EXPECT_EQ("b1 cc ee 33 9b 93 55 87 c0 99 97 a9 ec 8b b2 37 4e 02 b5 d0", + ToHex(sha1)); +} + +TEST(SampleFileWriterTest, PartialWriteTruncateFailureCausesCloseToFail) { + testing::StrictMock parent; + auto *f = new testing::StrictMock; + + re2::StringPiece write_1("write 1"); + re2::StringPiece write_2("write 2"); + re2::StringPiece write_2b(write_2); + write_2b.remove_prefix(3); + + EXPECT_CALL(parent, OpenRaw("foo", O_WRONLY | O_EXCL | O_CREAT, 0600, _)) + .WillOnce(DoAll(SetArgPointee<3>(f), Return(0))); + EXPECT_CALL(*f, Write(write_1, _)) + .WillOnce(DoAll(SetArgPointee<1>(7), Return(0))); + EXPECT_CALL(*f, Write(write_2, _)) + .WillOnce(DoAll(SetArgPointee<1>(3), Return(0))); + EXPECT_CALL(*f, Write(write_2b, _)).WillOnce(Return(EIO)); + EXPECT_CALL(*f, Truncate(7)).WillOnce(Return(EIO)); + EXPECT_CALL(*f, Close()).WillOnce(Return(0)); + + SampleFileWriter writer(&parent); + std::string error_message; + std::string sha1; + ASSERT_TRUE(writer.Open("foo", &error_message)) << error_message; + EXPECT_TRUE(writer.Write(write_1, &error_message)) << error_message; + EXPECT_FALSE(writer.Write(write_2, &error_message)) << error_message; + EXPECT_FALSE(writer.Close(&sha1, &error_message)) << error_message; +} + } // namespace } // namespace moonfire_nvr diff --git a/src/recording.cc b/src/recording.cc index eca3136..1cfb2a9 100644 --- a/src/recording.cc +++ b/src/recording.cc @@ -32,6 +32,10 @@ #include "recording.h" +#include +#include +#include + #include "coding.h" #include "string.h" @@ -117,4 +121,82 @@ void SampleIndexIterator::Clear() { done_ = true; } +SampleFileWriter::SampleFileWriter(File *parent_dir) + : parent_dir_(parent_dir), sha1_(Digest::SHA1()) {} + +bool SampleFileWriter::Open(const char *filename, std::string *error_message) { + if (is_open()) { + *error_message = "already open!"; + return false; + } + int ret = + parent_dir_->Open(filename, O_WRONLY | O_CREAT | O_EXCL, 0600, &file_); + if (ret != 0) { + *error_message = StrCat("open ", filename, ": ", strerror(ret)); + return false; + } + return true; +} + +bool SampleFileWriter::Write(re2::StringPiece pkt, std::string *error_message) { + if (!is_open()) { + *error_message = "not open!"; + return false; + } + auto old_pos = pos_; + re2::StringPiece remaining(pkt); + while (!remaining.empty()) { + size_t written; + int write_ret = file_->Write(remaining, &written); + if (write_ret != 0) { + if (pos_ > old_pos) { + int truncate_ret = file_->Truncate(old_pos); + if (truncate_ret != 0) { + *error_message = + StrCat("write failed with: ", strerror(write_ret), + " and ftruncate failed with: ", strerror(truncate_ret)); + corrupt_ = true; + return false; + } + } + *error_message = StrCat("write: ", strerror(write_ret)); + return false; + } + remaining.remove_prefix(written); + pos_ += written; + } + sha1_->Update(pkt); + return true; +} + +bool SampleFileWriter::Close(std::string *sha1, std::string *error_message) { + if (!is_open()) { + *error_message = "not open!"; + return false; + } + + if (corrupt_) { + *error_message = "File already corrupted."; + } else { + int ret = file_->Sync(); + if (ret != 0) { + *error_message = StrCat("fsync failed with: ", strerror(ret)); + corrupt_ = true; + } + } + + int ret = file_->Close(); + if (ret != 0 && !corrupt_) { + corrupt_ = true; + *error_message = StrCat("close failed with: ", strerror(ret)); + } + + bool ok = !corrupt_; + file_.reset(); + *sha1 = sha1_->Finalize(); + pos_ = 0; + corrupt_ = false; + return ok; +} + } // namespace moonfire_nvr diff --git a/src/recording.h b/src/recording.h index 4d401a8..2d2cb75 100644 --- a/src/recording.h +++ b/src/recording.h @@ -36,11 +36,15 @@ #include +#include #include #include #include +#include "crypto.h" +#include "filesystem.h" + namespace moonfire_nvr { // Encodes a sample index. @@ -125,6 +129,44 @@ class SampleIndexIterator { bool done_; }; +// Writes a sample file. Can be used repeatedly. Thread-compatible. +class SampleFileWriter { + public: + // |parent_dir| must outlive the writer. + SampleFileWriter(File *parent_dir); + SampleFileWriter(const SampleFileWriter &) = delete; + void operator=(const SampleFileWriter &) = delete; + + // PRE: !is_open(). + bool Open(const char *filename, std::string *error_message); + + // Writes a single packet, returning success. + // On failure, the stream should be closed. If Close() returns true, the + // file contains the results of all packets up to (but not including) this + // one. + // + // PRE: is_open(). + bool Write(re2::StringPiece pkt, std::string *error_message); + + // fsync() and close() the stream. + // Note the caller is still responsible for fsync()ing the parent stream, + // so that operations can be batched. + // On success, |sha1| will be filled with the raw SHA-1 hash of the file. + // On failure, the file should be considered corrupt and discarded. + // + // PRE: is_open(). + bool Close(std::string *sha1, std::string *error_message); + + bool is_open() const { return file_ != nullptr; } + + private: + File *parent_dir_; + std::unique_ptr file_; + std::unique_ptr sha1_; + int64_t pos_ = 0; + bool corrupt_ = false; +}; + } // namespace moonfire_nvr #endif // MOONFIRE_NVR_RECORDING_H