]> git-server-git.apps.pok.os.sepia.ceph.com Git - rocksdb.git/commitdiff
Enable recycle_log_file_num option for point in time recovery (#12403)
authoranand76 <anand1976@users.noreply.github.com>
Thu, 21 Mar 2024 19:29:35 +0000 (12:29 -0700)
committerFacebook GitHub Bot <facebook-github-bot@users.noreply.github.com>
Thu, 21 Mar 2024 19:29:35 +0000 (12:29 -0700)
Summary:
This option was previously disabled due to a bug in the recovery logic. The recovery code in `DBImpl::RecoverLogFiles` couldn't tell if an EoF reported by the log reader was really an EoF or a possible corruption that made a record look like an old log record. To fix this, the log reader now explicitly reports when it encounters what looks like an old record. The recovery code treats it as a possible corruption, and uses the next sequence number in the WAL to determine if it should continue replaying the WAL.

This PR also fixes a couple of bugs that log file recycling exposed in the backup and checkpoint path.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/12403

Test Plan:
1. Add new unit tests to verify behavior upon corruption
2. Re-enable disabled tests for verifying recycling behavior

Reviewed By: ajkr

Differential Revision: D54544824

Pulled By: anand1976

fbshipit-source-id: 12f5ce39bd6bc0d63b0bc6432dc4db510e0e802a

db/db_filesnapshot.cc
db/db_impl/db_impl.cc
db/db_impl/db_impl_compaction_flush.cc
db/db_impl/db_impl_open.cc
db/db_impl/db_impl_write.cc
db/db_wal_test.cc
db/db_write_test.cc
db/log_reader.cc
db/log_reader.h
tools/db_crashtest.py
unreleased_history/bug_fixes/recycle_logs_point_in_time_recovery.md [new file with mode: 0644]

index 711c55c9e6b533401b403574158441ea930f2b83..cbc2db14f247be078171f769c0081cc8d0d65ada 100644 (file)
@@ -390,8 +390,11 @@ Status DBImpl::GetLiveFilesStorageInfo(
       info.file_number = live_wal_files[i]->LogNumber();
       info.file_type = kWalFile;
       info.size = live_wal_files[i]->SizeFileBytes();
-      // Only last should need to be trimmed
-      info.trim_to_size = (i + 1 == wal_size);
+      // Trim the log either if it's the last one, or log file recycling is
+      // enabled. In the latter case, a hard link doesn't prevent the file
+      // from being renamed and recycled. So we need to copy it instead.
+      info.trim_to_size = (i + 1 == wal_size) ||
+                          (immutable_db_options_.recycle_log_file_num > 0);
       if (opts.include_checksum_info) {
         info.file_checksum_func_name = kUnknownFileChecksumFuncName;
         info.file_checksum = kUnknownFileChecksum;
index ef3ce78b4698fa371cfdd475ff7e8ed3a3113f4b..f7e6f9692b23c1cc6cff8372fcbb1d256e61c671 100644 (file)
@@ -1757,7 +1757,11 @@ void DBImpl::MarkLogsSynced(uint64_t up_to, bool synced_dir,
           wal.GetPreSyncSize() > 0) {
         synced_wals->AddWal(wal.number, WalMetadata(wal.GetPreSyncSize()));
       }
-      if (wal.GetPreSyncSize() == wal.writer->file()->GetFlushedSize()) {
+      // Check if the file has been closed, i.e. wal.writer->file() == nullptr
+      // which can happen if log recycling is enabled, or if all the data in
+      // the log has been synced
+      if (wal.writer->file() == nullptr ||
+          wal.GetPreSyncSize() == wal.writer->file()->GetFlushedSize()) {
         // Fully synced
         logs_to_free_.push_back(wal.ReleaseWriter());
         it = logs_.erase(it);
index 8845a2a4d7e60dbe22abecefce311c157869f528..15b0377d427576121aac9d53ca18a14900ee7713 100644 (file)
@@ -163,8 +163,10 @@ IOStatus DBImpl::SyncClosedLogs(const WriteOptions& write_options,
         if (error_recovery_in_prog) {
           log->file()->reset_seen_error();
         }
-        // TODO: plumb Env::IOActivity, Env::IOPriority
-        io_s = log->Close(WriteOptions());
+        // Normally the log file is closed when purging obsolete files, but if
+        // log recycling is enabled, the log file is closed here so that it
+        // can be reused.
+        io_s = log->Close(write_options);
         if (!io_s.ok()) {
           break;
         }
index d2591b6e92cdb6397fdafa2c5c6df96b05bac327..786abb74f24de6aa05ffa61f6a44d257d0fa35fc 100644 (file)
@@ -104,7 +104,6 @@ DBOptions SanitizeOptions(const std::string& dbname, const DBOptions& src,
   if (result.recycle_log_file_num &&
       (result.wal_recovery_mode ==
            WALRecoveryMode::kTolerateCorruptedTailRecords ||
-       result.wal_recovery_mode == WALRecoveryMode::kPointInTimeRecovery ||
        result.wal_recovery_mode == WALRecoveryMode::kAbsoluteConsistency)) {
     // - kTolerateCorruptedTailRecords is inconsistent with recycle log file
     //   feature. WAL recycling expects recovery success upon encountering a
@@ -1086,6 +1085,7 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& wal_numbers,
     Logger* info_log;
     const char* fname;
     Status* status;  // nullptr if immutable_db_options_.paranoid_checks==false
+    bool* old_log_record;
     void Corruption(size_t bytes, const Status& s) override {
       ROCKS_LOG_WARN(info_log, "%s%s: dropping %d bytes; %s",
                      (status == nullptr ? "(ignoring error) " : ""), fname,
@@ -1094,10 +1094,19 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& wal_numbers,
         *status = s;
       }
     }
+
+    void OldLogRecord(size_t bytes) override {
+      if (old_log_record != nullptr) {
+        *old_log_record = true;
+      }
+      ROCKS_LOG_WARN(info_log, "%s: dropping %d bytes; possibly recycled",
+                     fname, static_cast<int>(bytes));
+    }
   };
 
   mutex_.AssertHeld();
   Status status;
+  bool old_log_record = false;
   std::unordered_map<int, VersionEdit> version_edits;
   // no need to refcount because iteration is under mutex
   for (auto cfd : *versions_->GetColumnFamilySet()) {
@@ -1188,6 +1197,7 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& wal_numbers,
     reporter.env = env_;
     reporter.info_log = immutable_db_options_.info_log.get();
     reporter.fname = fname.c_str();
+    reporter.old_log_record = &old_log_record;
     if (!immutable_db_options_.paranoid_checks ||
         immutable_db_options_.wal_recovery_mode ==
             WALRecoveryMode::kSkipAnyCorruptedRecords) {
@@ -1335,7 +1345,7 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& wal_numbers,
       }
     }
 
-    if (!status.ok()) {
+    if (!status.ok() || old_log_record) {
       if (status.IsNotSupported()) {
         // We should not treat NotSupported as corruption. It is rather a clear
         // sign that we are processing a WAL that is produced by an incompatible
@@ -1360,6 +1370,7 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& wal_numbers,
         }
         // We should ignore the error but not continue replaying
         status = Status::OK();
+        old_log_record = false;
         stop_replay_for_corruption = true;
         corrupted_wal_number = wal_number;
         if (corrupted_wal_found != nullptr) {
index ee103a57a9c5a55b56820ba2150cf738e825f1cf..5f18e01d24e81da6c84d024cef0134fd61a8118a 100644 (file)
@@ -220,6 +220,11 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
              write_options.protection_bytes_per_key != 8) {
     return Status::InvalidArgument(
         "`WriteOptions::protection_bytes_per_key` must be zero or eight");
+  } else if (write_options.disableWAL &&
+             immutable_db_options_.recycle_log_file_num > 0) {
+    return Status::InvalidArgument(
+        "WriteOptions::disableWAL option is not supported if "
+        "DBOptions::recycle_log_file_num > 0");
   }
   // TODO: this use of operator bool on `tracer_` can avoid unnecessary lock
   // grabs but does not seem thread-safe.
@@ -2173,8 +2178,10 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) {
     log_write_mutex_.Unlock();
   }
   uint64_t recycle_log_number = 0;
+  // If file deletion is disabled, don't recycle logs since it'll result in
+  // the file getting renamed
   if (creating_new_log && immutable_db_options_.recycle_log_file_num &&
-      !log_recycle_files_.empty()) {
+      !log_recycle_files_.empty() && IsFileDeletionsEnabled()) {
     recycle_log_number = log_recycle_files_.front();
   }
   uint64_t new_log_number =
index 490ab2742dcb89f2150d6ba64fbafcaa345e4453..91070e298b658db03e3d611675b92272f954f834 100644 (file)
@@ -1123,15 +1123,13 @@ TEST_F(DBWALTest, PreallocateBlock) {
 }
 #endif  // !(defined NDEBUG) || !defined(OS_WIN)
 
-TEST_F(DBWALTest, DISABLED_FullPurgePreservesRecycledLog) {
-  // TODO(ajkr): Disabled until WAL recycling is fixed for
-  // `kPointInTimeRecovery`.
-
+TEST_F(DBWALTest, FullPurgePreservesRecycledLog) {
   // For github issue #1303
   for (int i = 0; i < 2; ++i) {
     Options options = CurrentOptions();
     options.create_if_missing = true;
     options.recycle_log_file_num = 2;
+    options.wal_recovery_mode = WALRecoveryMode::kPointInTimeRecovery;
     if (i != 0) {
       options.wal_dir = alternative_wal_dir_;
     }
@@ -1162,16 +1160,14 @@ TEST_F(DBWALTest, DISABLED_FullPurgePreservesRecycledLog) {
   }
 }
 
-TEST_F(DBWALTest, DISABLED_FullPurgePreservesLogPendingReuse) {
-  // TODO(ajkr): Disabled until WAL recycling is fixed for
-  // `kPointInTimeRecovery`.
-
+TEST_F(DBWALTest, FullPurgePreservesLogPendingReuse) {
   // Ensures full purge cannot delete a WAL while it's in the process of being
   // recycled. In particular, we force the full purge after a file has been
   // chosen for reuse, but before it has been renamed.
   for (int i = 0; i < 2; ++i) {
     Options options = CurrentOptions();
     options.recycle_log_file_num = 1;
+    options.wal_recovery_mode = WALRecoveryMode::kPointInTimeRecovery;
     if (i != 0) {
       options.wal_dir = alternative_wal_dir_;
     }
index 792ec305a658f3de1213a9cb1ce65d07e70295d2..f464a3036b8deb6cff5180d83aaf07fd2d1d27b2 100644 (file)
@@ -821,6 +821,95 @@ TEST_P(DBWriteTest, ConcurrentlyDisabledWAL) {
   ASSERT_LE(bytes_num, 1024 * 100);
 }
 
+void CorruptLogFile(Env* env, Options& options, std::string log_path,
+                    uint64_t log_num, int record_num) {
+  std::shared_ptr<FileSystem> fs = env->GetFileSystem();
+  std::unique_ptr<SequentialFileReader> file_reader;
+  Status status;
+  {
+    std::unique_ptr<FSSequentialFile> file;
+    status = fs->NewSequentialFile(log_path, FileOptions(), &file, nullptr);
+    ASSERT_EQ(status, IOStatus::OK());
+    file_reader.reset(new SequentialFileReader(std::move(file), log_path));
+  }
+  std::unique_ptr<log::Reader> reader(new log::Reader(
+      nullptr, std::move(file_reader), nullptr, false, log_num));
+  std::string scratch;
+  Slice record;
+  uint64_t record_checksum;
+  for (int i = 0; i < record_num; ++i) {
+    ASSERT_TRUE(reader->ReadRecord(&record, &scratch, options.wal_recovery_mode,
+                                   &record_checksum));
+  }
+  uint64_t rec_start = reader->LastRecordOffset();
+  reader.reset();
+  {
+    std::unique_ptr<FSRandomRWFile> file;
+    status = fs->NewRandomRWFile(log_path, FileOptions(), &file, nullptr);
+    ASSERT_EQ(status, IOStatus::OK());
+    uint32_t bad_lognum = 0xff;
+    ASSERT_EQ(file->Write(
+                  rec_start + 7,
+                  Slice(reinterpret_cast<char*>(&bad_lognum), sizeof(uint32_t)),
+                  IOOptions(), nullptr),
+              IOStatus::OK());
+    ASSERT_OK(file->Close(IOOptions(), nullptr));
+    file.reset();
+  }
+}
+
+TEST_P(DBWriteTest, RecycleLogTest) {
+  Options options = GetOptions();
+  options.recycle_log_file_num = 0;
+  options.avoid_flush_during_recovery = true;
+  options.wal_recovery_mode = WALRecoveryMode::kPointInTimeRecovery;
+
+  Reopen(options);
+  ASSERT_OK(Put(Key(1), "val1"));
+  ASSERT_OK(Put(Key(2), "val1"));
+
+  uint64_t latest_log_num = 0;
+  std::unique_ptr<LogFile> log_file;
+  ASSERT_OK(dbfull()->GetCurrentWalFile(&log_file));
+  latest_log_num = log_file->LogNumber();
+  Reopen(options);
+  ASSERT_OK(Put(Key(3), "val3"));
+
+  // Corrupt second entry of first log
+  std::string log_path = LogFileName(dbname_, latest_log_num);
+  CorruptLogFile(env_, options, log_path, latest_log_num, 2);
+
+  Reopen(options);
+  ASSERT_EQ(Get(Key(1)), "val1");
+  ASSERT_EQ(Get(Key(2)), "NOT_FOUND");
+  ASSERT_EQ(Get(Key(3)), "NOT_FOUND");
+}
+
+TEST_P(DBWriteTest, RecycleLogTestCFAheadOfWAL) {
+  Options options = GetOptions();
+  options.recycle_log_file_num = 0;
+  options.avoid_flush_during_recovery = true;
+  options.wal_recovery_mode = WALRecoveryMode::kPointInTimeRecovery;
+
+  CreateAndReopenWithCF({"pikachu"}, options);
+  ASSERT_OK(Put(1, Key(1), "val1"));
+  ASSERT_OK(Put(0, Key(2), "val2"));
+
+  uint64_t latest_log_num = 0;
+  std::unique_ptr<LogFile> log_file;
+  ASSERT_OK(dbfull()->GetCurrentWalFile(&log_file));
+  latest_log_num = log_file->LogNumber();
+  ASSERT_OK(Flush(1));
+  ASSERT_OK(Put(1, Key(3), "val3"));
+
+  // Corrupt second entry of first log
+  std::string log_path = LogFileName(dbname_, latest_log_num);
+  CorruptLogFile(env_, options, log_path, latest_log_num, 2);
+
+  ASSERT_EQ(TryReopenWithColumnFamilies({"default", "pikachu"}, options),
+            Status::Corruption());
+}
+
 INSTANTIATE_TEST_CASE_P(DBWriteTestInstance, DBWriteTest,
                         testing::Values(DBTestBase::kDefault,
                                         DBTestBase::kConcurrentWALWrites,
index 48380a735c57a93411ad0ed84a8ce23acac7f31e..da979a1ee1eec6710aaf3c786c458252bd3a9375 100644 (file)
@@ -258,6 +258,10 @@ bool Reader::ReadRecord(Slice* record, std::string* scratch,
             //  writing a physical record but before completing the next; don't
             //  treat it as a corruption, just ignore the entire logical record.
             scratch->clear();
+          } else {
+            if (wal_recovery_mode == WALRecoveryMode::kPointInTimeRecovery) {
+              ReportOldLogRecord(scratch->size());
+            }
           }
           return false;
         }
@@ -405,6 +409,12 @@ void Reader::ReportDrop(size_t bytes, const Status& reason) {
   }
 }
 
+void Reader::ReportOldLogRecord(size_t bytes) {
+  if (reporter_ != nullptr) {
+    reporter_->OldLogRecord(bytes);
+  }
+}
+
 bool Reader::ReadMore(size_t* drop_size, int* error) {
   if (!eof_ && !read_error_) {
     // Last read was a full read, so this is a trailer to skip
index 697d1b5d58c03f4eaee3118d35577e4a67aa63bd..6e4eded09163bab7966c4f7e7ebd4dc3d3c7a1e0 100644 (file)
@@ -45,6 +45,8 @@ class Reader {
     // Some corruption was detected.  "size" is the approximate number
     // of bytes dropped due to the corruption.
     virtual void Corruption(size_t bytes, const Status& status) = 0;
+
+    virtual void OldLogRecord(size_t /*bytes*/) {}
   };
 
   // Create a reader that will return log records from "*file".
@@ -202,6 +204,7 @@ class Reader {
   // buffer_ must be updated to remove the dropped bytes prior to invocation.
   void ReportCorruption(size_t bytes, const char* reason);
   void ReportDrop(size_t bytes, const Status& reason);
+  void ReportOldLogRecord(size_t bytes);
 
   void InitCompression(const CompressionTypeRecord& compression_record);
 
index d4204d182acb76320c73798f37b001fe8d1e3b3e..3fbd5729c2c95f527d0c5e866e2e231ff439887d 100644 (file)
@@ -771,6 +771,10 @@ def finalize_and_sanitize(src_params):
         # disable atomic flush.
         if dest_params["test_best_efforts_recovery"] == 0:
           dest_params["disable_wal"] = 0
+    if dest_params.get("disable_wal") == 1:
+        # disableWAL and recycle_log_file_num options are not mutually
+        # compatible at the moment
+        dest_params["recycle_log_file_num"] = 0
     return dest_params
 
 
diff --git a/unreleased_history/bug_fixes/recycle_logs_point_in_time_recovery.md b/unreleased_history/bug_fixes/recycle_logs_point_in_time_recovery.md
new file mode 100644 (file)
index 0000000..d11ba21
--- /dev/null
@@ -0,0 +1 @@
+Re-enable the recycle_log_file_num option in DBOptions for kPointInTimeRecovery WAL recovery mode, which was previously disabled due to a bug in the recovery logic. This option is incompatible with WriteOptions::disableWAL. A Status::InvalidArgument() will be returned if disableWAL is specified.