]> git-server-git.apps.pok.os.sepia.ceph.com Git - rocksdb.git/commitdiff
Fix data race on `ColumnFamilyData::flush_reason` by letting FlushRequest/Job owns...
authorHui Xiao <huixiao@fb.com>
Tue, 24 Jan 2023 17:54:04 +0000 (09:54 -0800)
committerAndrew Kryczka <andrewkr@fb.com>
Thu, 2 Feb 2023 00:59:41 +0000 (16:59 -0800)
Summary:
**Context:**
Concurrent flushes on the same CF can set on `ColumnFamilyData::flush_reason` before each other flush finishes. An symptom is one CF has different flush_reason with others though all of them are in an atomic flush  `db_stress: db/db_impl/db_impl_compaction_flush.cc:423: rocksdb::Status rocksdb::DBImpl::AtomicFlushMemTablesToOutputFiles(const rocksdb::autovector<rocksdb::DBImpl::BGFlushArg>&, bool*, rocksdb::JobContext*, rocksdb::LogBuffer*, rocksdb::Env::Priority): Assertion cfd->GetFlushReason() == cfds[0]->GetFlushReason() failed. `

**Summary:**
Suggested by ltamasi, we now refactor and let FlushRequest/Job to own flush_reason as there is no good way to define `ColumnFamilyData::flush_reason` in face of concurrent flushes on the same CF (which wasn't the case a long time ago when `ColumnFamilyData::flush_reason ` first introduced`)

**Tets:**
- new unit test
- make check
- aggressive crash test rehearsal

Pull Request resolved: https://github.com/facebook/rocksdb/pull/11111

Reviewed By: ajkr

Differential Revision: D42644600

Pulled By: hx235

fbshipit-source-id: 8589c8184869d3415e5b780c887f877818a5ebaf

HISTORY.md
db/column_family.cc
db/column_family.h
db/db_flush_test.cc
db/db_impl/db_impl.cc
db/db_impl/db_impl.h
db/db_impl/db_impl_compaction_flush.cc
db/db_impl/db_impl_write.cc
db/flush_job.cc
db/flush_job.h
db/flush_job_test.cc

index b0364807a879d89f481838e4cc62f5cec73aab6e..28ae4eac9bdde1f836ff6b33cf17c5030f24ebe1 100644 (file)
@@ -1,4 +1,8 @@
 # Rocksdb Change Log
+## Unreleased
+### Bug Fixes
+* Fixed a data race on `ColumnFamilyData::flush_reason` caused by concurrent flushes.
+
 ## 7.10.0 (01/23/2023)
 ### Behavior changes
 * Make best-efforts recovery verify SST unique ID before Version construction (#10962)
index 8124b23cd7eda57caa9a27b8e7356e145f8b0e03..3a1d22c3df048e24c9fa83adcc64aaa545a8eeef 100644 (file)
@@ -557,7 +557,6 @@ ColumnFamilyData::ColumnFamilyData(
       next_(nullptr),
       prev_(nullptr),
       log_number_(0),
-      flush_reason_(FlushReason::kOthers),
       column_family_set_(column_family_set),
       queued_for_flush_(false),
       queued_for_compaction_(false),
index ff4eca514bb86e492b8f54b474ea4aaed2cc4c15..0c696ed4e2b12e458e19e8c592e0b9a5df14c971 100644 (file)
@@ -310,10 +310,6 @@ class ColumnFamilyData {
   void SetLogNumber(uint64_t log_number) { log_number_ = log_number; }
   uint64_t GetLogNumber() const { return log_number_; }
 
-  void SetFlushReason(FlushReason flush_reason) {
-    flush_reason_ = flush_reason;
-  }
-  FlushReason GetFlushReason() const { return flush_reason_; }
   // thread-safe
   const FileOptions* soptions() const;
   const ImmutableOptions* ioptions() const { return &ioptions_; }
@@ -616,8 +612,6 @@ class ColumnFamilyData {
   // recovered from
   uint64_t log_number_;
 
-  std::atomic<FlushReason> flush_reason_;
-
   // An object that keeps all the compaction stats
   // and picks the next compaction
   std::unique_ptr<CompactionPicker> compaction_picker_;
index 3b3f7e1836c3e6a3c1df9898d7a5d1ee43b1c38a..0587afb3a5b4bc59cb2da55475728e979d2c4050 100644 (file)
@@ -746,6 +746,64 @@ class TestFlushListener : public EventListener {
 };
 #endif  // !ROCKSDB_LITE
 
+// RocksDB lite does not support GetLiveFiles()
+#ifndef ROCKSDB_LITE
+TEST_F(DBFlushTest, FixFlushReasonRaceFromConcurrentFlushes) {
+  Options options = CurrentOptions();
+  options.atomic_flush = true;
+  options.disable_auto_compactions = true;
+  CreateAndReopenWithCF({"cf1"}, options);
+
+  for (int idx = 0; idx < 1; ++idx) {
+    ASSERT_OK(Put(0, Key(idx), std::string(1, 'v')));
+    ASSERT_OK(Put(1, Key(idx), std::string(1, 'v')));
+  }
+
+  // To coerce a manual flush happenning in the middle of GetLiveFiles's flush,
+  // we need to pause background flush thread and enable it later.
+  std::shared_ptr<test::SleepingBackgroundTask> sleeping_task =
+      std::make_shared<test::SleepingBackgroundTask>();
+  env_->SetBackgroundThreads(1, Env::HIGH);
+  env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask,
+                 sleeping_task.get(), Env::Priority::HIGH);
+  sleeping_task->WaitUntilSleeping();
+
+  // Coerce a manual flush happenning in the middle of GetLiveFiles's flush
+  bool get_live_files_paused_at_sync_point = false;
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+      "DBImpl::AtomicFlushMemTables:AfterScheduleFlush", [&](void* /* arg */) {
+        if (get_live_files_paused_at_sync_point) {
+          // To prevent non-GetLiveFiles() flush from pausing at this sync point
+          return;
+        }
+        get_live_files_paused_at_sync_point = true;
+
+        FlushOptions fo;
+        fo.wait = false;
+        fo.allow_write_stall = true;
+        ASSERT_OK(dbfull()->Flush(fo));
+
+        // Resume background flush thread so GetLiveFiles() can finish
+        sleeping_task->WakeUp();
+        sleeping_task->WaitUntilDone();
+      });
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+
+  std::vector<std::string> files;
+  uint64_t manifest_file_size;
+  // Before the fix, a race condition on default cf's flush reason due to
+  // concurrent GetLiveFiles's flush and manual flush will fail
+  // an internal assertion.
+  // After the fix, such race condition is fixed and there is no assertion
+  // failure.
+  ASSERT_OK(db_->GetLiveFiles(files, &manifest_file_size, /*flush*/ true));
+  ASSERT_TRUE(get_live_files_paused_at_sync_point);
+
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
+}
+#endif  // !ROCKSDB_LITE
+
 TEST_F(DBFlushTest, MemPurgeBasic) {
   Options options = CurrentOptions();
 
@@ -2440,7 +2498,9 @@ TEST_P(DBAtomicFlushTest, ManualFlushUnder2PC) {
   options.atomic_flush = GetParam();
   // 64MB so that memtable flush won't be trigger by the small writes.
   options.write_buffer_size = (static_cast<size_t>(64) << 20);
-
+  auto flush_listener = std::make_shared<FlushCounterListener>();
+  flush_listener->expected_flush_reason = FlushReason::kManualFlush;
+  options.listeners.push_back(flush_listener);
   // Destroy the DB to recreate as a TransactionDB.
   Close();
   Destroy(options, true);
@@ -2507,7 +2567,6 @@ TEST_P(DBAtomicFlushTest, ManualFlushUnder2PC) {
     auto cfh = static_cast<ColumnFamilyHandleImpl*>(handles_[i]);
     ASSERT_EQ(0, cfh->cfd()->imm()->NumNotFlushed());
     ASSERT_TRUE(cfh->cfd()->mem()->IsEmpty());
-    ASSERT_EQ(cfh->cfd()->GetFlushReason(), FlushReason::kManualFlush);
   }
 
   // The recovered min log number with prepared data should be non-zero.
@@ -2520,13 +2579,15 @@ TEST_P(DBAtomicFlushTest, ManualFlushUnder2PC) {
   ASSERT_TRUE(db_impl->allow_2pc());
   ASSERT_NE(db_impl->MinLogNumberToKeep(), 0);
 }
-#endif  // ROCKSDB_LITE
 
 TEST_P(DBAtomicFlushTest, ManualAtomicFlush) {
   Options options = CurrentOptions();
   options.create_if_missing = true;
   options.atomic_flush = GetParam();
   options.write_buffer_size = (static_cast<size_t>(64) << 20);
+  auto flush_listener = std::make_shared<FlushCounterListener>();
+  flush_listener->expected_flush_reason = FlushReason::kManualFlush;
+  options.listeners.push_back(flush_listener);
 
   CreateAndReopenWithCF({"pikachu", "eevee"}, options);
   size_t num_cfs = handles_.size();
@@ -2551,11 +2612,11 @@ TEST_P(DBAtomicFlushTest, ManualAtomicFlush) {
 
   for (size_t i = 0; i != num_cfs; ++i) {
     auto cfh = static_cast<ColumnFamilyHandleImpl*>(handles_[i]);
-    ASSERT_EQ(cfh->cfd()->GetFlushReason(), FlushReason::kManualFlush);
     ASSERT_EQ(0, cfh->cfd()->imm()->NumNotFlushed());
     ASSERT_TRUE(cfh->cfd()->mem()->IsEmpty());
   }
 }
+#endif  // ROCKSDB_LITE
 
 TEST_P(DBAtomicFlushTest, PrecomputeMinLogNumberToKeepNon2PC) {
   Options options = CurrentOptions();
index f930878dee772c2151be5533806b1d7375086bbd..24fb5389060baed2565169771a50a9b9697ba81b 100644 (file)
@@ -604,7 +604,7 @@ Status DBImpl::CloseHelper() {
 
   while (!flush_queue_.empty()) {
     const FlushRequest& flush_req = PopFirstFromFlushQueue();
-    for (const auto& iter : flush_req) {
+    for (const auto& iter : flush_req.cfd_to_max_mem_id_to_persist) {
       iter.first->UnrefAndTryDelete();
     }
   }
index 29b703e248bcde5eaeb9901a348878ec2f351e4f..1bab7c7a9124ddfec2107fbc87b0a0dd45a85789 100644 (file)
@@ -16,6 +16,7 @@
 #include <map>
 #include <set>
 #include <string>
+#include <unordered_map>
 #include <utility>
 #include <vector>
 
@@ -1383,7 +1384,7 @@ class DBImpl : public DB {
 
   void NotifyOnFlushBegin(ColumnFamilyData* cfd, FileMetaData* file_meta,
                           const MutableCFOptions& mutable_cf_options,
-                          int job_id);
+                          int job_id, FlushReason flush_reason);
 
   void NotifyOnFlushCompleted(
       ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options,
@@ -1675,12 +1676,17 @@ class DBImpl : public DB {
   // Argument required by background flush thread.
   struct BGFlushArg {
     BGFlushArg()
-        : cfd_(nullptr), max_memtable_id_(0), superversion_context_(nullptr) {}
+        : cfd_(nullptr),
+          max_memtable_id_(0),
+          superversion_context_(nullptr),
+          flush_reason_(FlushReason::kOthers) {}
     BGFlushArg(ColumnFamilyData* cfd, uint64_t max_memtable_id,
-               SuperVersionContext* superversion_context)
+               SuperVersionContext* superversion_context,
+               FlushReason flush_reason)
         : cfd_(cfd),
           max_memtable_id_(max_memtable_id),
-          superversion_context_(superversion_context) {}
+          superversion_context_(superversion_context),
+          flush_reason_(flush_reason) {}
 
     // Column family to flush.
     ColumnFamilyData* cfd_;
@@ -1691,6 +1697,7 @@ class DBImpl : public DB {
     // installs a new superversion for the column family. This operation
     // requires a SuperVersionContext object (currently embedded in JobContext).
     SuperVersionContext* superversion_context_;
+    FlushReason flush_reason_;
   };
 
   // Argument passed to flush thread.
@@ -1819,7 +1826,7 @@ class DBImpl : public DB {
   // installs a new super version for the column family.
   Status FlushMemTableToOutputFile(
       ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options,
-      bool* madeProgress, JobContext* job_context,
+      bool* madeProgress, JobContext* job_context, FlushReason flush_reason,
       SuperVersionContext* superversion_context,
       std::vector<SequenceNumber>& snapshot_seqs,
       SequenceNumber earliest_write_conflict_snapshot,
@@ -2030,18 +2037,22 @@ class DBImpl : public DB {
 
   void MaybeScheduleFlushOrCompaction();
 
-  // A flush request specifies the column families to flush as well as the
-  // largest memtable id to persist for each column family. Once all the
-  // memtables whose IDs are smaller than or equal to this per-column-family
-  // specified value, this flush request is considered to have completed its
-  // work of flushing this column family. After completing the work for all
-  // column families in this request, this flush is considered complete.
-  using FlushRequest = std::vector<std::pair<ColumnFamilyData*, uint64_t>>;
+  struct FlushRequest {
+    FlushReason flush_reason;
+    // A map from column family to flush to largest memtable id to persist for
+    // each column family. Once all the memtables whose IDs are smaller than or
+    // equal to this per-column-family specified value, this flush request is
+    // considered to have completed its work of flushing this column family.
+    // After completing the work for all column families in this request, this
+    // flush is considered complete.
+    std::unordered_map<ColumnFamilyData*, uint64_t>
+        cfd_to_max_mem_id_to_persist;
+  };
 
   void GenerateFlushRequest(const autovector<ColumnFamilyData*>& cfds,
-                            FlushRequest* req);
+                            FlushReason flush_reason, FlushRequest* req);
 
-  void SchedulePendingFlush(const FlushRequest& req, FlushReason flush_reason);
+  void SchedulePendingFlush(const FlushRequest& req);
 
   void SchedulePendingCompaction(ColumnFamilyData* cfd);
   void SchedulePendingPurge(std::string fname, std::string dir_to_sync,
index d14b67876f64341ae6c6e0e4eb65b7bd76f69a71..b52ad5896520981d668b89b7c4e3854c52cf8317 100644 (file)
@@ -155,7 +155,7 @@ IOStatus DBImpl::SyncClosedLogs(JobContext* job_context,
 
 Status DBImpl::FlushMemTableToOutputFile(
     ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options,
-    bool* made_progress, JobContext* job_context,
+    bool* made_progress, JobContext* job_context, FlushReason flush_reason,
     SuperVersionContext* superversion_context,
     std::vector<SequenceNumber>& snapshot_seqs,
     SequenceNumber earliest_write_conflict_snapshot,
@@ -215,7 +215,8 @@ Status DBImpl::FlushMemTableToOutputFile(
       dbname_, cfd, immutable_db_options_, mutable_cf_options, max_memtable_id,
       file_options_for_compaction_, versions_.get(), &mutex_, &shutting_down_,
       snapshot_seqs, earliest_write_conflict_snapshot, snapshot_checker,
-      job_context, log_buffer, directories_.GetDbDir(), GetDataDir(cfd, 0U),
+      job_context, flush_reason, log_buffer, directories_.GetDbDir(),
+      GetDataDir(cfd, 0U),
       GetCompressionFlush(*cfd->ioptions(), mutable_cf_options), stats_,
       &event_logger_, mutable_cf_options.report_bg_io_stats,
       true /* sync_output_directory */, true /* write_manifest */, thread_pri,
@@ -260,7 +261,8 @@ Status DBImpl::FlushMemTableToOutputFile(
 
 #ifndef ROCKSDB_LITE
   // may temporarily unlock and lock the mutex.
-  NotifyOnFlushBegin(cfd, &file_meta, mutable_cf_options, job_context->job_id);
+  NotifyOnFlushBegin(cfd, &file_meta, mutable_cf_options, job_context->job_id,
+                     flush_reason);
 #endif  // ROCKSDB_LITE
 
   bool switched_to_mempurge = false;
@@ -390,8 +392,9 @@ Status DBImpl::FlushMemTablesToOutputFiles(
   MutableCFOptions mutable_cf_options_copy = *cfd->GetLatestMutableCFOptions();
   SuperVersionContext* superversion_context =
       bg_flush_arg.superversion_context_;
+  FlushReason flush_reason = bg_flush_arg.flush_reason_;
   Status s = FlushMemTableToOutputFile(
-      cfd, mutable_cf_options_copy, made_progress, job_context,
+      cfd, mutable_cf_options_copy, made_progress, job_context, flush_reason,
       superversion_context, snapshot_seqs, earliest_write_conflict_snapshot,
       snapshot_checker, log_buffer, thread_pri);
   return s;
@@ -420,7 +423,9 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
   for (const auto cfd : cfds) {
     assert(cfd->imm()->NumNotFlushed() != 0);
     assert(cfd->imm()->IsFlushPending());
-    assert(cfd->GetFlushReason() == cfds[0]->GetFlushReason());
+  }
+  for (const auto bg_flush_arg : bg_flush_args) {
+    assert(bg_flush_arg.flush_reason_ == bg_flush_args[0].flush_reason_);
   }
 #endif /* !NDEBUG */
 
@@ -459,13 +464,15 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
     all_mutable_cf_options.emplace_back(*cfd->GetLatestMutableCFOptions());
     const MutableCFOptions& mutable_cf_options = all_mutable_cf_options.back();
     uint64_t max_memtable_id = bg_flush_args[i].max_memtable_id_;
+    FlushReason flush_reason = bg_flush_args[i].flush_reason_;
     jobs.emplace_back(new FlushJob(
         dbname_, cfd, immutable_db_options_, mutable_cf_options,
         max_memtable_id, file_options_for_compaction_, versions_.get(), &mutex_,
         &shutting_down_, snapshot_seqs, earliest_write_conflict_snapshot,
-        snapshot_checker, job_context, log_buffer, directories_.GetDbDir(),
-        data_dir, GetCompressionFlush(*cfd->ioptions(), mutable_cf_options),
-        stats_, &event_logger_, mutable_cf_options.report_bg_io_stats,
+        snapshot_checker, job_context, flush_reason, log_buffer,
+        directories_.GetDbDir(), data_dir,
+        GetCompressionFlush(*cfd->ioptions(), mutable_cf_options), stats_,
+        &event_logger_, mutable_cf_options.report_bg_io_stats,
         false /* sync_output_directory */, false /* write_manifest */,
         thread_pri, io_tracer_, seqno_time_mapping_, db_id_, db_session_id_,
         cfd->GetFullHistoryTsLow(), &blob_callback_));
@@ -483,8 +490,9 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
   for (int i = 0; i != num_cfs; ++i) {
     const MutableCFOptions& mutable_cf_options = all_mutable_cf_options.at(i);
     // may temporarily unlock and lock the mutex.
+    FlushReason flush_reason = bg_flush_args[i].flush_reason_;
     NotifyOnFlushBegin(cfds[i], &file_meta[i], mutable_cf_options,
-                       job_context->job_id);
+                       job_context->job_id, flush_reason);
   }
 #endif /* !ROCKSDB_LITE */
 
@@ -642,8 +650,9 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
 
     bool resuming_from_bg_err =
         error_handler_.IsDBStopped() ||
-        (cfds[0]->GetFlushReason() == FlushReason::kErrorRecovery ||
-         cfds[0]->GetFlushReason() == FlushReason::kErrorRecoveryRetryFlush);
+        (bg_flush_args[0].flush_reason_ == FlushReason::kErrorRecovery ||
+         bg_flush_args[0].flush_reason_ ==
+             FlushReason::kErrorRecoveryRetryFlush);
     while ((!resuming_from_bg_err || error_handler_.GetRecoveryError().ok())) {
       std::pair<Status, bool> res = wait_to_install_func();
 
@@ -660,8 +669,9 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
 
       resuming_from_bg_err =
           error_handler_.IsDBStopped() ||
-          (cfds[0]->GetFlushReason() == FlushReason::kErrorRecovery ||
-           cfds[0]->GetFlushReason() == FlushReason::kErrorRecoveryRetryFlush);
+          (bg_flush_args[0].flush_reason_ == FlushReason::kErrorRecovery ||
+           bg_flush_args[0].flush_reason_ ==
+               FlushReason::kErrorRecoveryRetryFlush);
     }
 
     if (!resuming_from_bg_err) {
@@ -816,7 +826,7 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
 
 void DBImpl::NotifyOnFlushBegin(ColumnFamilyData* cfd, FileMetaData* file_meta,
                                 const MutableCFOptions& mutable_cf_options,
-                                int job_id) {
+                                int job_id, FlushReason flush_reason) {
 #ifndef ROCKSDB_LITE
   if (immutable_db_options_.listeners.size() == 0U) {
     return;
@@ -849,7 +859,7 @@ void DBImpl::NotifyOnFlushBegin(ColumnFamilyData* cfd, FileMetaData* file_meta,
     info.triggered_writes_stop = triggered_writes_stop;
     info.smallest_seqno = file_meta->fd.smallest_seqno;
     info.largest_seqno = file_meta->fd.largest_seqno;
-    info.flush_reason = cfd->GetFlushReason();
+    info.flush_reason = flush_reason;
     for (auto listener : immutable_db_options_.listeners) {
       listener->OnFlushBegin(this, info);
     }
@@ -862,6 +872,7 @@ void DBImpl::NotifyOnFlushBegin(ColumnFamilyData* cfd, FileMetaData* file_meta,
   (void)file_meta;
   (void)mutable_cf_options;
   (void)job_id;
+  (void)flush_reason;
 #endif  // ROCKSDB_LITE
 }
 
@@ -2102,16 +2113,17 @@ Status DBImpl::RunManualCompaction(
 }
 
 void DBImpl::GenerateFlushRequest(const autovector<ColumnFamilyData*>& cfds,
-                                  FlushRequest* req) {
+                                  FlushReason flush_reason, FlushRequest* req) {
   assert(req != nullptr);
-  req->reserve(cfds.size());
+  req->flush_reason = flush_reason;
+  req->cfd_to_max_mem_id_to_persist.reserve(cfds.size());
   for (const auto cfd : cfds) {
     if (nullptr == cfd) {
       // cfd may be null, see DBImpl::ScheduleFlushes
       continue;
     }
     uint64_t max_memtable_id = cfd->imm()->GetLatestMemTableID();
-    req->emplace_back(cfd, max_memtable_id);
+    req->cfd_to_max_mem_id_to_persist.emplace(cfd, max_memtable_id);
   }
 }
 
@@ -2169,7 +2181,7 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
     if (s.ok()) {
       if (cfd->imm()->NumNotFlushed() != 0 || !cfd->mem()->IsEmpty() ||
           !cached_recoverable_state_empty_.load()) {
-        FlushRequest req{{cfd, flush_memtable_id}};
+        FlushRequest req{flush_reason, {{cfd, flush_memtable_id}}};
         flush_reqs.emplace_back(std::move(req));
         memtable_ids_to_wait.emplace_back(cfd->imm()->GetLatestMemTableID());
       }
@@ -2197,7 +2209,7 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
                            "to avoid holding old logs",
                            cfd->GetName().c_str());
             s = SwitchMemtable(cfd_stats, &context);
-            FlushRequest req{{cfd_stats, flush_memtable_id}};
+            FlushRequest req{flush_reason, {{cfd_stats, flush_memtable_id}}};
             flush_reqs.emplace_back(std::move(req));
             memtable_ids_to_wait.emplace_back(
                 cfd_stats->imm()->GetLatestMemTableID());
@@ -2208,8 +2220,9 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
 
     if (s.ok() && !flush_reqs.empty()) {
       for (const auto& req : flush_reqs) {
-        assert(req.size() == 1);
-        ColumnFamilyData* loop_cfd = req[0].first;
+        assert(req.cfd_to_max_mem_id_to_persist.size() == 1);
+        ColumnFamilyData* loop_cfd =
+            req.cfd_to_max_mem_id_to_persist.begin()->first;
         loop_cfd->imm()->FlushRequested();
       }
       // If the caller wants to wait for this flush to complete, it indicates
@@ -2218,13 +2231,14 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
       // Therefore, we increase the cfd's ref count.
       if (flush_options.wait) {
         for (const auto& req : flush_reqs) {
-          assert(req.size() == 1);
-          ColumnFamilyData* loop_cfd = req[0].first;
+          assert(req.cfd_to_max_mem_id_to_persist.size() == 1);
+          ColumnFamilyData* loop_cfd =
+              req.cfd_to_max_mem_id_to_persist.begin()->first;
           loop_cfd->Ref();
         }
       }
       for (const auto& req : flush_reqs) {
-        SchedulePendingFlush(req, flush_reason);
+        SchedulePendingFlush(req);
       }
       MaybeScheduleFlushOrCompaction();
     }
@@ -2243,8 +2257,8 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
     autovector<const uint64_t*> flush_memtable_ids;
     assert(flush_reqs.size() == memtable_ids_to_wait.size());
     for (size_t i = 0; i < flush_reqs.size(); ++i) {
-      assert(flush_reqs[i].size() == 1);
-      cfds.push_back(flush_reqs[i][0].first);
+      assert(flush_reqs[i].cfd_to_max_mem_id_to_persist.size() == 1);
+      cfds.push_back(flush_reqs[i].cfd_to_max_mem_id_to_persist.begin()->first);
       flush_memtable_ids.push_back(&(memtable_ids_to_wait[i]));
     }
     s = WaitForFlushMemTables(
@@ -2341,8 +2355,8 @@ Status DBImpl::AtomicFlushMemTables(
           cfd->Ref();
         }
       }
-      GenerateFlushRequest(cfds, &flush_req);
-      SchedulePendingFlush(flush_req, flush_reason);
+      GenerateFlushRequest(cfds, flush_reason, &flush_req);
+      SchedulePendingFlush(flush_req);
       MaybeScheduleFlushOrCompaction();
     }
 
@@ -2357,7 +2371,7 @@ Status DBImpl::AtomicFlushMemTables(
   TEST_SYNC_POINT("DBImpl::AtomicFlushMemTables:BeforeWaitForBgFlush");
   if (s.ok() && flush_options.wait) {
     autovector<const uint64_t*> flush_memtable_ids;
-    for (auto& iter : flush_req) {
+    for (auto& iter : flush_req.cfd_to_max_mem_id_to_persist) {
       flush_memtable_ids.push_back(&(iter.second));
     }
     s = WaitForFlushMemTables(
@@ -2704,9 +2718,9 @@ DBImpl::FlushRequest DBImpl::PopFirstFromFlushQueue() {
   FlushRequest flush_req = flush_queue_.front();
   flush_queue_.pop_front();
   if (!immutable_db_options_.atomic_flush) {
-    assert(flush_req.size() == 1);
+    assert(flush_req.cfd_to_max_mem_id_to_persist.size() == 1);
   }
-  for (const auto& elem : flush_req) {
+  for (const auto& elem : flush_req.cfd_to_max_mem_id_to_persist) {
     if (!immutable_db_options_.atomic_flush) {
       ColumnFamilyData* cfd = elem.first;
       assert(cfd);
@@ -2714,7 +2728,6 @@ DBImpl::FlushRequest DBImpl::PopFirstFromFlushQueue() {
       cfd->set_queued_for_flush(false);
     }
   }
-  // TODO: need to unset flush reason?
   return flush_req;
 }
 
@@ -2744,31 +2757,29 @@ ColumnFamilyData* DBImpl::PickCompactionFromQueue(
   return cfd;
 }
 
-void DBImpl::SchedulePendingFlush(const FlushRequest& flush_req,
-                                  FlushReason flush_reason) {
+void DBImpl::SchedulePendingFlush(const FlushRequest& flush_req) {
   mutex_.AssertHeld();
-  if (flush_req.empty()) {
+  if (flush_req.cfd_to_max_mem_id_to_persist.empty()) {
     return;
   }
   if (!immutable_db_options_.atomic_flush) {
     // For the non-atomic flush case, we never schedule multiple column
     // families in the same flush request.
-    assert(flush_req.size() == 1);
-    ColumnFamilyData* cfd = flush_req[0].first;
+    assert(flush_req.cfd_to_max_mem_id_to_persist.size() == 1);
+    ColumnFamilyData* cfd =
+        flush_req.cfd_to_max_mem_id_to_persist.begin()->first;
     assert(cfd);
 
     if (!cfd->queued_for_flush() && cfd->imm()->IsFlushPending()) {
       cfd->Ref();
       cfd->set_queued_for_flush(true);
-      cfd->SetFlushReason(flush_reason);
       ++unscheduled_flushes_;
       flush_queue_.push_back(flush_req);
     }
   } else {
-    for (auto& iter : flush_req) {
+    for (auto& iter : flush_req.cfd_to_max_mem_id_to_persist) {
       ColumnFamilyData* cfd = iter.first;
       cfd->Ref();
-      cfd->SetFlushReason(flush_reason);
     }
     ++unscheduled_flushes_;
     flush_queue_.push_back(flush_req);
@@ -2900,10 +2911,12 @@ Status DBImpl::BackgroundFlush(bool* made_progress, JobContext* job_context,
   while (!flush_queue_.empty()) {
     // This cfd is already referenced
     const FlushRequest& flush_req = PopFirstFromFlushQueue();
+    FlushReason flush_reason = flush_req.flush_reason;
     superversion_contexts.clear();
-    superversion_contexts.reserve(flush_req.size());
+    superversion_contexts.reserve(
+        flush_req.cfd_to_max_mem_id_to_persist.size());
 
-    for (const auto& iter : flush_req) {
+    for (const auto& iter : flush_req.cfd_to_max_mem_id_to_persist) {
       ColumnFamilyData* cfd = iter.first;
       if (cfd->GetMempurgeUsed()) {
         // If imm() contains silent memtables (e.g.: because
@@ -2919,7 +2932,7 @@ Status DBImpl::BackgroundFlush(bool* made_progress, JobContext* job_context,
       }
       superversion_contexts.emplace_back(SuperVersionContext(true));
       bg_flush_args.emplace_back(cfd, iter.second,
-                                 &(superversion_contexts.back()));
+                                 &(superversion_contexts.back()), flush_reason);
     }
     if (!bg_flush_args.empty()) {
       break;
@@ -2943,9 +2956,14 @@ Status DBImpl::BackgroundFlush(bool* made_progress, JobContext* job_context,
     status = FlushMemTablesToOutputFiles(bg_flush_args, made_progress,
                                          job_context, log_buffer, thread_pri);
     TEST_SYNC_POINT("DBImpl::BackgroundFlush:BeforeFlush");
-    // All the CFDs in the FlushReq must have the same flush reason, so just
-    // grab the first one
-    *reason = bg_flush_args[0].cfd_->GetFlushReason();
+// All the CFD/bg_flush_arg in the FlushReq must have the same flush reason, so
+// just grab the first one
+#ifndef NDEBUG
+    for (const auto bg_flush_arg : bg_flush_args) {
+      assert(bg_flush_arg.flush_reason_ == bg_flush_args[0].flush_reason_);
+    }
+#endif /* !NDEBUG */
+    *reason = bg_flush_args[0].flush_reason_;
     for (auto& arg : bg_flush_args) {
       ColumnFamilyData* cfd = arg.cfd_;
       if (cfd->UnrefAndTryDelete()) {
index 08407ba85e10f8419ee94cce01ccb1172da63963..ee8c9898931c8d463a4a6e87c10b05ed8e0ef66b 100644 (file)
@@ -1654,14 +1654,14 @@ Status DBImpl::SwitchWAL(WriteContext* write_context) {
       cfd->imm()->FlushRequested();
       if (!immutable_db_options_.atomic_flush) {
         FlushRequest flush_req;
-        GenerateFlushRequest({cfd}, &flush_req);
-        SchedulePendingFlush(flush_req, FlushReason::kWalFull);
+        GenerateFlushRequest({cfd}, FlushReason::kWalFull, &flush_req);
+        SchedulePendingFlush(flush_req);
       }
     }
     if (immutable_db_options_.atomic_flush) {
       FlushRequest flush_req;
-      GenerateFlushRequest(cfds, &flush_req);
-      SchedulePendingFlush(flush_req, FlushReason::kWalFull);
+      GenerateFlushRequest(cfds, FlushReason::kWalFull, &flush_req);
+      SchedulePendingFlush(flush_req);
     }
     MaybeScheduleFlushOrCompaction();
   }
@@ -1745,14 +1745,15 @@ Status DBImpl::HandleWriteBufferManagerFlush(WriteContext* write_context) {
       cfd->imm()->FlushRequested();
       if (!immutable_db_options_.atomic_flush) {
         FlushRequest flush_req;
-        GenerateFlushRequest({cfd}, &flush_req);
-        SchedulePendingFlush(flush_req, FlushReason::kWriteBufferManager);
+        GenerateFlushRequest({cfd}, FlushReason::kWriteBufferManager,
+                             &flush_req);
+        SchedulePendingFlush(flush_req);
       }
     }
     if (immutable_db_options_.atomic_flush) {
       FlushRequest flush_req;
-      GenerateFlushRequest(cfds, &flush_req);
-      SchedulePendingFlush(flush_req, FlushReason::kWriteBufferManager);
+      GenerateFlushRequest(cfds, FlushReason::kWriteBufferManager, &flush_req);
+      SchedulePendingFlush(flush_req);
     }
     MaybeScheduleFlushOrCompaction();
   }
@@ -2017,13 +2018,13 @@ Status DBImpl::ScheduleFlushes(WriteContext* context) {
     if (immutable_db_options_.atomic_flush) {
       AssignAtomicFlushSeq(cfds);
       FlushRequest flush_req;
-      GenerateFlushRequest(cfds, &flush_req);
-      SchedulePendingFlush(flush_req, FlushReason::kWriteBufferFull);
+      GenerateFlushRequest(cfds, FlushReason::kWriteBufferFull, &flush_req);
+      SchedulePendingFlush(flush_req);
     } else {
       for (auto* cfd : cfds) {
         FlushRequest flush_req;
-        GenerateFlushRequest({cfd}, &flush_req);
-        SchedulePendingFlush(flush_req, FlushReason::kWriteBufferFull);
+        GenerateFlushRequest({cfd}, FlushReason::kWriteBufferFull, &flush_req);
+        SchedulePendingFlush(flush_req);
       }
     }
     MaybeScheduleFlushOrCompaction();
index ac84da4cae72c11370af0fa57e0c1ddae49e77eb..e99497a1c6ab5257dbb8f4d6e79bbe311fb618c9 100644 (file)
@@ -91,7 +91,7 @@ FlushJob::FlushJob(
     std::vector<SequenceNumber> existing_snapshots,
     SequenceNumber earliest_write_conflict_snapshot,
     SnapshotChecker* snapshot_checker, JobContext* job_context,
-    LogBuffer* log_buffer, FSDirectory* db_directory,
+    FlushReason flush_reason, LogBuffer* log_buffer, FSDirectory* db_directory,
     FSDirectory* output_file_directory, CompressionType output_compression,
     Statistics* stats, EventLogger* event_logger, bool measure_io_stats,
     const bool sync_output_directory, const bool write_manifest,
@@ -114,6 +114,7 @@ FlushJob::FlushJob(
       earliest_write_conflict_snapshot_(earliest_write_conflict_snapshot),
       snapshot_checker_(snapshot_checker),
       job_context_(job_context),
+      flush_reason_(flush_reason),
       log_buffer_(log_buffer),
       db_directory_(db_directory),
       output_file_directory_(output_file_directory),
@@ -245,9 +246,8 @@ Status FlushJob::Run(LogsWithPrepTracker* prep_tracker, FileMetaData* file_meta,
   }
   Status mempurge_s = Status::NotFound("No MemPurge.");
   if ((mempurge_threshold > 0.0) &&
-      (cfd_->GetFlushReason() == FlushReason::kWriteBufferFull) &&
-      (!mems_.empty()) && MemPurgeDecider(mempurge_threshold) &&
-      !(db_options_.atomic_flush)) {
+      (flush_reason_ == FlushReason::kWriteBufferFull) && (!mems_.empty()) &&
+      MemPurgeDecider(mempurge_threshold) && !(db_options_.atomic_flush)) {
     cfd_->SetMempurgeUsed();
     mempurge_s = MemPurge();
     if (!mempurge_s.ok()) {
@@ -878,7 +878,7 @@ Status FlushJob::WriteLevel0Table() {
                          << total_num_deletes << "total_data_size"
                          << total_data_size << "memory_usage"
                          << total_memory_usage << "flush_reason"
-                         << GetFlushReasonString(cfd_->GetFlushReason());
+                         << GetFlushReasonString(flush_reason_);
 
     {
       ScopedArenaIterator iter(
@@ -1076,7 +1076,7 @@ std::unique_ptr<FlushJobInfo> FlushJob::GetFlushJobInfo() const {
   info->smallest_seqno = meta_.fd.smallest_seqno;
   info->largest_seqno = meta_.fd.largest_seqno;
   info->table_properties = table_properties_;
-  info->flush_reason = cfd_->GetFlushReason();
+  info->flush_reason = flush_reason_;
   info->blob_compression_type = mutable_cf_options_.blob_compression_type;
 
   // Update BlobFilesInfo.
index 60c272aec3b7470021698f99006a35d265dd8be0..062ef299760b268a6c98f70f7a182b335ddad1b2 100644 (file)
@@ -67,8 +67,8 @@ class FlushJob {
            std::vector<SequenceNumber> existing_snapshots,
            SequenceNumber earliest_write_conflict_snapshot,
            SnapshotChecker* snapshot_checker, JobContext* job_context,
-           LogBuffer* log_buffer, FSDirectory* db_directory,
-           FSDirectory* output_file_directory,
+           FlushReason flush_reason, LogBuffer* log_buffer,
+           FSDirectory* db_directory, FSDirectory* output_file_directory,
            CompressionType output_compression, Statistics* stats,
            EventLogger* event_logger, bool measure_io_stats,
            const bool sync_output_directory, const bool write_manifest,
@@ -150,6 +150,7 @@ class FlushJob {
   SequenceNumber earliest_write_conflict_snapshot_;
   SnapshotChecker* snapshot_checker_;
   JobContext* job_context_;
+  FlushReason flush_reason_;
   LogBuffer* log_buffer_;
   FSDirectory* db_directory_;
   FSDirectory* output_file_directory_;
index f994b4e9b5e9d660bed19d7f7c060c2854560468..003a1a6570c189f8e6ec9653cc5e80c75adc699d 100644 (file)
@@ -164,15 +164,15 @@ TEST_F(FlushJobTest, Empty) {
   auto cfd = versions_->GetColumnFamilySet()->GetDefault();
   EventLogger event_logger(db_options_.info_log.get());
   SnapshotChecker* snapshot_checker = nullptr;  // not relavant
-  FlushJob flush_job(dbname_, versions_->GetColumnFamilySet()->GetDefault(),
-                     db_options_, *cfd->GetLatestMutableCFOptions(),
-                     std::numeric_limits<uint64_t>::max() /* memtable_id */,
-                     env_options_, versions_.get(), &mutex_, &shutting_down_,
-                     {}, kMaxSequenceNumber, snapshot_checker, &job_context,
-                     nullptr, nullptr, nullptr, kNoCompression, nullptr,
-                     &event_logger, false, true /* sync_output_directory */,
-                     true /* write_manifest */, Env::Priority::USER,
-                     nullptr /*IOTracer*/, empty_seqno_to_time_mapping_);
+  FlushJob flush_job(
+      dbname_, versions_->GetColumnFamilySet()->GetDefault(), db_options_,
+      *cfd->GetLatestMutableCFOptions(),
+      std::numeric_limits<uint64_t>::max() /* memtable_id */, env_options_,
+      versions_.get(), &mutex_, &shutting_down_, {}, kMaxSequenceNumber,
+      snapshot_checker, &job_context, FlushReason::kTest, nullptr, nullptr,
+      nullptr, kNoCompression, nullptr, &event_logger, false,
+      true /* sync_output_directory */, true /* write_manifest */,
+      Env::Priority::USER, nullptr /*IOTracer*/, empty_seqno_to_time_mapping_);
   {
     InstrumentedMutexLock l(&mutex_);
     flush_job.PickMemTable();
@@ -255,9 +255,9 @@ TEST_F(FlushJobTest, NonEmpty) {
       *cfd->GetLatestMutableCFOptions(),
       std::numeric_limits<uint64_t>::max() /* memtable_id */, env_options_,
       versions_.get(), &mutex_, &shutting_down_, {}, kMaxSequenceNumber,
-      snapshot_checker, &job_context, nullptr, nullptr, nullptr, kNoCompression,
-      db_options_.statistics.get(), &event_logger, true,
-      true /* sync_output_directory */, true /* write_manifest */,
+      snapshot_checker, &job_context, FlushReason::kTest, nullptr, nullptr,
+      nullptr, kNoCompression, db_options_.statistics.get(), &event_logger,
+      true, true /* sync_output_directory */, true /* write_manifest */,
       Env::Priority::USER, nullptr /*IOTracer*/, empty_seqno_to_time_mapping_);
 
   HistogramData hist;
@@ -318,9 +318,9 @@ TEST_F(FlushJobTest, FlushMemTablesSingleColumnFamily) {
       dbname_, versions_->GetColumnFamilySet()->GetDefault(), db_options_,
       *cfd->GetLatestMutableCFOptions(), flush_memtable_id, env_options_,
       versions_.get(), &mutex_, &shutting_down_, {}, kMaxSequenceNumber,
-      snapshot_checker, &job_context, nullptr, nullptr, nullptr, kNoCompression,
-      db_options_.statistics.get(), &event_logger, true,
-      true /* sync_output_directory */, true /* write_manifest */,
+      snapshot_checker, &job_context, FlushReason::kTest, nullptr, nullptr,
+      nullptr, kNoCompression, db_options_.statistics.get(), &event_logger,
+      true, true /* sync_output_directory */, true /* write_manifest */,
       Env::Priority::USER, nullptr /*IOTracer*/, empty_seqno_to_time_mapping_);
   HistogramData hist;
   FileMetaData file_meta;
@@ -391,8 +391,8 @@ TEST_F(FlushJobTest, FlushMemtablesMultipleColumnFamilies) {
         dbname_, cfd, db_options_, *cfd->GetLatestMutableCFOptions(),
         memtable_ids[k], env_options_, versions_.get(), &mutex_,
         &shutting_down_, snapshot_seqs, kMaxSequenceNumber, snapshot_checker,
-        &job_context, nullptr, nullptr, nullptr, kNoCompression,
-        db_options_.statistics.get(), &event_logger, true,
+        &job_context, FlushReason::kTest, nullptr, nullptr, nullptr,
+        kNoCompression, db_options_.statistics.get(), &event_logger, true,
         false /* sync_output_directory */, false /* write_manifest */,
         Env::Priority::USER, nullptr /*IOTracer*/,
         empty_seqno_to_time_mapping_));
@@ -520,9 +520,9 @@ TEST_F(FlushJobTest, Snapshots) {
       *cfd->GetLatestMutableCFOptions(),
       std::numeric_limits<uint64_t>::max() /* memtable_id */, env_options_,
       versions_.get(), &mutex_, &shutting_down_, snapshots, kMaxSequenceNumber,
-      snapshot_checker, &job_context, nullptr, nullptr, nullptr, kNoCompression,
-      db_options_.statistics.get(), &event_logger, true,
-      true /* sync_output_directory */, true /* write_manifest */,
+      snapshot_checker, &job_context, FlushReason::kTest, nullptr, nullptr,
+      nullptr, kNoCompression, db_options_.statistics.get(), &event_logger,
+      true, true /* sync_output_directory */, true /* write_manifest */,
       Env::Priority::USER, nullptr /*IOTracer*/, empty_seqno_to_time_mapping_);
   mutex_.Lock();
   flush_job.PickMemTable();
@@ -576,9 +576,9 @@ TEST_F(FlushJobTest, GetRateLimiterPriorityForWrite) {
       dbname_, versions_->GetColumnFamilySet()->GetDefault(), db_options_,
       *cfd->GetLatestMutableCFOptions(), flush_memtable_id, env_options_,
       versions_.get(), &mutex_, &shutting_down_, {}, kMaxSequenceNumber,
-      snapshot_checker, &job_context, nullptr, nullptr, nullptr, kNoCompression,
-      db_options_.statistics.get(), &event_logger, true,
-      true /* sync_output_directory */, true /* write_manifest */,
+      snapshot_checker, &job_context, FlushReason::kTest, nullptr, nullptr,
+      nullptr, kNoCompression, db_options_.statistics.get(), &event_logger,
+      true, true /* sync_output_directory */, true /* write_manifest */,
       Env::Priority::USER, nullptr /*IOTracer*/, empty_seqno_to_time_mapping_);
 
   // When the state from WriteController is normal.
@@ -656,9 +656,9 @@ TEST_F(FlushJobTimestampTest, AllKeysExpired) {
       dbname_, cfd, db_options_, *cfd->GetLatestMutableCFOptions(),
       std::numeric_limits<uint64_t>::max() /* memtable_id */, env_options_,
       versions_.get(), &mutex_, &shutting_down_, snapshots, kMaxSequenceNumber,
-      snapshot_checker, &job_context, nullptr, nullptr, nullptr, kNoCompression,
-      db_options_.statistics.get(), &event_logger, true,
-      true /* sync_output_directory */, true /* write_manifest */,
+      snapshot_checker, &job_context, FlushReason::kTest, nullptr, nullptr,
+      nullptr, kNoCompression, db_options_.statistics.get(), &event_logger,
+      true, true /* sync_output_directory */, true /* write_manifest */,
       Env::Priority::USER, nullptr /*IOTracer*/, empty_seqno_to_time_mapping_,
       /*db_id=*/"",
       /*db_session_id=*/"", full_history_ts_low);
@@ -709,9 +709,9 @@ TEST_F(FlushJobTimestampTest, NoKeyExpired) {
       dbname_, cfd, db_options_, *cfd->GetLatestMutableCFOptions(),
       std::numeric_limits<uint64_t>::max() /* memtable_id */, env_options_,
       versions_.get(), &mutex_, &shutting_down_, snapshots, kMaxSequenceNumber,
-      snapshot_checker, &job_context, nullptr, nullptr, nullptr, kNoCompression,
-      db_options_.statistics.get(), &event_logger, true,
-      true /* sync_output_directory */, true /* write_manifest */,
+      snapshot_checker, &job_context, FlushReason::kTest, nullptr, nullptr,
+      nullptr, kNoCompression, db_options_.statistics.get(), &event_logger,
+      true, true /* sync_output_directory */, true /* write_manifest */,
       Env::Priority::USER, nullptr /*IOTracer*/, empty_seqno_to_time_mapping_,
       /*db_id=*/"",
       /*db_session_id=*/"", full_history_ts_low);