]> git-server-git.apps.pok.os.sepia.ceph.com Git - rocksdb.git/commitdiff
Fix race condition with WAL tracking and `FlushWAL(true /* sync */)` (#10185)
authorAndrew Kryczka <andrewkr@fb.com>
Fri, 17 Jun 2022 23:45:28 +0000 (16:45 -0700)
committerFacebook GitHub Bot <facebook-github-bot@users.noreply.github.com>
Fri, 17 Jun 2022 23:45:28 +0000 (16:45 -0700)
Summary:
`FlushWAL(true /* sync */)` is used internally and for manual WAL sync. It had a bug when used together with `track_and_verify_wals_in_manifest` where the synced size tracked in MANIFEST was larger than the number of bytes actually synced.

The bug could be repro'd almost immediately with the following crash test command: `python3 tools/db_crashtest.py blackbox --simple --write_buffer_size=524288 --max_bytes_for_level_base=2097152 --target_file_size_base=524288 --duration=3600 --interval=10 --sync_fault_injection=1 --disable_wal=0 --checkpoint_one_in=1000 --max_key=10000 --value_size_mult=33`.

An example error message produced by the above command is shown below. The error sometimes arose from the checkpoint and other times arose from the main stress test DB.

```
Corruption: Size mismatch: WAL (log number: 119) in MANIFEST is 27938 bytes , but actually is 27859 bytes on disk.
```

Pull Request resolved: https://github.com/facebook/rocksdb/pull/10185

Test Plan:
- repro unit test
- the above crash test command no longer finds the error. It does find a different error after a while longer such as "Corruption: WAL file 481 required by manifest but not in directory list"

Reviewed By: riversand963

Differential Revision: D37200993

Pulled By: ajkr

fbshipit-source-id: 98e0071c1a89f4d009888512ed89f9219779ae5f

HISTORY.md
db/db_impl/db_impl.cc
db/db_impl/db_impl.h
db/db_impl/db_impl_compaction_flush.cc
db/db_impl/db_impl_files.cc
db/db_impl/db_impl_write.cc
db/db_write_test.cc
file/writable_file_writer.cc
file/writable_file_writer.h
utilities/checkpoint/checkpoint_test.cc

index a115c051a2ca4a4a1770d387ef04f1e40f185b38..45e52b4cb3f3e40f796d9cbf61c379488f565fd4 100644 (file)
@@ -11,6 +11,7 @@
 * Fix a bug that could return wrong results with `index_type=kHashSearch` and using `SetOptions` to change the `prefix_extractor`.
 * Fixed a bug in WAL tracking with wal_compression. WAL compression writes a kSetCompressionType record which is not associated with any sequence number. As result, WalManager::GetSortedWalsOfType() will skip these WALs and not return them to caller, e.g. Checkpoint, Backup, causing the operations to fail.
 * Avoid a crash if the IDENTITY file is accidentally truncated to empty. A new DB ID will be written and generated on Open.
+* Fixed a possible corruption for users of `manual_wal_flush` and/or `FlushWAL(true /* sync */)`, together with `track_and_verify_wals_in_manifest == true`. For those users, losing unsynced data (e.g., due to power loss) could make future DB opens fail with a `Status::Corruption` complaining about missing WAL data.
 
 ### Public API changes
 * Add new API GetUnixTime in Snapshot class which returns the unix time at which Snapshot is taken.
index d3e0ff0c2937b054c4720c1d98f08686445ca190..58c3035c9c4cd71aa4d2c96b67eb873720ad7268 100644 (file)
@@ -1367,6 +1367,7 @@ Status DBImpl::FlushWAL(bool sync) {
 }
 
 Status DBImpl::SyncWAL() {
+  TEST_SYNC_POINT("DBImpl::SyncWAL:Begin");
   autovector<log::Writer*, 1> logs_to_sync;
   bool need_log_dir_sync;
   uint64_t current_log_number;
@@ -1379,7 +1380,7 @@ Status DBImpl::SyncWAL() {
     current_log_number = logfile_number_;
 
     while (logs_.front().number <= current_log_number &&
-           logs_.front().getting_synced) {
+           logs_.front().IsSyncing()) {
       log_sync_cv_.Wait();
     }
     // First check that logs are safe to sync in background.
@@ -1396,8 +1397,7 @@ Status DBImpl::SyncWAL() {
     for (auto it = logs_.begin();
          it != logs_.end() && it->number <= current_log_number; ++it) {
       auto& log = *it;
-      assert(!log.getting_synced);
-      log.getting_synced = true;
+      log.PrepareForSync();
       logs_to_sync.push_back(log.writer);
     }
 
@@ -1470,11 +1470,10 @@ Status DBImpl::MarkLogsSynced(uint64_t up_to, bool synced_dir) {
   VersionEdit synced_wals;
   for (auto it = logs_.begin(); it != logs_.end() && it->number <= up_to;) {
     auto& wal = *it;
-    assert(wal.getting_synced);
+    assert(wal.IsSyncing());
     if (immutable_db_options_.track_and_verify_wals_in_manifest &&
-        wal.writer->file()->GetFileSize() > 0) {
-      synced_wals.AddWal(wal.number,
-                         WalMetadata(wal.writer->file()->GetFileSize()));
+        wal.GetPreSyncSize() > 0) {
+      synced_wals.AddWal(wal.number, WalMetadata(wal.GetPreSyncSize()));
     }
 
     if (logs_.size() > 1) {
@@ -1483,12 +1482,12 @@ Status DBImpl::MarkLogsSynced(uint64_t up_to, bool synced_dir) {
       InstrumentedMutexLock l(&log_write_mutex_);
       it = logs_.erase(it);
     } else {
-      wal.getting_synced = false;
+      wal.FinishSync();
       ++it;
     }
   }
   assert(logs_.empty() || logs_[0].number > up_to ||
-         (logs_.size() == 1 && !logs_[0].getting_synced));
+         (logs_.size() == 1 && !logs_[0].IsSyncing()));
 
   Status s;
   if (synced_wals.IsWalAddition()) {
@@ -1508,8 +1507,7 @@ void DBImpl::MarkLogsNotSynced(uint64_t up_to) {
   for (auto it = logs_.begin(); it != logs_.end() && it->number <= up_to;
        ++it) {
     auto& wal = *it;
-    assert(wal.getting_synced);
-    wal.getting_synced = false;
+    wal.FinishSync();
   }
   log_sync_cv_.SignalAll();
 }
index 27a3384ec8716d01240c282348fe7281839213e5..fa570a368e938c142f1b000cadc6f19b1105b201 100644 (file)
@@ -1593,12 +1593,38 @@ class DBImpl : public DB {
       return s;
     }
 
+    bool IsSyncing() { return getting_synced; }
+
+    uint64_t GetPreSyncSize() {
+      assert(getting_synced);
+      return pre_sync_size;
+    }
+
+    void PrepareForSync() {
+      assert(!getting_synced);
+      // Size is expected to be monotonically increasing.
+      assert(writer->file()->GetFlushedSize() >= pre_sync_size);
+      getting_synced = true;
+      pre_sync_size = writer->file()->GetFlushedSize();
+    }
+
+    void FinishSync() {
+      assert(getting_synced);
+      getting_synced = false;
+    }
+
     uint64_t number;
     // Visual Studio doesn't support deque's member to be noncopyable because
     // of a std::unique_ptr as a member.
     log::Writer* writer;  // own
+
+   private:
     // true for some prefix of logs_
     bool getting_synced = false;
+    // The size of the file before the sync happens. This amount is guaranteed
+    // to be persisted even if appends happen during sync so it can be used for
+    // tracking the synced size in MANIFEST.
+    uint64_t pre_sync_size = 0;
   };
 
   // PurgeFileInfo is a structure to hold information of files to be deleted in
index 16d8c8f355cad4ba0e6862ce01c4de88f09714f5..f79b7bee870bb0222ba36fb83c87a23767f2e38d 100644 (file)
@@ -88,14 +88,13 @@ IOStatus DBImpl::SyncClosedLogs(JobContext* job_context) {
   autovector<log::Writer*, 1> logs_to_sync;
   uint64_t current_log_number = logfile_number_;
   while (logs_.front().number < current_log_number &&
-         logs_.front().getting_synced) {
+         logs_.front().IsSyncing()) {
     log_sync_cv_.Wait();
   }
   for (auto it = logs_.begin();
        it != logs_.end() && it->number < current_log_number; ++it) {
     auto& log = *it;
-    assert(!log.getting_synced);
-    log.getting_synced = true;
+    log.PrepareForSync();
     logs_to_sync.push_back(log.writer);
   }
 
index 596d3aa7354cef999450f846015cd4b5791e13da..f5ca248be6ad081a21906f52135c32063f8e6236 100644 (file)
@@ -288,7 +288,7 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force,
     }
     while (!logs_.empty() && logs_.front().number < min_log_number) {
       auto& log = logs_.front();
-      if (log.getting_synced) {
+      if (log.IsSyncing()) {
         log_sync_cv_.Wait();
         // logs_ could have changed while we were waiting.
         continue;
index 09287a4ef8cbc0dae403db88584e3bbc344a1039..e65ecf75a72caeeaa12885fdecc632175bb2a998 100644 (file)
@@ -1181,17 +1181,16 @@ Status DBImpl::PreprocessWrite(const WriteOptions& write_options,
     // Note: there does not seem to be a reason to wait for parallel sync at
     // this early step but it is not important since parallel sync (SyncWAL) and
     // need_log_sync are usually not used together.
-    while (logs_.front().getting_synced) {
+    while (logs_.front().IsSyncing()) {
       log_sync_cv_.Wait();
     }
     for (auto& log : logs_) {
-      assert(!log.getting_synced);
       // This is just to prevent the logs to be synced by a parallel SyncWAL
       // call. We will do the actual syncing later after we will write to the
       // WAL.
       // Note: there does not seem to be a reason to set this early before we
       // actually write to the WAL
-      log.getting_synced = true;
+      log.PrepareForSync();
     }
   } else {
     *need_log_sync = false;
index aae97ef0c10bfaafd0cb42f4ad337580e90ab1fb..7107d2bb2d345a7f7fefdd803ec1df90cede64bc 100644 (file)
@@ -334,6 +334,41 @@ TEST_P(DBWriteTest, ManualWalFlushInEffect) {
   ASSERT_TRUE(dbfull()->TEST_WALBufferIsEmpty());
 }
 
+TEST_P(DBWriteTest, UnflushedPutRaceWithTrackedWalSync) {
+  // Repro race condition bug where unflushed WAL data extended the synced size
+  // recorded to MANIFEST despite being unrecoverable.
+  Options options = GetOptions();
+  std::unique_ptr<FaultInjectionTestEnv> fault_env(
+      new FaultInjectionTestEnv(env_));
+  options.env = fault_env.get();
+  options.manual_wal_flush = true;
+  options.track_and_verify_wals_in_manifest = true;
+  Reopen(options);
+
+  ASSERT_OK(Put("key1", "val1"));
+
+  SyncPoint::GetInstance()->SetCallBack(
+      "DBImpl::SyncWAL:Begin",
+      [this](void* /* arg */) { ASSERT_OK(Put("key2", "val2")); });
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+
+  ASSERT_OK(db_->FlushWAL(true /* sync */));
+
+  // Ensure callback ran.
+  ASSERT_EQ("val2", Get("key2"));
+
+  Close();
+
+  // Simulate full loss of unsynced data. This drops "key2" -> "val2" from the
+  // DB WAL.
+  fault_env->DropUnsyncedFileData();
+
+  Reopen(options);
+
+  // Need to close before `fault_env` goes out of scope.
+  Close();
+}
+
 TEST_P(DBWriteTest, IOErrorOnWALWriteTriggersReadOnlyMode) {
   std::unique_ptr<FaultInjectionTestEnv> mock_env(
       new FaultInjectionTestEnv(env_));
index ce1b7bc3de5ae37c3bb1185cb4f998d3e5804bfb..3b98ea6589d7906a5b548348682e258e05da0efa 100644 (file)
@@ -585,6 +585,8 @@ IOStatus WritableFileWriter::WriteBuffered(
 
     left -= allowed;
     src += allowed;
+    uint64_t cur_size = flushed_size_.load(std::memory_order_acquire);
+    flushed_size_.store(cur_size + allowed, std::memory_order_release);
   }
   buf_.Size(0);
   buffered_data_crc32c_checksum_ = 0;
@@ -675,6 +677,8 @@ IOStatus WritableFileWriter::WriteBufferedWithChecksum(
   // the corresponding checksum value
   buf_.Size(0);
   buffered_data_crc32c_checksum_ = 0;
+  uint64_t cur_size = flushed_size_.load(std::memory_order_acquire);
+  flushed_size_.store(cur_size + left, std::memory_order_release);
   return s;
 }
 
@@ -782,6 +786,8 @@ IOStatus WritableFileWriter::WriteDirect(
     left -= size;
     src += size;
     write_offset += size;
+    uint64_t cur_size = flushed_size_.load(std::memory_order_acquire);
+    flushed_size_.store(cur_size + size, std::memory_order_release);
     assert((next_write_offset_ % alignment) == 0);
   }
 
@@ -884,6 +890,8 @@ IOStatus WritableFileWriter::WriteDirectWithChecksum(
 
   IOSTATS_ADD(bytes_written, left);
   assert((next_write_offset_ % alignment) == 0);
+  uint64_t cur_size = flushed_size_.load(std::memory_order_acquire);
+  flushed_size_.store(cur_size + left, std::memory_order_release);
 
   if (s.ok()) {
     // Move the tail to the beginning of the buffer
index 2cbc715b3a78c84eb3fb4ca4ae63d0730754480f..bb9e5a6a1e297cbbb4d9a5ad3d504605a2279ac8 100644 (file)
@@ -143,6 +143,7 @@ class WritableFileWriter {
   // Actually written data size can be used for truncate
   // not counting padding data
   std::atomic<uint64_t> filesize_;
+  std::atomic<uint64_t> flushed_size_;
 #ifndef ROCKSDB_LITE
   // This is necessary when we use unbuffered access
   // and writes must happen on aligned offsets
@@ -180,6 +181,7 @@ class WritableFileWriter {
         buf_(),
         max_buffer_size_(options.writable_file_max_buffer_size),
         filesize_(0),
+        flushed_size_(0),
 #ifndef ROCKSDB_LITE
         next_write_offset_(0),
 #endif  // ROCKSDB_LITE
@@ -259,6 +261,14 @@ class WritableFileWriter {
     return filesize_.load(std::memory_order_acquire);
   }
 
+  // Returns the size of data flushed to the underlying `FSWritableFile`.
+  // Expected to match `writable_file()->GetFileSize()`.
+  // The return value can serve as a lower-bound for the amount of data synced
+  // by a future call to `SyncWithoutFlush()`.
+  uint64_t GetFlushedSize() const {
+    return flushed_size_.load(std::memory_order_acquire);
+  }
+
   IOStatus InvalidateCache(size_t offset, size_t length) {
     return writable_file_->InvalidateCache(offset, length);
   }
index 18c437dbbc8e2a439098f330c0e0e874e94ec467..10aaf16ba5c3305d62cb2b9d6dd7a01f21799091 100644 (file)
@@ -915,6 +915,51 @@ TEST_F(CheckpointTest, CheckpointWithDbPath) {
   delete checkpoint;
 }
 
+TEST_F(CheckpointTest, PutRaceWithCheckpointTrackedWalSync) {
+  // Repro for a race condition where a user write comes in after the checkpoint
+  // syncs WAL for `track_and_verify_wals_in_manifest` but before the
+  // corresponding MANIFEST update. With the bug, that scenario resulted in an
+  // unopenable DB with error "Corruption: Size mismatch: WAL ...".
+  Options options = CurrentOptions();
+  std::unique_ptr<FaultInjectionTestEnv> fault_env(
+      new FaultInjectionTestEnv(env_));
+  options.env = fault_env.get();
+  options.track_and_verify_wals_in_manifest = true;
+  Reopen(options);
+
+  ASSERT_OK(Put("key1", "val1"));
+
+  SyncPoint::GetInstance()->SetCallBack(
+      "DBImpl::SyncWAL:BeforeMarkLogsSynced:1",
+      [this](void* /* arg */) { ASSERT_OK(Put("key2", "val2")); });
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+
+  std::unique_ptr<Checkpoint> checkpoint;
+  {
+    Checkpoint* checkpoint_ptr;
+    ASSERT_OK(Checkpoint::Create(db_, &checkpoint_ptr));
+    checkpoint.reset(checkpoint_ptr);
+  }
+
+  ASSERT_OK(checkpoint->CreateCheckpoint(snapshot_name_));
+
+  // Ensure callback ran.
+  ASSERT_EQ("val2", Get("key2"));
+
+  Close();
+
+  // Simulate full loss of unsynced data. This drops "key2" -> "val2" from the
+  // DB WAL.
+  fault_env->DropUnsyncedFileData();
+
+  // Before the bug fix, reopening the DB would fail because the MANIFEST's
+  // AddWal entry indicated the WAL should be synced through "key2" -> "val2".
+  Reopen(options);
+
+  // Need to close before `fault_env` goes out of scope.
+  Close();
+}
+
 }  // namespace ROCKSDB_NAMESPACE
 
 int main(int argc, char** argv) {