]> git-server-git.apps.pok.os.sepia.ceph.com Git - rocksdb.git/commitdiff
Use correct FileMeta for atomic flush result install (#4932)
authorYanqin Jin <yanqin@fb.com>
Thu, 31 Jan 2019 22:28:53 +0000 (14:28 -0800)
committerYanqin Jin <yanqin@fb.com>
Thu, 31 Jan 2019 23:06:26 +0000 (15:06 -0800)
Summary:
1. this commit fixes our handling of a combination of two separate edge
cases. If a flush job does not pick any memtable to flush (because another
flush job has already picked the same memtables), and the column family
assigned to the flush job is dropped right before RocksDB calls
rocksdb::InstallMemtableAtomicFlushResults, our original code passes
a FileMetaData object whose file number is 0, failing the assertion in
rocksdb::InstallMemtableAtomicFlushResults (assert(m->GetFileNumber() > 0)).
2. Also piggyback a small change: since we already create a local copy of column family's mutable CF options to eliminate potential race condition with `SetOptions` call, we might as well use the local copy in other function calls in the same scope.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/4932

Differential Revision: D13901322

Pulled By: riversand963

fbshipit-source-id: b936580af7c127ea0c6c19ea10cd5fcede9fb0f9

db/db_impl_compaction_flush.cc
db/flush_job_test.cc
db/memtable_list.cc
db/memtable_list.h
db/memtable_list_test.cc

index 5847f05dd3e46634486a25c8ac419f765eb1eb4c..a42e60f855a80e5703689de0282cbfc5b085e46e 100644 (file)
@@ -310,21 +310,18 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
     jobs.back().PickMemTable();
   }
 
-  autovector<FileMetaData> file_meta;
+  std::vector<FileMetaData> file_meta(num_cfs);
   Status s;
   assert(num_cfs == static_cast<int>(jobs.size()));
 
-  for (int i = 0; i != num_cfs; ++i) {
-    file_meta.emplace_back();
-
 #ifndef ROCKSDB_LITE
-    const MutableCFOptions& mutable_cf_options =
-        *cfds[i]->GetLatestMutableCFOptions();
+  for (int i = 0; i != num_cfs; ++i) {
+    const MutableCFOptions& mutable_cf_options = all_mutable_cf_options.at(i);
     // may temporarily unlock and lock the mutex.
     NotifyOnFlushBegin(cfds[i], &file_meta[i], mutable_cf_options,
                        job_context->job_id, jobs[i].GetTableProperties());
-#endif /* !ROCKSDB_LITE */
   }
+#endif /* !ROCKSDB_LITE */
 
   if (logfile_number_ > 0) {
     // TODO (yanqin) investigate whether we should sync the closed logs for
@@ -428,19 +425,21 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
     autovector<ColumnFamilyData*> tmp_cfds;
     autovector<const autovector<MemTable*>*> mems_list;
     autovector<const MutableCFOptions*> mutable_cf_options_list;
+    autovector<FileMetaData*> tmp_file_meta;
     for (int i = 0; i != num_cfs; ++i) {
       const auto& mems = jobs[i].GetMemTables();
       if (!cfds[i]->IsDropped() && !mems.empty()) {
         tmp_cfds.emplace_back(cfds[i]);
         mems_list.emplace_back(&mems);
         mutable_cf_options_list.emplace_back(&all_mutable_cf_options[i]);
+        tmp_file_meta.emplace_back(&file_meta[i]);
       }
     }
 
     s = InstallMemtableAtomicFlushResults(
         nullptr /* imm_lists */, tmp_cfds, mutable_cf_options_list, mems_list,
-        versions_.get(), &mutex_, file_meta, &job_context->memtables_to_free,
-        directories_.GetDbDir(), log_buffer);
+        versions_.get(), &mutex_, tmp_file_meta,
+        &job_context->memtables_to_free, directories_.GetDbDir(), log_buffer);
   }
 
   if (s.ok() || s.IsShutdownInProgress()) {
@@ -452,7 +451,7 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
       }
       InstallSuperVersionAndScheduleWork(cfds[i],
                                          &job_context->superversion_contexts[i],
-                                         *cfds[i]->GetLatestMutableCFOptions());
+                                         all_mutable_cf_options[i]);
       VersionStorageInfo::LevelSummaryStorage tmp;
       ROCKS_LOG_BUFFER(log_buffer, "[%s] Level summary: %s\n",
                        cfds[i]->GetName().c_str(),
@@ -468,8 +467,7 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
       if (cfds[i]->IsDropped()) {
         continue;
       }
-      NotifyOnFlushCompleted(cfds[i], &file_meta[i],
-                             *cfds[i]->GetLatestMutableCFOptions(),
+      NotifyOnFlushCompleted(cfds[i], &file_meta[i], all_mutable_cf_options[i],
                              job_context->job_id, jobs[i].GetTableProperties());
       if (sfm) {
         std::string file_path = MakeTableFileName(
index 5ac5f2f9355f7b48a9b469a243468a6b3c4a80d6..1f7bc7b845b47bce30702b6f0cd47dc27c57033d 100644 (file)
@@ -308,7 +308,9 @@ TEST_F(FlushJobTest, FlushMemtablesMultipleColumnFamilies) {
     k++;
   }
   HistogramData hist;
-  autovector<FileMetaData> file_metas;
+  std::vector<FileMetaData> file_metas;
+  // Call reserve to avoid auto-resizing
+  file_metas.reserve(flush_jobs.size());
   mutex_.Lock();
   for (auto& job : flush_jobs) {
     job.PickMemTable();
@@ -319,6 +321,10 @@ TEST_F(FlushJobTest, FlushMemtablesMultipleColumnFamilies) {
     ASSERT_OK(job.Run(nullptr /**/, &meta));
     file_metas.emplace_back(meta);
   }
+  autovector<FileMetaData*> file_meta_ptrs;
+  for (auto& meta : file_metas) {
+    file_meta_ptrs.push_back(&meta);
+  }
   autovector<const autovector<MemTable*>*> mems_list;
   for (size_t i = 0; i != all_cfds.size(); ++i) {
     const auto& mems = flush_jobs[i].GetMemTables();
@@ -331,7 +337,7 @@ TEST_F(FlushJobTest, FlushMemtablesMultipleColumnFamilies) {
 
   Status s = InstallMemtableAtomicFlushResults(
       nullptr /* imm_lists */, all_cfds, mutable_cf_options_list, mems_list,
-      versions_.get(), &mutex_, file_metas, &job_context.memtables_to_free,
+      versions_.get(), &mutex_, file_meta_ptrs, &job_context.memtables_to_free,
       nullptr /* db_directory */, nullptr /* log_buffer */);
   ASSERT_OK(s);
 
index 459d392d5644d9ddcfc393dc0400d2b50761a0da..9397dbc7e0042408f90dc8bb52bfd566a0c3df93 100644 (file)
@@ -533,7 +533,7 @@ Status InstallMemtableAtomicFlushResults(
     const autovector<ColumnFamilyData*>& cfds,
     const autovector<const MutableCFOptions*>& mutable_cf_options_list,
     const autovector<const autovector<MemTable*>*>& mems_list, VersionSet* vset,
-    InstrumentedMutex* mu, const autovector<FileMetaData>& file_metas,
+    InstrumentedMutex* mu, const autovector<FileMetaData*>& file_metas,
     autovector<MemTable*>* to_delete, Directory* db_directory,
     LogBuffer* log_buffer) {
   AutoThreadOperationStageUpdater stage_updater(
@@ -553,10 +553,11 @@ Status InstallMemtableAtomicFlushResults(
       assert((*mems_list[k])[0]->GetID() == imm->GetEarliestMemTableID());
     }
 #endif
+    assert(nullptr != file_metas[k]);
     for (size_t i = 0; i != mems_list[k]->size(); ++i) {
       assert(i == 0 || (*mems_list[k])[i]->GetEdits()->NumEntries() == 0);
       (*mems_list[k])[i]->SetFlushCompleted(true);
-      (*mems_list[k])[i]->SetFileNumber(file_metas[k].fd.GetNumber());
+      (*mems_list[k])[i]->SetFileNumber(file_metas[k]->fd.GetNumber());
     }
   }
 
index be3f93562da7640fda4d3f557925472858313d5e..b56ad4932c4ec2841fc7248d99af6b4bf3af7c33 100644 (file)
@@ -123,7 +123,7 @@ class MemTableListVersion {
       const autovector<const MutableCFOptions*>& mutable_cf_options_list,
       const autovector<const autovector<MemTable*>*>& mems_list,
       VersionSet* vset, InstrumentedMutex* mu,
-      const autovector<FileMetaData>& file_meta,
+      const autovector<FileMetaData*>& file_meta,
       autovector<MemTable*>* to_delete, Directory* db_directory,
       LogBuffer* log_buffer);
 
@@ -301,7 +301,7 @@ class MemTableList {
       const autovector<const MutableCFOptions*>& mutable_cf_options_list,
       const autovector<const autovector<MemTable*>*>& mems_list,
       VersionSet* vset, InstrumentedMutex* mu,
-      const autovector<FileMetaData>& file_meta,
+      const autovector<FileMetaData*>& file_meta,
       autovector<MemTable*>* to_delete, Directory* db_directory,
       LogBuffer* log_buffer);
 
@@ -337,7 +337,7 @@ extern Status InstallMemtableAtomicFlushResults(
     const autovector<ColumnFamilyData*>& cfds,
     const autovector<const MutableCFOptions*>& mutable_cf_options_list,
     const autovector<const autovector<MemTable*>*>& mems_list, VersionSet* vset,
-    InstrumentedMutex* mu, const autovector<FileMetaData>& file_meta,
+    InstrumentedMutex* mu, const autovector<FileMetaData*>& file_meta,
     autovector<MemTable*>* to_delete, Directory* db_directory,
     LogBuffer* log_buffer);
 }  // namespace rocksdb
index d67eed9fa15c0ea39e304637a8a41726383ca72e..f0f4b0bb0cb964b75589fb75872132a50d383ba8 100644 (file)
@@ -161,18 +161,23 @@ class MemTableListTest : public testing::Test {
       cfds.emplace_back(column_family_set->GetColumnFamily(cf_ids[i]));
       EXPECT_NE(nullptr, cfds[i]);
     }
-    autovector<FileMetaData> file_metas;
+    std::vector<FileMetaData> file_metas;
+    file_metas.reserve(cf_ids.size());
     for (size_t i = 0; i != cf_ids.size(); ++i) {
       FileMetaData meta;
       uint64_t file_num = file_number.fetch_add(1);
       meta.fd = FileDescriptor(file_num, 0, 0);
       file_metas.emplace_back(meta);
     }
+    autovector<FileMetaData*> file_meta_ptrs;
+    for (auto& meta : file_metas) {
+      file_meta_ptrs.push_back(&meta);
+    }
     InstrumentedMutex mutex;
     InstrumentedMutexLock l(&mutex);
     return InstallMemtableAtomicFlushResults(
         &lists, cfds, mutable_cf_options_list, mems_list, &versions, &mutex,
-        file_metas, to_delete, nullptr, &log_buffer);
+        file_meta_ptrs, to_delete, nullptr, &log_buffer);
   }
 };