]> git-server-git.apps.pok.os.sepia.ceph.com Git - rocksdb.git/commitdiff
Fix NotifyOnFlushCompleted() for atomic flush (#8585)
authorYanqin Jin <yanqin@fb.com>
Tue, 3 Aug 2021 20:30:05 +0000 (13:30 -0700)
committerLevi Tamasi <ltamasi@fb.com>
Wed, 4 Aug 2021 18:24:41 +0000 (11:24 -0700)
Summary:
PR https://github.com/facebook/rocksdb/issues/5908 added `flush_jobs_info_` to `FlushJob` to make sure
`OnFlushCompleted()` is called after committing flush results to
MANIFEST. However, `flush_jobs_info_` is not updated in atomic
flush, causing `NotifyOnFlushCompleted()` to skip `OnFlushCompleted()`.

This PR fixes this, in a similar way to https://github.com/facebook/rocksdb/issues/5908 that handles regular flush.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/8585

Test Plan: make check

Reviewed By: jay-zhuang

Differential Revision: D29913720

Pulled By: riversand963

fbshipit-source-id: 4ff023c98372fa2c93188d4a5c8a4e9ffa0f4dda

db/db_impl/db_impl_compaction_flush.cc
db/flush_job_test.cc
db/listener_test.cc
db/memtable_list.cc
db/memtable_list.h
db/memtable_list_test.cc

index eba5354ca0968a5846fb5f2927c761ace6a0fa4c..563552d623e8fce04df4d675c0cb659d32ca793c 100644 (file)
@@ -590,6 +590,8 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
     autovector<const autovector<MemTable*>*> mems_list;
     autovector<const MutableCFOptions*> mutable_cf_options_list;
     autovector<FileMetaData*> tmp_file_meta;
+    autovector<std::list<std::unique_ptr<FlushJobInfo>>*>
+        committed_flush_jobs_info;
     for (int i = 0; i != num_cfs; ++i) {
       const auto& mems = jobs[i]->GetMemTables();
       if (!cfds[i]->IsDropped() && !mems.empty()) {
@@ -597,13 +599,18 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
         mems_list.emplace_back(&mems);
         mutable_cf_options_list.emplace_back(&all_mutable_cf_options[i]);
         tmp_file_meta.emplace_back(&file_meta[i]);
+#ifndef ROCKSDB_LITE
+        committed_flush_jobs_info.emplace_back(
+            jobs[i]->GetCommittedFlushJobsInfo());
+#endif  //! ROCKSDB_LITE
       }
     }
 
     s = InstallMemtableAtomicFlushResults(
         nullptr /* imm_lists */, tmp_cfds, mutable_cf_options_list, mems_list,
         versions_.get(), &logs_with_prep_tracker_, &mutex_, tmp_file_meta,
-        &job_context->memtables_to_free, directories_.GetDbDir(), log_buffer);
+        committed_flush_jobs_info, &job_context->memtables_to_free,
+        directories_.GetDbDir(), log_buffer);
   }
 
   if (s.ok()) {
index 2366da201e1899ace7ff2a8f5cbe2f9daaf1aecb..6c5baec3daa21b9441356039e4693e66631b8e82 100644 (file)
@@ -418,12 +418,19 @@ TEST_F(FlushJobTest, FlushMemtablesMultipleColumnFamilies) {
   for (auto cfd : all_cfds) {
     mutable_cf_options_list.push_back(cfd->GetLatestMutableCFOptions());
   }
+  autovector<std::list<std::unique_ptr<FlushJobInfo>>*>
+      committed_flush_jobs_info;
+#ifndef ROCKSDB_LITE
+  for (auto& job : flush_jobs) {
+    committed_flush_jobs_info.push_back(job->GetCommittedFlushJobsInfo());
+  }
+#endif  //! ROCKSDB_LITE
 
   Status s = InstallMemtableAtomicFlushResults(
       nullptr /* imm_lists */, all_cfds, mutable_cf_options_list, mems_list,
       versions_.get(), nullptr /* prep_tracker */, &mutex_, file_meta_ptrs,
-      &job_context.memtables_to_free, nullptr /* db_directory */,
-      nullptr /* log_buffer */);
+      committed_flush_jobs_info, &job_context.memtables_to_free,
+      nullptr /* db_directory */, nullptr /* log_buffer */);
   ASSERT_OK(s);
 
   mutex_.Unlock();
index 7c6eb9fe05eddfd3dea1223f4f559d26e07caacc..82a1b3e2b6315bb84e845514b50433b4156b030f 100644 (file)
@@ -356,32 +356,38 @@ TEST_F(EventListenerTest, MultiCF) {
 #ifdef ROCKSDB_USING_THREAD_STATUS
   options.enable_thread_tracking = true;
 #endif  // ROCKSDB_USING_THREAD_STATUS
-  TestFlushListener* listener = new TestFlushListener(options.env, this);
-  options.listeners.emplace_back(listener);
-  options.table_properties_collector_factories.push_back(
-      std::make_shared<TestPropertiesCollectorFactory>());
-  std::vector<std::string> cf_names = {
-      "pikachu", "ilya", "muromec", "dobrynia",
-      "nikitich", "alyosha", "popovich"};
-  CreateAndReopenWithCF(cf_names, options);
-
-  ASSERT_OK(Put(1, "pikachu", std::string(90000, 'p')));
-  ASSERT_OK(Put(2, "ilya", std::string(90000, 'i')));
-  ASSERT_OK(Put(3, "muromec", std::string(90000, 'm')));
-  ASSERT_OK(Put(4, "dobrynia", std::string(90000, 'd')));
-  ASSERT_OK(Put(5, "nikitich", std::string(90000, 'n')));
-  ASSERT_OK(Put(6, "alyosha", std::string(90000, 'a')));
-  ASSERT_OK(Put(7, "popovich", std::string(90000, 'p')));
-  for (int i = 1; i < 8; ++i) {
-    ASSERT_OK(Flush(i));
-    ASSERT_EQ(listener->flushed_dbs_.size(), i);
-    ASSERT_EQ(listener->flushed_column_family_names_.size(), i);
-  }
+  for (auto atomic_flush : {false, true}) {
+    options.atomic_flush = atomic_flush;
+    options.create_if_missing = true;
+    DestroyAndReopen(options);
+    TestFlushListener* listener = new TestFlushListener(options.env, this);
+    options.listeners.emplace_back(listener);
+    options.table_properties_collector_factories.push_back(
+        std::make_shared<TestPropertiesCollectorFactory>());
+    std::vector<std::string> cf_names = {"pikachu",  "ilya",     "muromec",
+                                         "dobrynia", "nikitich", "alyosha",
+                                         "popovich"};
+    CreateAndReopenWithCF(cf_names, options);
+
+    ASSERT_OK(Put(1, "pikachu", std::string(90000, 'p')));
+    ASSERT_OK(Put(2, "ilya", std::string(90000, 'i')));
+    ASSERT_OK(Put(3, "muromec", std::string(90000, 'm')));
+    ASSERT_OK(Put(4, "dobrynia", std::string(90000, 'd')));
+    ASSERT_OK(Put(5, "nikitich", std::string(90000, 'n')));
+    ASSERT_OK(Put(6, "alyosha", std::string(90000, 'a')));
+    ASSERT_OK(Put(7, "popovich", std::string(90000, 'p')));
+    for (int i = 1; i < 8; ++i) {
+      ASSERT_OK(Flush(i));
+      ASSERT_EQ(listener->flushed_dbs_.size(), i);
+      ASSERT_EQ(listener->flushed_column_family_names_.size(), i);
+    }
 
-  // make sure callback functions are called in the right order
-  for (size_t i = 0; i < cf_names.size(); i++) {
-    ASSERT_EQ(listener->flushed_dbs_[i], db_);
-    ASSERT_EQ(listener->flushed_column_family_names_[i], cf_names[i]);
+    // make sure callback functions are called in the right order
+    for (size_t i = 0; i < cf_names.size(); i++) {
+      ASSERT_EQ(listener->flushed_dbs_[i], db_);
+      ASSERT_EQ(listener->flushed_column_family_names_[i], cf_names[i]);
+    }
+    Close();
   }
 }
 
index eb56f205b2a5e6e8a7902b88fa6728ff99751214..845c33a90d44c1001768753157b2caa9af14ef11 100644 (file)
@@ -736,6 +736,8 @@ Status InstallMemtableAtomicFlushResults(
     const autovector<const autovector<MemTable*>*>& mems_list, VersionSet* vset,
     LogsWithPrepTracker* prep_tracker, InstrumentedMutex* mu,
     const autovector<FileMetaData*>& file_metas,
+    const autovector<std::list<std::unique_ptr<FlushJobInfo>>*>&
+        committed_flush_jobs_info,
     autovector<MemTable*>* to_delete, FSDirectory* db_directory,
     LogBuffer* log_buffer) {
   AutoThreadOperationStageUpdater stage_updater(
@@ -765,6 +767,17 @@ Status InstallMemtableAtomicFlushResults(
       (*mems_list[k])[i]->SetFlushCompleted(true);
       (*mems_list[k])[i]->SetFileNumber(file_metas[k]->fd.GetNumber());
     }
+#ifndef ROCKSDB_LITE
+    if (committed_flush_jobs_info[k]) {
+      assert(!mems_list[k]->empty());
+      assert((*mems_list[k])[0]);
+      std::unique_ptr<FlushJobInfo> flush_job_info =
+          (*mems_list[k])[0]->ReleaseFlushJobInfo();
+      committed_flush_jobs_info[k]->push_back(std::move(flush_job_info));
+    }
+#else   //! ROCKSDB_LITE
+    (void)committed_flush_jobs_info;
+#endif  // ROCKSDB_LITE
   }
 
   Status s;
index 88d9ad0178bfadef01cc674fca88fe5d5f18009a..572defb0e8bba5fdd2d1dfcfb991c00fed6bbaf5 100644 (file)
@@ -140,6 +140,8 @@ class MemTableListVersion {
       const autovector<const autovector<MemTable*>*>& mems_list,
       VersionSet* vset, LogsWithPrepTracker* prep_tracker,
       InstrumentedMutex* mu, const autovector<FileMetaData*>& file_meta,
+      const autovector<std::list<std::unique_ptr<FlushJobInfo>>*>&
+          committed_flush_jobs_info,
       autovector<MemTable*>* to_delete, FSDirectory* db_directory,
       LogBuffer* log_buffer);
 
@@ -402,6 +404,8 @@ class MemTableList {
       const autovector<const autovector<MemTable*>*>& mems_list,
       VersionSet* vset, LogsWithPrepTracker* prep_tracker,
       InstrumentedMutex* mu, const autovector<FileMetaData*>& file_meta,
+      const autovector<std::list<std::unique_ptr<FlushJobInfo>>*>&
+          committed_flush_jobs_info,
       autovector<MemTable*>* to_delete, FSDirectory* db_directory,
       LogBuffer* log_buffer);
 
@@ -452,6 +456,8 @@ extern Status InstallMemtableAtomicFlushResults(
     const autovector<const autovector<MemTable*>*>& mems_list, VersionSet* vset,
     LogsWithPrepTracker* prep_tracker, InstrumentedMutex* mu,
     const autovector<FileMetaData*>& file_meta,
+    const autovector<std::list<std::unique_ptr<FlushJobInfo>>*>&
+        committed_flush_jobs_info,
     autovector<MemTable*>* to_delete, FSDirectory* db_directory,
     LogBuffer* log_buffer);
 }  // namespace ROCKSDB_NAMESPACE
index 165471b6bbb7407a2f9aec44bd7960ec1500be35..18b5f57e9f8b558d96be0d99ecc3a445a648eddf 100644 (file)
@@ -182,12 +182,21 @@ class MemTableListTest : public testing::Test {
     for (auto& meta : file_metas) {
       file_meta_ptrs.push_back(&meta);
     }
+    std::vector<std::list<std::unique_ptr<FlushJobInfo>>>
+        committed_flush_jobs_info_storage(cf_ids.size());
+    autovector<std::list<std::unique_ptr<FlushJobInfo>>*>
+        committed_flush_jobs_info;
+    for (int i = 0; i < static_cast<int>(cf_ids.size()); ++i) {
+      committed_flush_jobs_info.push_back(
+          &committed_flush_jobs_info_storage[i]);
+    }
+
     InstrumentedMutex mutex;
     InstrumentedMutexLock l(&mutex);
     return InstallMemtableAtomicFlushResults(
         &lists, cfds, mutable_cf_options_list, mems_list, &versions,
-        nullptr /* prep_tracker */, &mutex, file_meta_ptrs, to_delete, nullptr,
-        &log_buffer);
+        nullptr /* prep_tracker */, &mutex, file_meta_ptrs,
+        committed_flush_jobs_info, to_delete, nullptr, &log_buffer);
   }
 };