Small helper for writing sample files safely.

Handles partial writes + checksumming.
This commit is contained in:
Scott Lamb 2016-01-09 17:16:55 -08:00
parent 30e0f73ae0
commit 48d0473a4c
5 changed files with 306 additions and 6 deletions

View File

@ -79,6 +79,20 @@ class RealFile : public File {
return 0;
}
int Open(const char *path, int flags, std::unique_ptr<File> *f) final {
return Open(path, flags, 0, f);
}
int Open(const char *path, int flags, mode_t mode,
std::unique_ptr<File> *f) final {
int ret = openat(fd_, path, flags, mode);
if (ret < 0) {
return errno;
}
f->reset(new RealFile(ret));
return 0;
}
int Read(void *buf, size_t size, size_t *bytes_read) final {
ssize_t ret;
while ((ret = read(fd_, buf, size)) == -1 && errno == EINTR)

View File

@ -45,13 +45,14 @@
#include <event2/buffer.h>
#include <event2/http.h>
#include <glog/logging.h>
#include <gmock/gmock.h>
#include <re2/stringpiece.h>
#include "common.h"
namespace moonfire_nvr {
// Represents an open file. All methods but Close() are thread-safe.
// Represents an open file descriptor. All methods but Close() are thread-safe.
class File {
public:
// Close the file, ignoring the result.
@ -61,11 +62,10 @@ class File {
// Already closed is considered a success.
virtual int Close() = 0;
// fsync(), returning 0 on success or errno>0 on failure.
virtual int Sync() = 0;
// ftruncate(), returning 0 on success or errno>0 on failure.
virtual int Truncate(off_t length) = 0;
// openat(), returning 0 on success or errno>0 on failure.
virtual int Open(const char *path, int flags, std::unique_ptr<File> *f) = 0;
virtual int Open(const char *path, int flags, mode_t mode,
std::unique_ptr<File> *f) = 0;
// read(), returning 0 on success or errno>0 on failure.
// On success, |bytes_read| will be updated.
@ -74,11 +74,48 @@ class File {
// fstat(), returning 0 on success or errno>0 on failure.
virtual int Stat(struct stat *buf) = 0;
// fsync(), returning 0 on success or errno>0 on failure.
virtual int Sync() = 0;
// ftruncate(), returning 0 on success or errno>0 on failure.
virtual int Truncate(off_t length) = 0;
// Write to the file, returning 0 on success or errno>0 on failure.
// On success, |bytes_written| will be updated.
virtual int Write(re2::StringPiece data, size_t *bytes_written) = 0;
};
class MockFile : public File {
public:
MOCK_METHOD0(Close, int());
// Open is wrapped here because gmock's SetArgPointee doesn't work well with
// std::unique_ptr.
int Open(const char *path, int flags, std::unique_ptr<File> *f) final {
File *f_tmp = nullptr;
int ret = OpenRaw(path, flags, &f_tmp);
f->reset(f_tmp);
return ret;
}
int Open(const char *path, int flags, mode_t mode,
std::unique_ptr<File> *f) final {
File *f_tmp = nullptr;
int ret = OpenRaw(path, flags, mode, &f_tmp);
f->reset(f_tmp);
return ret;
}
MOCK_METHOD3(OpenRaw, int(const char *, int, File **));
MOCK_METHOD4(OpenRaw, int(const char *, int, mode_t, File **));
MOCK_METHOD3(Read, int(void *, size_t, size_t *));
MOCK_METHOD1(Stat, int(struct stat *));
MOCK_METHOD0(Sync, int());
MOCK_METHOD2(Write, int(re2::StringPiece, size_t *));
MOCK_METHOD1(Truncate, int(off_t));
};
// Interface to the local filesystem. There's typically one per program,
// but it's an abstract class for testability. Thread-safe.
class Filesystem {

View File

@ -30,6 +30,10 @@
//
// recording-test.cc: tests of the recording.h interface.
#include <fcntl.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <gflags/gflags.h>
#include <gmock/gmock.h>
#include <gtest/gtest.h>
@ -39,7 +43,11 @@
DECLARE_bool(alsologtostderr);
using testing::_;
using testing::HasSubstr;
using testing::DoAll;
using testing::Return;
using testing::SetArgPointee;
namespace moonfire_nvr {
namespace {
@ -119,6 +127,123 @@ TEST(SampleIndexTest, IteratorErrors) {
EXPECT_THAT(it.error(), HasSubstr("non-positive bytes"));
}
TEST(SampleFileWriterTest, Simple) {
testing::StrictMock<MockFile> parent;
auto *f = new testing::StrictMock<MockFile>;
re2::StringPiece write_1("write 1");
re2::StringPiece write_2("write 2");
EXPECT_CALL(parent, OpenRaw("foo", O_WRONLY | O_EXCL | O_CREAT, 0600, _))
.WillOnce(DoAll(SetArgPointee<3>(f), Return(0)));
EXPECT_CALL(*f, Write(write_1, _))
.WillOnce(DoAll(SetArgPointee<1>(7), Return(0)));
EXPECT_CALL(*f, Write(write_2, _))
.WillOnce(DoAll(SetArgPointee<1>(7), Return(0)));
EXPECT_CALL(*f, Sync()).WillOnce(Return(0));
EXPECT_CALL(*f, Close()).WillOnce(Return(0));
SampleFileWriter writer(&parent);
std::string error_message;
std::string sha1;
ASSERT_TRUE(writer.Open("foo", &error_message)) << error_message;
EXPECT_TRUE(writer.Write(write_1, &error_message)) << error_message;
EXPECT_TRUE(writer.Write(write_2, &error_message)) << error_message;
EXPECT_TRUE(writer.Close(&sha1, &error_message)) << error_message;
EXPECT_EQ("6b c3 73 25 b3 6f b5 fd 20 5e 57 28 44 29 e7 57 64 33 86 18",
ToHex(sha1));
}
TEST(SampleFileWriterTest, PartialWriteIsRetried) {
testing::StrictMock<MockFile> parent;
auto *f = new testing::StrictMock<MockFile>;
re2::StringPiece write_1("write 1");
re2::StringPiece write_2("write 2");
re2::StringPiece write_2b(write_2);
write_2b.remove_prefix(3);
EXPECT_CALL(parent, OpenRaw("foo", O_WRONLY | O_EXCL | O_CREAT, 0600, _))
.WillOnce(DoAll(SetArgPointee<3>(f), Return(0)));
EXPECT_CALL(*f, Write(write_1, _))
.WillOnce(DoAll(SetArgPointee<1>(7), Return(0)));
EXPECT_CALL(*f, Write(write_2, _))
.WillOnce(DoAll(SetArgPointee<1>(3), Return(0)));
EXPECT_CALL(*f, Write(write_2b, _))
.WillOnce(DoAll(SetArgPointee<1>(4), Return(0)));
EXPECT_CALL(*f, Sync()).WillOnce(Return(0));
EXPECT_CALL(*f, Close()).WillOnce(Return(0));
SampleFileWriter writer(&parent);
std::string error_message;
std::string sha1;
ASSERT_TRUE(writer.Open("foo", &error_message)) << error_message;
EXPECT_TRUE(writer.Write(write_1, &error_message)) << error_message;
EXPECT_TRUE(writer.Write(write_2, &error_message)) << error_message;
EXPECT_TRUE(writer.Close(&sha1, &error_message)) << error_message;
EXPECT_EQ("6b c3 73 25 b3 6f b5 fd 20 5e 57 28 44 29 e7 57 64 33 86 18",
ToHex(sha1));
}
TEST(SampleFileWriterTest, PartialWriteIsTruncated) {
testing::StrictMock<MockFile> parent;
auto *f = new testing::StrictMock<MockFile>;
re2::StringPiece write_1("write 1");
re2::StringPiece write_2("write 2");
re2::StringPiece write_2b(write_2);
write_2b.remove_prefix(3);
EXPECT_CALL(parent, OpenRaw("foo", O_WRONLY | O_EXCL | O_CREAT, 0600, _))
.WillOnce(DoAll(SetArgPointee<3>(f), Return(0)));
EXPECT_CALL(*f, Write(write_1, _))
.WillOnce(DoAll(SetArgPointee<1>(7), Return(0)));
EXPECT_CALL(*f, Write(write_2, _))
.WillOnce(DoAll(SetArgPointee<1>(3), Return(0)));
EXPECT_CALL(*f, Write(write_2b, _)).WillOnce(Return(ENOSPC));
EXPECT_CALL(*f, Truncate(7)).WillOnce(Return(0));
EXPECT_CALL(*f, Sync()).WillOnce(Return(0));
EXPECT_CALL(*f, Close()).WillOnce(Return(0));
SampleFileWriter writer(&parent);
std::string error_message;
std::string sha1;
ASSERT_TRUE(writer.Open("foo", &error_message)) << error_message;
EXPECT_TRUE(writer.Write(write_1, &error_message)) << error_message;
EXPECT_FALSE(writer.Write(write_2, &error_message)) << error_message;
EXPECT_TRUE(writer.Close(&sha1, &error_message)) << error_message;
EXPECT_EQ("b1 cc ee 33 9b 93 55 87 c0 99 97 a9 ec 8b b2 37 4e 02 b5 d0",
ToHex(sha1));
}
TEST(SampleFileWriterTest, PartialWriteTruncateFailureCausesCloseToFail) {
testing::StrictMock<MockFile> parent;
auto *f = new testing::StrictMock<MockFile>;
re2::StringPiece write_1("write 1");
re2::StringPiece write_2("write 2");
re2::StringPiece write_2b(write_2);
write_2b.remove_prefix(3);
EXPECT_CALL(parent, OpenRaw("foo", O_WRONLY | O_EXCL | O_CREAT, 0600, _))
.WillOnce(DoAll(SetArgPointee<3>(f), Return(0)));
EXPECT_CALL(*f, Write(write_1, _))
.WillOnce(DoAll(SetArgPointee<1>(7), Return(0)));
EXPECT_CALL(*f, Write(write_2, _))
.WillOnce(DoAll(SetArgPointee<1>(3), Return(0)));
EXPECT_CALL(*f, Write(write_2b, _)).WillOnce(Return(EIO));
EXPECT_CALL(*f, Truncate(7)).WillOnce(Return(EIO));
EXPECT_CALL(*f, Close()).WillOnce(Return(0));
SampleFileWriter writer(&parent);
std::string error_message;
std::string sha1;
ASSERT_TRUE(writer.Open("foo", &error_message)) << error_message;
EXPECT_TRUE(writer.Write(write_1, &error_message)) << error_message;
EXPECT_FALSE(writer.Write(write_2, &error_message)) << error_message;
EXPECT_FALSE(writer.Close(&sha1, &error_message)) << error_message;
}
} // namespace
} // namespace moonfire_nvr

View File

@ -32,6 +32,10 @@
#include "recording.h"
#include <fcntl.h>
#include <sys/stat.h>
#include <sys/types.h>
#include "coding.h"
#include "string.h"
@ -117,4 +121,82 @@ void SampleIndexIterator::Clear() {
done_ = true;
}
SampleFileWriter::SampleFileWriter(File *parent_dir)
: parent_dir_(parent_dir), sha1_(Digest::SHA1()) {}
bool SampleFileWriter::Open(const char *filename, std::string *error_message) {
if (is_open()) {
*error_message = "already open!";
return false;
}
int ret =
parent_dir_->Open(filename, O_WRONLY | O_CREAT | O_EXCL, 0600, &file_);
if (ret != 0) {
*error_message = StrCat("open ", filename, ": ", strerror(ret));
return false;
}
return true;
}
bool SampleFileWriter::Write(re2::StringPiece pkt, std::string *error_message) {
if (!is_open()) {
*error_message = "not open!";
return false;
}
auto old_pos = pos_;
re2::StringPiece remaining(pkt);
while (!remaining.empty()) {
size_t written;
int write_ret = file_->Write(remaining, &written);
if (write_ret != 0) {
if (pos_ > old_pos) {
int truncate_ret = file_->Truncate(old_pos);
if (truncate_ret != 0) {
*error_message =
StrCat("write failed with: ", strerror(write_ret),
" and ftruncate failed with: ", strerror(truncate_ret));
corrupt_ = true;
return false;
}
}
*error_message = StrCat("write: ", strerror(write_ret));
return false;
}
remaining.remove_prefix(written);
pos_ += written;
}
sha1_->Update(pkt);
return true;
}
bool SampleFileWriter::Close(std::string *sha1, std::string *error_message) {
if (!is_open()) {
*error_message = "not open!";
return false;
}
if (corrupt_) {
*error_message = "File already corrupted.";
} else {
int ret = file_->Sync();
if (ret != 0) {
*error_message = StrCat("fsync failed with: ", strerror(ret));
corrupt_ = true;
}
}
int ret = file_->Close();
if (ret != 0 && !corrupt_) {
corrupt_ = true;
*error_message = StrCat("close failed with: ", strerror(ret));
}
bool ok = !corrupt_;
file_.reset();
*sha1 = sha1_->Finalize();
pos_ = 0;
corrupt_ = false;
return ok;
}
} // namespace moonfire_nvr

View File

@ -36,11 +36,15 @@
#include <stdint.h>
#include <memory>
#include <string>
#include <glog/logging.h>
#include <re2/stringpiece.h>
#include "crypto.h"
#include "filesystem.h"
namespace moonfire_nvr {
// Encodes a sample index.
@ -125,6 +129,44 @@ class SampleIndexIterator {
bool done_;
};
// Writes a sample file. Can be used repeatedly. Thread-compatible.
class SampleFileWriter {
public:
// |parent_dir| must outlive the writer.
SampleFileWriter(File *parent_dir);
SampleFileWriter(const SampleFileWriter &) = delete;
void operator=(const SampleFileWriter &) = delete;
// PRE: !is_open().
bool Open(const char *filename, std::string *error_message);
// Writes a single packet, returning success.
// On failure, the stream should be closed. If Close() returns true, the
// file contains the results of all packets up to (but not including) this
// one.
//
// PRE: is_open().
bool Write(re2::StringPiece pkt, std::string *error_message);
// fsync() and close() the stream.
// Note the caller is still responsible for fsync()ing the parent stream,
// so that operations can be batched.
// On success, |sha1| will be filled with the raw SHA-1 hash of the file.
// On failure, the file should be considered corrupt and discarded.
//
// PRE: is_open().
bool Close(std::string *sha1, std::string *error_message);
bool is_open() const { return file_ != nullptr; }
private:
File *parent_dir_;
std::unique_ptr<File> file_;
std::unique_ptr<Digest> sha1_;
int64_t pos_ = 0;
bool corrupt_ = false;
};
} // namespace moonfire_nvr
#endif // MOONFIRE_NVR_RECORDING_H