]> git-server-git.apps.pok.os.sepia.ceph.com Git - rocksdb.git/commitdiff
Get current LogFileNumberSize the same as log_writer (#10086)
authorJay Zhuang <zjay@fb.com>
Wed, 1 Jun 2022 22:33:22 +0000 (15:33 -0700)
committerFacebook GitHub Bot <facebook-github-bot@users.noreply.github.com>
Wed, 1 Jun 2022 22:33:22 +0000 (15:33 -0700)
Summary:
`db_impl.alive_log_files_` is used to track the WAL size in `db_impl.logs_`.
Get the `LogFileNumberSize` obj in `alive_log_files_` the same time as `log_writer` to keep them consistent.
For this issue, it's not safe to do `deque::reverse_iterator::operator*` and `deque::pop_front()` concurrently,
so remove the tail cache.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/10086

Test Plan:
```
# on Windows
gtest-parallel ./db_test --gtest_filter=DBTest.FileCreationRandomFailure -r 1000 -w 100
```

Reviewed By: riversand963

Differential Revision: D36822373

Pulled By: jay-zhuang

fbshipit-source-id: 5e738051dfc7bcf6a15d85ba25e6365df6b6a6af

HISTORY.md
db/db_impl/db_impl.h
db/db_impl/db_impl_open.cc
db/db_impl/db_impl_write.cc

index bb18de05e008b09691413485e8b38e83b9082a47..79a18591dfd77ee377fb50137b12f071268e440a 100644 (file)
@@ -20,6 +20,9 @@
 ### Behavior changes
 * DB::Open(), DB::OpenAsSecondary() will fail if a Logger cannot be created (#9984)
 
+### Bug Fixes
+* Fix a race condition in WAL size tracking which is caused by an unsafe iterator access after container is changed.
+
 ## 7.3.0 (05/20/2022)
 ### Bug Fixes
 * Fixed a bug where manual flush would block forever even though flush options had wait=false.
index 0891d001b8359f2c934243716edef2861bbabe7a..c3a4b814c3b475b7e8e88e72d3cc011593f6a07f 100644 (file)
@@ -1834,12 +1834,13 @@ class DBImpl : public DB {
   IOStatus WriteToWAL(const WriteBatch& merged_batch, log::Writer* log_writer,
                       uint64_t* log_used, uint64_t* log_size,
                       Env::IOPriority rate_limiter_priority,
-                      bool with_db_mutex = false, bool with_log_mutex = false);
+                      LogFileNumberSize& log_file_number_size);
 
   IOStatus WriteToWAL(const WriteThread::WriteGroup& write_group,
                       log::Writer* log_writer, uint64_t* log_used,
                       bool need_log_sync, bool need_log_dir_sync,
-                      SequenceNumber sequence);
+                      SequenceNumber sequence,
+                      LogFileNumberSize& log_file_number_size);
 
   IOStatus ConcurrentWriteToWAL(const WriteThread::WriteGroup& write_group,
                                 uint64_t* log_used,
@@ -2169,11 +2170,7 @@ class DBImpl : public DB {
   // are protected by locking both mutex_ and log_write_mutex_, and reads must
   // be under either mutex_ or log_write_mutex_.
   std::deque<LogFileNumberSize> alive_log_files_;
-  // Caching the result of `alive_log_files_.back()` so that we do not have to
-  // call `alive_log_files_.back()` in the write thread (WriteToWAL()) which
-  // requires locking db mutex if log_mutex_ is not already held in
-  // two-write-queues mode.
-  std::deque<LogFileNumberSize>::reverse_iterator alive_log_files_tail_;
+
   // Log files that aren't fully synced, and the current log file.
   // Synchronization:
   //  - push_back() is done from write_thread_ with locked mutex_ and
index 9a2e912037f6ec3d1034316ac7b1fd130825b1bd..c87ac00ec5c4729b5eae4a6de4ed79fe054d4d5f 100644 (file)
@@ -1420,7 +1420,6 @@ Status DBImpl::RestoreAliveLogFiles(const std::vector<uint64_t>& wal_numbers) {
     total_log_size_ += log.size;
     alive_log_files_.push_back(log);
   }
-  alive_log_files_tail_ = alive_log_files_.rbegin();
   if (two_write_queues_) {
     log_write_mutex_.Unlock();
   }
@@ -1807,7 +1806,6 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
       }
       impl->alive_log_files_.push_back(
           DBImpl::LogFileNumberSize(impl->logfile_number_));
-      impl->alive_log_files_tail_ = impl->alive_log_files_.rbegin();
       if (impl->two_write_queues_) {
         impl->log_write_mutex_.Unlock();
       }
@@ -1828,8 +1826,12 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
         WriteOptions write_options;
         uint64_t log_used, log_size;
         log::Writer* log_writer = impl->logs_.back().writer;
+        LogFileNumberSize& log_file_number_size = impl->alive_log_files_.back();
+
+        assert(log_writer->get_log_number() == log_file_number_size.number);
+        impl->mutex_.AssertHeld();
         s = impl->WriteToWAL(empty_batch, log_writer, &log_used, &log_size,
-                             Env::IO_TOTAL, /*with_db_mutex==*/true);
+                             Env::IO_TOTAL, log_file_number_size);
         if (s.ok()) {
           // Need to fsync, otherwise it might get lost after a power reset.
           s = impl->FlushWAL(false);
index c980b2dc6e1d313367de0f47d126c6a446f94a0c..62d68bc088bdc7b95b1845b0d44a4cfac8153023 100644 (file)
@@ -319,7 +319,11 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
 
     PERF_TIMER_START(write_pre_and_post_process_time);
   }
+
   log::Writer* log_writer = logs_.back().writer;
+  LogFileNumberSize& log_file_number_size = alive_log_files_.back();
+
+  assert(log_writer->get_log_number() == log_file_number_size.number);
 
   mutex_.Unlock();
 
@@ -419,7 +423,8 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
       if (status.ok() && !write_options.disableWAL) {
         PERF_TIMER_GUARD(write_wal_time);
         io_s = WriteToWAL(write_group, log_writer, log_used, need_log_sync,
-                          need_log_dir_sync, last_sequence + 1);
+                          need_log_dir_sync, last_sequence + 1,
+                          log_file_number_size);
       }
     } else {
       if (status.ok() && !write_options.disableWAL) {
@@ -586,6 +591,10 @@ Status DBImpl::PipelinedWriteImpl(const WriteOptions& write_options,
     w.status = PreprocessWrite(write_options, &need_log_sync, &write_context);
     PERF_TIMER_START(write_pre_and_post_process_time);
     log::Writer* log_writer = logs_.back().writer;
+    LogFileNumberSize& log_file_number_size = alive_log_files_.back();
+
+    assert(log_writer->get_log_number() == log_file_number_size.number);
+
     mutex_.Unlock();
 
     // This can set non-OK status if callback fail.
@@ -649,8 +658,9 @@ Status DBImpl::PipelinedWriteImpl(const WriteOptions& write_options,
                           wal_write_group.size - 1);
         RecordTick(stats_, WRITE_DONE_BY_OTHER, wal_write_group.size - 1);
       }
-      io_s = WriteToWAL(wal_write_group, log_writer, log_used, need_log_sync,
-                        need_log_dir_sync, current_sequence);
+      io_s =
+          WriteToWAL(wal_write_group, log_writer, log_used, need_log_sync,
+                     need_log_dir_sync, current_sequence, log_file_number_size);
       w.status = io_s;
     }
 
@@ -1178,17 +1188,9 @@ IOStatus DBImpl::WriteToWAL(const WriteBatch& merged_batch,
                             log::Writer* log_writer, uint64_t* log_used,
                             uint64_t* log_size,
                             Env::IOPriority rate_limiter_priority,
-                            bool with_db_mutex, bool with_log_mutex) {
+                            LogFileNumberSize& log_file_number_size) {
   assert(log_size != nullptr);
 
-  // Assert mutex explicitly.
-  if (with_db_mutex) {
-    mutex_.AssertHeld();
-  } else if (two_write_queues_) {
-    log_write_mutex_.AssertHeld();
-    assert(with_log_mutex);
-  }
-
   Slice log_entry = WriteBatchInternal::Contents(&merged_batch);
   *log_size = log_entry.size();
   // When two_write_queues_ WriteToWAL has to be protected from concurretn calls
@@ -1211,12 +1213,7 @@ IOStatus DBImpl::WriteToWAL(const WriteBatch& merged_batch,
     *log_used = logfile_number_;
   }
   total_log_size_ += log_entry.size();
-  if (with_db_mutex || with_log_mutex) {
-    assert(alive_log_files_tail_ == alive_log_files_.rbegin());
-    assert(alive_log_files_tail_ != alive_log_files_.rend());
-  }
-  LogFileNumberSize& last_alive_log = *alive_log_files_tail_;
-  last_alive_log.AddSize(*log_size);
+  log_file_number_size.AddSize(*log_size);
   log_empty_ = false;
   return io_s;
 }
@@ -1224,7 +1221,8 @@ IOStatus DBImpl::WriteToWAL(const WriteBatch& merged_batch,
 IOStatus DBImpl::WriteToWAL(const WriteThread::WriteGroup& write_group,
                             log::Writer* log_writer, uint64_t* log_used,
                             bool need_log_sync, bool need_log_dir_sync,
-                            SequenceNumber sequence) {
+                            SequenceNumber sequence,
+                            LogFileNumberSize& log_file_number_size) {
   IOStatus io_s;
   assert(!two_write_queues_);
   assert(!write_group.leader->disable_wal);
@@ -1245,7 +1243,8 @@ IOStatus DBImpl::WriteToWAL(const WriteThread::WriteGroup& write_group,
 
   uint64_t log_size;
   io_s = WriteToWAL(*merged_batch, log_writer, log_used, &log_size,
-                    write_group.leader->rate_limiter_priority);
+                    write_group.leader->rate_limiter_priority,
+                    log_file_number_size);
   if (to_be_cached_state) {
     cached_recoverable_state_ = *to_be_cached_state;
     cached_recoverable_state_empty_ = false;
@@ -1339,10 +1338,14 @@ IOStatus DBImpl::ConcurrentWriteToWAL(
   WriteBatchInternal::SetSequence(merged_batch, sequence);
 
   log::Writer* log_writer = logs_.back().writer;
+  LogFileNumberSize& log_file_number_size = alive_log_files_.back();
+
+  assert(log_writer->get_log_number() == log_file_number_size.number);
+
   uint64_t log_size;
   io_s = WriteToWAL(*merged_batch, log_writer, log_used, &log_size,
                     write_group.leader->rate_limiter_priority,
-                    /*with_db_mutex=*/false, /*with_log_mutex=*/true);
+                    log_file_number_size);
   if (to_be_cached_state) {
     cached_recoverable_state_ = *to_be_cached_state;
     cached_recoverable_state_empty_ = false;
@@ -1998,7 +2001,6 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) {
       log_dir_synced_ = false;
       logs_.emplace_back(logfile_number_, new_log);
       alive_log_files_.push_back(LogFileNumberSize(logfile_number_));
-      alive_log_files_tail_ = alive_log_files_.rbegin();
     }
     log_write_mutex_.Unlock();
   }