info.file_number = live_wal_files[i]->LogNumber();
info.file_type = kWalFile;
info.size = live_wal_files[i]->SizeFileBytes();
- // Only last should need to be trimmed
- info.trim_to_size = (i + 1 == wal_size);
+ // Trim the log either if it's the last one, or log file recycling is
+ // enabled. In the latter case, a hard link doesn't prevent the file
+ // from being renamed and recycled. So we need to copy it instead.
+ info.trim_to_size = (i + 1 == wal_size) ||
+ (immutable_db_options_.recycle_log_file_num > 0);
if (opts.include_checksum_info) {
info.file_checksum_func_name = kUnknownFileChecksumFuncName;
info.file_checksum = kUnknownFileChecksum;
wal.GetPreSyncSize() > 0) {
synced_wals->AddWal(wal.number, WalMetadata(wal.GetPreSyncSize()));
}
- if (wal.GetPreSyncSize() == wal.writer->file()->GetFlushedSize()) {
+ // Check if the file has been closed, i.e. wal.writer->file() == nullptr
+ // which can happen if log recycling is enabled, or if all the data in
+ // the log has been synced
+ if (wal.writer->file() == nullptr ||
+ wal.GetPreSyncSize() == wal.writer->file()->GetFlushedSize()) {
// Fully synced
logs_to_free_.push_back(wal.ReleaseWriter());
it = logs_.erase(it);
if (error_recovery_in_prog) {
log->file()->reset_seen_error();
}
- // TODO: plumb Env::IOActivity, Env::IOPriority
- io_s = log->Close(WriteOptions());
+ // Normally the log file is closed when purging obsolete file, but if
+ // log recycling is enabled, the log file is closed here so that it
+ // can be reused.
+ io_s = log->Close(write_options);
if (!io_s.ok()) {
break;
}
if (result.recycle_log_file_num &&
(result.wal_recovery_mode ==
WALRecoveryMode::kTolerateCorruptedTailRecords ||
- result.wal_recovery_mode == WALRecoveryMode::kPointInTimeRecovery ||
result.wal_recovery_mode == WALRecoveryMode::kAbsoluteConsistency)) {
// - kTolerateCorruptedTailRecords is inconsistent with recycle log file
// feature. WAL recycling expects recovery success upon encountering a
Logger* info_log;
const char* fname;
Status* status; // nullptr if immutable_db_options_.paranoid_checks==false
+ bool* old_log_record;
void Corruption(size_t bytes, const Status& s) override {
ROCKS_LOG_WARN(info_log, "%s%s: dropping %d bytes; %s",
(status == nullptr ? "(ignoring error) " : ""), fname,
*status = s;
}
}
+
+  // Invoked when the log reader drops a record it suspects is left over from
+  // an earlier use of a recycled log file. Raises the caller-supplied flag
+  // (when one was provided) and logs the number of bytes dropped.
+  void OldLogRecord(size_t bytes) override {
+    if (old_log_record) {
+      *old_log_record = true;
+    }
+    const int dropped_bytes = static_cast<int>(bytes);
+    ROCKS_LOG_WARN(info_log, "%s: dropping %d bytes; possibly recycled", fname,
+                   dropped_bytes);
+  }
};
mutex_.AssertHeld();
Status status;
+ bool old_log_record = false;
std::unordered_map<int, VersionEdit> version_edits;
// no need to refcount because iteration is under mutex
for (auto cfd : *versions_->GetColumnFamilySet()) {
reporter.env = env_;
reporter.info_log = immutable_db_options_.info_log.get();
reporter.fname = fname.c_str();
+ reporter.old_log_record = &old_log_record;
if (!immutable_db_options_.paranoid_checks ||
immutable_db_options_.wal_recovery_mode ==
WALRecoveryMode::kSkipAnyCorruptedRecords) {
}
}
- if (!status.ok()) {
+ if (!status.ok() || old_log_record) {
if (status.IsNotSupported()) {
// We should not treat NotSupported as corruption. It is rather a clear
// sign that we are processing a WAL that is produced by an incompatible
}
// We should ignore the error but not continue replaying
status = Status::OK();
+ old_log_record = false;
stop_replay_for_corruption = true;
corrupted_wal_number = wal_number;
if (corrupted_wal_found != nullptr) {
write_options.protection_bytes_per_key != 8) {
return Status::InvalidArgument(
"`WriteOptions::protection_bytes_per_key` must be zero or eight");
+ } else if (write_options.disableWAL &&
+ immutable_db_options_.recycle_log_file_num > 0) {
+ return Status::InvalidArgument(
+ "WriteOptions::disableWAL option is not supported if "
+ "DBOptions::recycle_log_file_num > 0");
}
// TODO: this use of operator bool on `tracer_` can avoid unnecessary lock
// grabs but does not seem thread-safe.
log_write_mutex_.Unlock();
}
uint64_t recycle_log_number = 0;
+ // If file deletion is disabled, don't recycle logs since it'll result in
+ // the file getting renamed
if (creating_new_log && immutable_db_options_.recycle_log_file_num &&
- !log_recycle_files_.empty()) {
+ !log_recycle_files_.empty() && IsFileDeletionsEnabled()) {
recycle_log_number = log_recycle_files_.front();
}
uint64_t new_log_number =
}
#endif // !(defined NDEBUG) || !defined(OS_WIN)
-TEST_F(DBWALTest, DISABLED_FullPurgePreservesRecycledLog) {
- // TODO(ajkr): Disabled until WAL recycling is fixed for
- // `kPointInTimeRecovery`.
-
+TEST_F(DBWALTest, FullPurgePreservesRecycledLog) {
// For github issue #1303
for (int i = 0; i < 2; ++i) {
Options options = CurrentOptions();
options.create_if_missing = true;
options.recycle_log_file_num = 2;
+ options.wal_recovery_mode = WALRecoveryMode::kPointInTimeRecovery;
if (i != 0) {
options.wal_dir = alternative_wal_dir_;
}
}
}
-TEST_F(DBWALTest, DISABLED_FullPurgePreservesLogPendingReuse) {
- // TODO(ajkr): Disabled until WAL recycling is fixed for
- // `kPointInTimeRecovery`.
-
+TEST_F(DBWALTest, FullPurgePreservesLogPendingReuse) {
// Ensures full purge cannot delete a WAL while it's in the process of being
// recycled. In particular, we force the full purge after a file has been
// chosen for reuse, but before it has been renamed.
for (int i = 0; i < 2; ++i) {
Options options = CurrentOptions();
options.recycle_log_file_num = 1;
+ options.wal_recovery_mode = WALRecoveryMode::kPointInTimeRecovery;
if (i != 0) {
options.wal_dir = alternative_wal_dir_;
}
ASSERT_LE(bytes_num, 1024 * 100);
}
+// Test helper: corrupts one record of the WAL file at `log_path` in place.
+//
+// Reads `record_num` records from the start of the file through a
+// log::Reader (constructed with log number `log_num`, reading with
+// `options.wal_recovery_mode`), then overwrites four bytes located 7 bytes
+// past the start of the last record read.
+// NOTE(review): offset 7 presumably skips the checksum/length/type header
+// fields so the write lands on the log number of a recyclable record --
+// confirm against the WAL record format.
+//
+// Every step is checked with a fatal gtest assertion, so a failure makes this
+// helper return early to its caller.
+void CorruptLogFile(Env* env, const Options& options,
+                    const std::string& log_path, uint64_t log_num,
+                    int record_num) {
+  std::shared_ptr<FileSystem> fs = env->GetFileSystem();
+  std::unique_ptr<SequentialFileReader> file_reader;
+  {
+    std::unique_ptr<FSSequentialFile> file;
+    ASSERT_OK(fs->NewSequentialFile(log_path, FileOptions(), &file, nullptr));
+    file_reader.reset(new SequentialFileReader(std::move(file), log_path));
+  }
+  std::unique_ptr<log::Reader> reader(new log::Reader(
+      nullptr, std::move(file_reader), nullptr, false, log_num));
+  std::string scratch;
+  Slice record;
+  uint64_t record_checksum;
+  for (int i = 0; i < record_num; ++i) {
+    ASSERT_TRUE(reader->ReadRecord(&record, &scratch,
+                                   options.wal_recovery_mode,
+                                   &record_checksum));
+  }
+  // Remember where the last record read begins, then destroy the reader
+  // before reopening the same file for writing.
+  const uint64_t rec_start = reader->LastRecordOffset();
+  reader.reset();
+  {
+    std::unique_ptr<FSRandomRWFile> file;
+    ASSERT_OK(fs->NewRandomRWFile(log_path, FileOptions(), &file, nullptr));
+    uint32_t bad_lognum = 0xff;
+    ASSERT_OK(file->Write(
+        rec_start + 7,
+        Slice(reinterpret_cast<char*>(&bad_lognum), sizeof(uint32_t)),
+        IOOptions(), nullptr));
+    ASSERT_OK(file->Close(IOOptions(), nullptr));
+  }
+}
+
+// Under kPointInTimeRecovery, corrupts the second record of the WAL that
+// received the first two writes and checks that, after reopening, only the
+// write preceding the corrupted record is still readable.
+TEST_P(DBWriteTest, RecycleLogTest) {
+  Options options = GetOptions();
+  options.recycle_log_file_num = 0;
+  options.avoid_flush_during_recovery = true;
+  options.wal_recovery_mode = WALRecoveryMode::kPointInTimeRecovery;
+  Reopen(options);
+
+  ASSERT_OK(Put(Key(1), "val1"));
+  ASSERT_OK(Put(Key(2), "val1"));
+
+  // Capture the number of the WAL that is current after the first two writes.
+  std::unique_ptr<LogFile> current_wal;
+  ASSERT_OK(dbfull()->GetCurrentWalFile(&current_wal));
+  const uint64_t first_wal_num = current_wal->LogNumber();
+
+  // The third write happens only after a reopen.
+  Reopen(options);
+  ASSERT_OK(Put(Key(3), "val3"));
+
+  // Corrupt second entry of first log
+  const std::string first_wal_path = LogFileName(dbname_, first_wal_num);
+  CorruptLogFile(env_, options, first_wal_path, first_wal_num, 2);
+
+  // Neither the corrupted write nor the one issued after it may survive.
+  Reopen(options);
+  ASSERT_EQ(Get(Key(1)), "val1");
+  ASSERT_EQ(Get(Key(2)), "NOT_FOUND");
+  ASSERT_EQ(Get(Key(3)), "NOT_FOUND");
+}
+
+// Under kPointInTimeRecovery with two column families: flushes column family
+// 1 after the first two writes, corrupts the second record of the WAL that
+// received them, and expects the reopen to fail with Status::Corruption().
+TEST_P(DBWriteTest, RecycleLogTestCFAheadOfWAL) {
+  Options options = GetOptions();
+  options.recycle_log_file_num = 0;
+  options.avoid_flush_during_recovery = true;
+  options.wal_recovery_mode = WALRecoveryMode::kPointInTimeRecovery;
+  CreateAndReopenWithCF({"pikachu"}, options);
+
+  ASSERT_OK(Put(1, Key(1), "val1"));
+  ASSERT_OK(Put(0, Key(2), "val2"));
+
+  // Capture the number of the WAL that is current before the flush.
+  std::unique_ptr<LogFile> current_wal;
+  ASSERT_OK(dbfull()->GetCurrentWalFile(&current_wal));
+  const uint64_t first_wal_num = current_wal->LogNumber();
+
+  ASSERT_OK(Flush(1));
+  ASSERT_OK(Put(1, Key(3), "val3"));
+
+  // Corrupt second entry of first log
+  const std::string first_wal_path = LogFileName(dbname_, first_wal_num);
+  CorruptLogFile(env_, options, first_wal_path, first_wal_num, 2);
+
+  ASSERT_EQ(TryReopenWithColumnFamilies({"default", "pikachu"}, options),
+            Status::Corruption());
+}
+
INSTANTIATE_TEST_CASE_P(DBWriteTestInstance, DBWriteTest,
testing::Values(DBTestBase::kDefault,
DBTestBase::kConcurrentWALWrites,
// writing a physical record but before completing the next; don't
// treat it as a corruption, just ignore the entire logical record.
scratch->clear();
+ } else {
+ if (wal_recovery_mode == WALRecoveryMode::kPointInTimeRecovery) {
+ ReportOldLogRecord(scratch->size());
+ }
}
return false;
}
}
}
+// Forwards an old-log-record notification of `bytes` bytes to the attached
+// reporter; does nothing when no reporter is set.
+void Reader::ReportOldLogRecord(size_t bytes) {
+  if (reporter_ == nullptr) {
+    return;
+  }
+  reporter_->OldLogRecord(bytes);
+}
+
bool Reader::ReadMore(size_t* drop_size, int* error) {
if (!eof_ && !read_error_) {
// Last read was a full read, so this is a trailer to skip
// Some corruption was detected. "size" is the approximate number
// of bytes dropped due to the corruption.
virtual void Corruption(size_t bytes, const Status& status) = 0;
+
+ virtual void OldLogRecord(size_t /*bytes*/) {}
};
// Create a reader that will return log records from "*file".
// buffer_ must be updated to remove the dropped bytes prior to invocation.
void ReportCorruption(size_t bytes, const char* reason);
void ReportDrop(size_t bytes, const Status& reason);
+ void ReportOldLogRecord(size_t bytes);
void InitCompression(const CompressionTypeRecord& compression_record);
# disable atomic flush.
if dest_params["test_best_efforts_recovery"] == 0:
dest_params["disable_wal"] = 0
+ if dest_params.get("disable_wal") == 1:
+ # The disableWAL and recycle_log_file_num options are mutually
+ # incompatible at the moment
+ dest_params["recycle_log_file_num"] = 0
return dest_params
--- /dev/null
+Re-enable the recycle_log_file_num option in DBOptions for the kPointInTimeRecovery WAL recovery mode, which was previously disabled due to a bug in the recovery logic. This option is incompatible with WriteOptions::disableWAL: when recycle_log_file_num > 0, a write that sets disableWAL fails with Status::InvalidArgument().