return w;
}
Status ClearWriter() {
- // TODO: plumb Env::IOActivity, Env::IOPriority
- Status s = writer->WriteBuffer(WriteOptions());
+ Status s;
+ if (writer->file()) {
+ // TODO: plumb Env::IOActivity, Env::IOPriority
+ s = writer->WriteBuffer(WriteOptions());
+ }
delete writer;
writer = nullptr;
return s;
void PrepareForSync() {
assert(!getting_synced);
- // Size is expected to be monotonically increasing.
- assert(writer->file()->GetFlushedSize() >= pre_sync_size);
+ // Ensure the head of logs_ is marked as getting_synced if any is.
getting_synced = true;
- pre_sync_size = writer->file()->GetFlushedSize();
+ // If last sync failed on a later WAL, this could be a fully synced
+ // and closed WAL that just needs to be recorded as synced in the
+ // manifest.
+ if (writer->file()) {
+ // Size is expected to be monotonically increasing.
+ assert(writer->file()->GetFlushedSize() >= pre_sync_size);
+ pre_sync_size = writer->file()->GetFlushedSize();
+ }
}
void FinishSync() {
#include "port/stack_trace.h"
#include "rocksdb/file_system.h"
#include "test_util/sync_point.h"
+#include "util/defer.h"
#include "util/udt_util.h"
#include "utilities/fault_injection_env.h"
#include "utilities/fault_injection_fs.h"
ASSERT_OK(dbfull()->SyncWAL());
}
+TEST_F(DBWALTest, SyncWalPartialFailure) {
+ class MyTestFileSystem : public FileSystemWrapper {
+ public:
+ explicit MyTestFileSystem(std::shared_ptr<FileSystem> base)
+ : FileSystemWrapper(std::move(base)) {}
+
+ static const char* kClassName() { return "MyTestFileSystem"; }
+ const char* Name() const override { return kClassName(); }
+ IOStatus NewWritableFile(const std::string& fname,
+ const FileOptions& file_opts,
+ std::unique_ptr<FSWritableFile>* result,
+ IODebugContext* dbg) override {
+ IOStatus s = target()->NewWritableFile(fname, file_opts, result, dbg);
+ if (s.ok()) {
+ *result =
+ std::make_unique<MyTestWritableFile>(std::move(*result), *this);
+ }
+ return s;
+ }
+
+ AcqRelAtomic<uint32_t> syncs_before_failure_{UINT32_MAX};
+
+ protected:
+ class MyTestWritableFile : public FSWritableFileOwnerWrapper {
+ public:
+ MyTestWritableFile(std::unique_ptr<FSWritableFile>&& file,
+ MyTestFileSystem& fs)
+ : FSWritableFileOwnerWrapper(std::move(file)), fs_(fs) {}
+
+ IOStatus Sync(const IOOptions& options, IODebugContext* dbg) override {
+ int prev_val = fs_.syncs_before_failure_.FetchSub(1);
+ if (prev_val == 0) {
+ return IOStatus::IOError("fault");
+ } else {
+ return target()->Sync(options, dbg);
+ }
+ }
+
+ protected:
+ MyTestFileSystem& fs_;
+ };
+ };
+
+ Options options = CurrentOptions();
+ options.max_write_buffer_number = 4;
+ options.track_and_verify_wals_in_manifest = true;
+ options.max_bgerror_resume_count = 0; // manual resume
+
+ auto custom_fs =
+ std::make_shared<MyTestFileSystem>(options.env->GetFileSystem());
+ std::unique_ptr<Env> fault_fs_env(NewCompositeEnv(custom_fs));
+ options.env = fault_fs_env.get();
+ Reopen(options);
+ Defer closer([this]() { Close(); });
+
+ // This is the simplest way to get
+ // * one inactive WAL, synced
+ // * one inactive WAL, not synced, and
+ // * one active WAL, not synced
+ // with a single thread, to exercise as much logic as we reasonably can.
+ ASSERT_OK(static_cast_with_check<DBImpl>(db_)->PauseBackgroundWork());
+ ASSERT_OK(Put("key1", "val1"));
+ ASSERT_OK(static_cast_with_check<DBImpl>(db_)->TEST_SwitchMemtable());
+ ASSERT_OK(db_->SyncWAL());
+ ASSERT_OK(Put("key2", "val2"));
+ ASSERT_OK(static_cast_with_check<DBImpl>(db_)->TEST_SwitchMemtable());
+ ASSERT_OK(Put("key3", "val3"));
+
+ // Allow 1 of the WALs to sync, but another won't
+ custom_fs->syncs_before_failure_.Store(1);
+ ASSERT_NOK(db_->SyncWAL());
+
+ // Stuck in this state. (This could previously cause a segfault.)
+ ASSERT_NOK(db_->SyncWAL());
+
+ // Can't Resume because WAL write failure is considered non-recoverable,
+ // regardless of the IOStatus itself. (Can/should be fixed?)
+ ASSERT_NOK(db_->Resume());
+
+ // Verify no data loss after reopen.
+ // Also Close() could previously crash in this state.
+ Reopen(options);
+ ASSERT_EQ("val1", Get("key1"));
+ ASSERT_EQ("val2", Get("key2"));
+ ASSERT_EQ("val3", Get("key3"));
+}
+
// Github issue 1339. Prior the fix we read sequence id from the first log to
// a local variable, then keep increase the variable as we replay logs,
// ignoring actual sequence id of the records. This is incorrect if some writes