]> git-server-git.apps.pok.os.sepia.ceph.com Git - rocksdb.git/commitdiff
Add condition on NotifyOnFlushComplete that FlushJob was not mempurge. Add event...
authorBaptiste Lemaire <blemaire@fb.com>
Thu, 19 Aug 2021 00:39:00 +0000 (17:39 -0700)
committerFacebook GitHub Bot <facebook-github-bot@users.noreply.github.com>
Thu, 19 Aug 2021 00:40:01 +0000 (17:40 -0700)
Summary:
Previously, when a `FlushJob` was redirected to a MemPurge, the function `DBImpl::NotifyOnFlushComplete` was called, which created a series of issues because the JobInfo was not correctly collected from the memtables.
This diff aims at correcting these two issues (`FlushJobInfo` collection in `FlushJob::MemPurge` , no call to `DBImpl::NotifyOnFlushComplete` after successful mempurge).
Event listeners were added to the unit tests to handle these situations.
Surprisingly none of the crashtests caught this issue, I will try to add event listeners to crash tests in the future.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/8672

Reviewed By: akankshamahajan15

Differential Revision: D30383109

Pulled By: bjlemaire

fbshipit-source-id: 35a8d4295886923ee4049a6447f00022cb221c73

db/db_flush_test.cc
db/db_impl/db_impl_compaction_flush.cc
db/flush_job.cc
db/flush_job.h

index 824dc9e550e0f5ac544d4fc830d64f01fefe4f55..41a9a983edc14001417f2ecdc488300100f1650e 100644 (file)
@@ -663,6 +663,87 @@ TEST_F(DBFlushTest, StatisticsGarbageRangeDeletes) {
   Close();
 }
 
+#ifndef ROCKSDB_LITE
+// This simple Listener can only handle one flush at a time.
+class TestFlushListener : public EventListener {
+ public:
+  TestFlushListener(Env* env, DBFlushTest* test)
+      : slowdown_count(0), stop_count(0), db_closed(), env_(env), test_(test) {
+    db_closed = false;
+  }
+
+  ~TestFlushListener() override {
+    prev_fc_info_.status.PermitUncheckedError();  // Ignore the status
+  }
+  void OnTableFileCreated(const TableFileCreationInfo& info) override {
+    // remember the info for later checking the FlushJobInfo.
+    prev_fc_info_ = info;
+    ASSERT_GT(info.db_name.size(), 0U);
+    ASSERT_GT(info.cf_name.size(), 0U);
+    ASSERT_GT(info.file_path.size(), 0U);
+    ASSERT_GT(info.job_id, 0);
+    ASSERT_GT(info.table_properties.data_size, 0U);
+    ASSERT_GT(info.table_properties.raw_key_size, 0U);
+    ASSERT_GT(info.table_properties.raw_value_size, 0U);
+    ASSERT_GT(info.table_properties.num_data_blocks, 0U);
+    ASSERT_GT(info.table_properties.num_entries, 0U);
+    ASSERT_EQ(info.file_checksum, kUnknownFileChecksum);
+    ASSERT_EQ(info.file_checksum_func_name, kUnknownFileChecksumFuncName);
+  }
+
+  void OnFlushCompleted(DB* db, const FlushJobInfo& info) override {
+    flushed_dbs_.push_back(db);
+    flushed_column_family_names_.push_back(info.cf_name);
+    if (info.triggered_writes_slowdown) {
+      slowdown_count++;
+    }
+    if (info.triggered_writes_stop) {
+      stop_count++;
+    }
+    // verify whether the previously created file matches the flushed file.
+    ASSERT_EQ(prev_fc_info_.db_name, db->GetName());
+    ASSERT_EQ(prev_fc_info_.cf_name, info.cf_name);
+    ASSERT_EQ(prev_fc_info_.job_id, info.job_id);
+    ASSERT_EQ(prev_fc_info_.file_path, info.file_path);
+    ASSERT_EQ(TableFileNameToNumber(info.file_path), info.file_number);
+
+    // Note: the following chunk relies on the notification pertaining to the
+    // database pointed to by DBTestBase::db_, and is thus bypassed when
+    // that assumption does not hold (see the test case MultiDBMultiListeners
+    // below).
+    ASSERT_TRUE(test_);
+    if (db == test_->db_) {
+      std::vector<std::vector<FileMetaData>> files_by_level;
+      test_->dbfull()->TEST_GetFilesMetaData(db->DefaultColumnFamily(),
+                                             &files_by_level);
+
+      ASSERT_FALSE(files_by_level.empty());
+      auto it = std::find_if(files_by_level[0].begin(), files_by_level[0].end(),
+                             [&](const FileMetaData& meta) {
+                               return meta.fd.GetNumber() == info.file_number;
+                             });
+      ASSERT_NE(it, files_by_level[0].end());
+      ASSERT_EQ(info.oldest_blob_file_number, it->oldest_blob_file_number);
+    }
+
+    ASSERT_EQ(db->GetEnv()->GetThreadID(), info.thread_id);
+    ASSERT_GT(info.thread_id, 0U);
+  }
+
+  std::vector<std::string> flushed_column_family_names_;
+  std::vector<DB*> flushed_dbs_;
+  int slowdown_count;
+  int stop_count;
+  bool db_closing;
+  std::atomic_bool db_closed;
+  TableFileCreationInfo prev_fc_info_;
+
+ protected:
+  Env* env_;
+  DBFlushTest* test_;
+};
+#endif  // !ROCKSDB_LITE
+
 TEST_F(DBFlushTest, MemPurgeBasic) {
   Options options = CurrentOptions();
 
@@ -695,8 +776,11 @@ TEST_F(DBFlushTest, MemPurgeBasic) {
   // Enforce size of a single MemTable to 64MB (64MB = 67108864 bytes).
   options.write_buffer_size = 1 << 20;
   // Activate the MemPurge prototype.
-  options.experimental_mempurge_threshold =
-      1.0;  // std::numeric_limits<double>::max();
+  options.experimental_mempurge_threshold = 1.0;
+#ifndef ROCKSDB_LITE
+  TestFlushListener* listener = new TestFlushListener(options.env, this);
+  options.listeners.emplace_back(listener);
+#endif  // !ROCKSDB_LITE
   ASSERT_OK(TryReopen(options));
   uint32_t mempurge_count = 0;
   uint32_t sst_count = 0;
@@ -839,12 +923,15 @@ TEST_F(DBFlushTest, MemPurgeDeleteAndDeleteRange) {
   options.compression = kNoCompression;
   options.inplace_update_support = false;
   options.allow_concurrent_memtable_write = true;
-
+#ifndef ROCKSDB_LITE
+  TestFlushListener* listener = new TestFlushListener(options.env, this);
+  options.listeners.emplace_back(listener);
+#endif  // !ROCKSDB_LITE
   // Enforce size of a single MemTable to 64MB (64MB = 67108864 bytes).
   options.write_buffer_size = 1 << 20;
   // Activate the MemPurge prototype.
-  options.experimental_mempurge_threshold =
-      1.0;  // std::numeric_limits<double>::max();
+  options.experimental_mempurge_threshold = 1.0;
+
   ASSERT_OK(TryReopen(options));
 
   uint32_t mempurge_count = 0;
@@ -1037,7 +1124,10 @@ TEST_F(DBFlushTest, MemPurgeAndCompactionFilter) {
   options.compression = kNoCompression;
   options.inplace_update_support = false;
   options.allow_concurrent_memtable_write = true;
-
+#ifndef ROCKSDB_LITE
+  TestFlushListener* listener = new TestFlushListener(options.env, this);
+  options.listeners.emplace_back(listener);
+#endif  // !ROCKSDB_LITE
   // Create a ConditionalUpdate compaction filter
   // that will update all the values of the KV pairs
   // where the keys are "lower" than KEY4.
@@ -1047,8 +1137,8 @@ TEST_F(DBFlushTest, MemPurgeAndCompactionFilter) {
   // Enforce size of a single MemTable to 64MB (64MB = 67108864 bytes).
   options.write_buffer_size = 1 << 20;
   // Activate the MemPurge prototype.
-  options.experimental_mempurge_threshold =
-      1.0;  // std::numeric_limits<double>::max();
+  options.experimental_mempurge_threshold = 1.0;
+
   ASSERT_OK(TryReopen(options));
 
   uint32_t mempurge_count = 0;
@@ -1123,8 +1213,8 @@ TEST_F(DBFlushTest, MemPurgeWALSupport) {
   // Enforce size of a single MemTable to 128KB.
   options.write_buffer_size = 128 << 10;
   // Activate the MemPurge prototype.
-  options.experimental_mempurge_threshold =
-      1.0;  // std::numeric_limits<double>::max();
+  options.experimental_mempurge_threshold = 1.0;
+
   ASSERT_OK(TryReopen(options));
 
   const size_t KVSIZE = 10;
index 7ec42c1fd65aa840fd1bdb76895780cfef757f96..df849d5191ae77f9421d94fe04899989544773b9 100644 (file)
@@ -7,6 +7,7 @@
 // Use of this source code is governed by a BSD-style license that can be
 // found in the LICENSE file. See the AUTHORS file for names of contributors.
 #include <cinttypes>
+#include <deque>
 
 #include "db/builder.h"
 #include "db/db_impl/db_impl.h"
@@ -198,7 +199,7 @@ Status DBImpl::FlushMemTableToOutputFile(
     need_cancel = true;
   }
   TEST_SYNC_POINT("DBImpl::FlushMemTableToOutputFile:AfterPickMemtables");
-
+  bool switched_to_mempurge = false;
   // Within flush_job.Run, rocksdb may call event listener to notify
   // file creation and deletion.
   //
@@ -206,7 +207,8 @@ Status DBImpl::FlushMemTableToOutputFile(
   // and EventListener callback will be called when the db_mutex
   // is unlocked by the current thread.
   if (s.ok()) {
-    s = flush_job.Run(&logs_with_prep_tracker_, &file_meta);
+    s = flush_job.Run(&logs_with_prep_tracker_, &file_meta,
+                      &switched_to_mempurge);
     need_cancel = false;
   }
 
@@ -282,7 +284,9 @@ Status DBImpl::FlushMemTableToOutputFile(
     // from never needing it or ignoring the flush job status
     io_s.PermitUncheckedError();
   }
-  if (s.ok()) {
+  // If flush ran smoothly and no mempurge happened
+  // install new SST file path.
+  if (s.ok() && (!switched_to_mempurge)) {
 #ifndef ROCKSDB_LITE
     // may temporarily unlock and lock the mutex.
     NotifyOnFlushCompleted(cfd, mutable_cf_options,
@@ -411,6 +415,9 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
   }
 
   std::vector<FileMetaData> file_meta(num_cfs);
+  // Use of deque<bool> because vector<bool>
+  // is specific and doesn't allow &v[i].
+  std::deque<bool> switched_to_mempurge(num_cfs, false);
   Status s;
   IOStatus log_io_s = IOStatus::OK();
   assert(num_cfs == static_cast<int>(jobs.size()));
@@ -460,10 +467,13 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
   }
 
   if (s.ok()) {
+    assert(switched_to_mempurge.size() ==
+           static_cast<long unsigned int>(num_cfs));
     // TODO (yanqin): parallelize jobs with threads.
     for (int i = 1; i != num_cfs; ++i) {
       exec_status[i].second =
-          jobs[i]->Run(&logs_with_prep_tracker_, &file_meta[i]);
+          jobs[i]->Run(&logs_with_prep_tracker_, &file_meta[i],
+                       &(switched_to_mempurge.at(i)));
       exec_status[i].first = true;
       io_status[i] = jobs[i]->io_status();
     }
@@ -475,8 +485,9 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
     }
     assert(exec_status.size() > 0);
     assert(!file_meta.empty());
-    exec_status[0].second =
-        jobs[0]->Run(&logs_with_prep_tracker_, &file_meta[0]);
+    exec_status[0].second = jobs[0]->Run(
+        &logs_with_prep_tracker_, file_meta.data() /* &file_meta[0] */,
+        switched_to_mempurge.empty() ? nullptr : &(switched_to_mempurge.at(0)));
     exec_status[0].first = true;
     io_status[0] = jobs[0]->io_status();
 
@@ -656,6 +667,11 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
         immutable_db_options_.sst_file_manager.get());
     assert(all_mutable_cf_options.size() == static_cast<size_t>(num_cfs));
     for (int i = 0; s.ok() && i != num_cfs; ++i) {
+      // If mempurge happened instead of Flush,
+      // no NotifyOnFlushCompleted call (no SST file created).
+      if (switched_to_mempurge[i]) {
+        continue;
+      }
       if (cfds[i]->IsDropped()) {
         continue;
       }
index 43dc87d68b20807101855c6856e4a923ce57142b..d520b709efc943268a3bc9d3cd6b97d6b768f119 100644 (file)
@@ -196,8 +196,8 @@ void FlushJob::PickMemTable() {
   base_->Ref();  // it is likely that we do not need this reference
 }
 
-Status FlushJob::Run(LogsWithPrepTracker* prep_tracker,
-                     FileMetaData* file_meta) {
+Status FlushJob::Run(LogsWithPrepTracker* prep_tracker, FileMetaData* file_meta,
+                     bool* switched_to_mempurge) {
   TEST_SYNC_POINT("FlushJob::Start");
   db_mutex_->AssertHeld();
   assert(pick_memtable_called);
@@ -244,6 +244,16 @@ Status FlushJob::Run(LogsWithPrepTracker* prep_tracker,
         ROCKS_LOG_WARN(db_options_.info_log, "Mempurge process failed: %s\n",
                        mempurge_s.ToString().c_str());
       }
+    } else {
+      if (switched_to_mempurge) {
+        *switched_to_mempurge = true;
+      } else {
+        // The mempurge process was successful, but no switch_to_mempurge
+        // pointer provided so no way to propagate the state of flush job.
+        ROCKS_LOG_WARN(db_options_.info_log,
+                       "Mempurge process succeeded"
+                       "but no 'switched_to_mempurge' ptr provided.\n");
+      }
     }
   }
   Status s;
@@ -561,6 +571,12 @@ Status FlushJob::MemPurge() {
         // we do not call SchedulePendingFlush().
         cfd_->imm()->Add(new_mem, &job_context_->memtables_to_free);
         new_mem->Ref();
+#ifndef ROCKSDB_LITE
+        // Piggyback FlushJobInfo on the first flushed memtable.
+        db_mutex_->AssertHeld();
+        meta_.fd.file_size = 0;
+        mems_[0]->SetFlushJobInfo(GetFlushJobInfo());
+#endif  // !ROCKSDB_LITE
         db_mutex_->Unlock();
       } else {
         s = Status::Aborted(Slice("Mempurge filled more than one memtable."));
index 81b4e86dd12a30147c7960130dabf61ba0c4ebd8..49d57ef794ba50c414bfce548e0c15434f8fd2a4 100644 (file)
@@ -83,7 +83,8 @@ class FlushJob {
   // Once PickMemTable() is called, either Run() or Cancel() has to be called.
   void PickMemTable();
   Status Run(LogsWithPrepTracker* prep_tracker = nullptr,
-             FileMetaData* file_meta = nullptr);
+             FileMetaData* file_meta = nullptr,
+             bool* switched_to_mempurge = nullptr);
   void Cancel();
   const autovector<MemTable*>& GetMemTables() const { return mems_; }