]> git-server-git.apps.pok.os.sepia.ceph.com Git - rocksdb.git/commitdiff
Fix Flush() keep waiting after flush finish
authorYi Wu <yiwu@fb.com>
Fri, 19 Jan 2018 01:32:50 +0000 (17:32 -0800)
committerYi Wu <yiwu@fb.com>
Fri, 19 Jan 2018 01:52:02 +0000 (17:52 -0800)
Summary:
Flush() call could be waiting indefinitely if min_write_buffer_number_to_merge is used. Consider the sequence:
1. User call Flush() with flush_options.wait = true
2. The manual flush started in the background
3. New memtable become immutable because of writes. The new memtable will not trigger flush if min_write_buffer_number_to_merge is not reached.
4. The manual flush finish.

Because of the new memtable created at step 3 not being flush, previous logic of WaitForFlushMemTable() keep waiting, despite the memtables it intent to flush has been flushed.

Here instead of checking if there are any more memtables to flush, WaitForFlushMemTable() also check the id of the earliest memtable. If the id is larger than that of latest memtable at the time flush was initiated, it means all the memtable at the time of flush start has all been flush.
Closes https://github.com/facebook/rocksdb/pull/3378

Differential Revision: D6746789

Pulled By: yiwu-arbug

fbshipit-source-id: 35e698f71c7f90b06337a93e6825f4ea3b619bfa

HISTORY.md
db/column_family.cc
db/column_family.h
db/db_flush_test.cc
db/db_impl.h
db/db_impl_compaction_flush.cc
db/memtable.h
db/memtable_list.h

index 6cdf01d0c4e935dedad05b5d0263eaadee9e6d0a..2f8383de614b856f0ccc5b53ed285c95a71bae52 100644 (file)
@@ -1,5 +1,7 @@
 # Rocksdb Change Log
-## Unreleased
+## 5.10.1 (01/18/2018)
+### Bug Fixes
+* Fix DB::Flush() keep waiting after flush finish under certain condition.
 
 ## 5.10.0 (12/11/2017)
 ### Public API Change
index 81b63262ec2cad81eb0eecafe18ad92da1af8475..c32e97f067a16e3586fb75805a8f9e6a342026fa 100644 (file)
@@ -385,7 +385,8 @@ ColumnFamilyData::ColumnFamilyData(
       pending_flush_(false),
       pending_compaction_(false),
       prev_compaction_needed_bytes_(0),
-      allow_2pc_(db_options.allow_2pc) {
+      allow_2pc_(db_options.allow_2pc),
+      last_memtable_id_(0) {
   Ref();
 
   // Convert user defined table properties collector factories to internal ones.
index 9a125aa1cf3cf42ed63ce96203a4cfef530f4188..ce1fd47385adc80a93e6bbf15c675ca7944c95bd 100644 (file)
@@ -239,7 +239,11 @@ class ColumnFamilyData {
   void SetCurrent(Version* _current);
   uint64_t GetNumLiveVersions() const;  // REQUIRE: DB mutex held
   uint64_t GetTotalSstFilesSize() const;  // REQUIRE: DB mutex held
-  void SetMemtable(MemTable* new_mem) { mem_ = new_mem; }
+  void SetMemtable(MemTable* new_mem) {
+    uint64_t memtable_id = last_memtable_id_.fetch_add(1) + 1;
+    new_mem->SetID(memtable_id);
+    mem_ = new_mem;
+  }
 
   // calculate the oldest log needed for the durability of this column family
   uint64_t OldestLogToKeep();
@@ -419,6 +423,9 @@ class ColumnFamilyData {
 
   // if the database was opened with 2pc enabled
   bool allow_2pc_;
+
+  // Memtable id to track flush.
+  std::atomic<uint64_t> last_memtable_id_;
 };
 
 // ColumnFamilySet has interesting thread-safety requirements
index 107e82467cba9bec0d3cd2e95f011d3e7ecaaf78..83895ea6cab15dc3cfaa8af1dc44b59d44e1a3b7 100644 (file)
@@ -126,6 +126,41 @@ TEST_F(DBFlushTest, FlushInLowPriThreadPool) {
   ASSERT_EQ(1, num_compactions);
 }
 
+TEST_F(DBFlushTest, ManualFlushWithMinWriteBufferNumberToMerge) {
+  Options options = CurrentOptions();
+  options.write_buffer_size = 100;
+  options.max_write_buffer_number = 4;
+  options.min_write_buffer_number_to_merge = 3;
+  Reopen(options);
+
+  SyncPoint::GetInstance()->LoadDependency(
+      {{"DBImpl::BGWorkFlush",
+        "DBFlushTest::ManualFlushWithMinWriteBufferNumberToMerge:1"},
+       {"DBFlushTest::ManualFlushWithMinWriteBufferNumberToMerge:2",
+        "DBImpl::FlushMemTableToOutputFile:BeforeInstallSV"}});
+  SyncPoint::GetInstance()->EnableProcessing();
+
+  ASSERT_OK(Put("key1", "value1"));
+
+  port::Thread t([&]() {
+    // The call wait for flush to finish, i.e. with flush_options.wait = true.
+    ASSERT_OK(Flush());
+  });
+
+  // Wait for flush start.
+  TEST_SYNC_POINT("DBFlushTest::ManualFlushWithMinWriteBufferNumberToMerge:1");
+  // Insert a second memtable before the manual flush finish.
+  // At the end of the manual flush job, it will check if further flush
+  // is needed, but it will not trigger flush of the second memtable because
+  // min_write_buffer_number_to_merge is not reached.
+  ASSERT_OK(Put("key2", "value2"));
+  ASSERT_OK(dbfull()->TEST_SwitchMemtable());
+  TEST_SYNC_POINT("DBFlushTest::ManualFlushWithMinWriteBufferNumberToMerge:2");
+
+  // Manual flush should return, without waiting for flush indefinitely.
+  t.join();
+}
+
 TEST_P(DBFlushDirectIOTest, DirectIO) {
   Options options;
   options.create_if_missing = true;
index 3db65048b85926af09d78c4366bf1135a7999b9d..a9eaad7285dfc464c1846115f688fecc308019b4 100644 (file)
@@ -810,8 +810,12 @@ class DBImpl : public DB {
   Status FlushMemTable(ColumnFamilyData* cfd, const FlushOptions& options,
                        bool writes_stopped = false);
 
-  // Wait for memtable flushed
-  Status WaitForFlushMemTable(ColumnFamilyData* cfd);
+  // Wait for memtable flushed.
+  // If flush_memtable_id is non-null, wait until the memtable with the ID
+  // gets flush. Otherwise, wait until the column family don't have any
+  // memtable pending flush.
+  Status WaitForFlushMemTable(ColumnFamilyData* cfd,
+                              const uint64_t* flush_memtable_id = nullptr);
 
   // REQUIRES: mutex locked
   Status SwitchWAL(WriteContext* write_context);
index 02f35df21cb84d631cbdd128ea6121288e1f53aa..1985c9fe32bf1f5a393637d3bb3c0c64220e2088 100644 (file)
@@ -134,6 +134,7 @@ Status DBImpl::FlushMemTableToOutputFile(
   }
 
   if (s.ok()) {
+    TEST_SYNC_POINT("DBImpl::FlushMemTableToOutputFile:BeforeInstallSV");
     InstallSuperVersionAndScheduleWork(cfd, &job_context->superversion_context,
                                        mutable_cf_options);
     if (made_progress) {
@@ -809,7 +810,13 @@ int DBImpl::Level0StopWriteTrigger(ColumnFamilyHandle* column_family) {
 Status DBImpl::Flush(const FlushOptions& flush_options,
                      ColumnFamilyHandle* column_family) {
   auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
-  return FlushMemTable(cfh->cfd(), flush_options);
+  ROCKS_LOG_INFO(immutable_db_options_.info_log, "[%s] Manual flush start.",
+                 cfh->GetName().c_str());
+  Status s = FlushMemTable(cfh->cfd(), flush_options);
+  ROCKS_LOG_INFO(immutable_db_options_.info_log,
+                 "[%s] Manual flush finished, status: %s\n",
+                 cfh->GetName().c_str(), s.ToString().c_str());
+  return s;
 }
 
 Status DBImpl::RunManualCompaction(ColumnFamilyData* cfd, int input_level,
@@ -944,6 +951,7 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
                              const FlushOptions& flush_options,
                              bool writes_stopped) {
   Status s;
+  uint64_t flush_memtable_id = 0;
   {
     WriteContext context;
     InstrumentedMutexLock guard_lock(&mutex_);
@@ -961,6 +969,7 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
 
     // SwitchMemtable() will release and reacquire mutex during execution
     s = SwitchMemtable(cfd, &context);
+    flush_memtable_id = cfd->imm()->GetLatestMemTableID();
 
     if (!writes_stopped) {
       write_thread_.ExitUnbatched(&w);
@@ -975,16 +984,19 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
 
   if (s.ok() && flush_options.wait) {
     // Wait until the compaction completes
-    s = WaitForFlushMemTable(cfd);
+    s = WaitForFlushMemTable(cfd, &flush_memtable_id);
   }
   return s;
 }
 
-Status DBImpl::WaitForFlushMemTable(ColumnFamilyData* cfd) {
+Status DBImpl::WaitForFlushMemTable(ColumnFamilyData* cfd,
+                                    const uint64_t* flush_memtable_id) {
   Status s;
   // Wait until the compaction completes
   InstrumentedMutexLock l(&mutex_);
-  while (cfd->imm()->NumNotFlushed() > 0 && bg_error_.ok()) {
+  while (cfd->imm()->NumNotFlushed() > 0 && bg_error_.ok() &&
+         (flush_memtable_id == nullptr ||
+          cfd->imm()->GetEarliestMemTableID() <= *flush_memtable_id)) {
     if (shutting_down_.load(std::memory_order_acquire)) {
       return Status::ShutdownInProgress();
     }
index 76e3cf1bf3b3959617ee4c7b4a9bda9f14075c64..bfafbeaccfc15a34de289e32b799de5953756677 100644 (file)
@@ -368,6 +368,11 @@ class MemTable {
     return oldest_key_time_.load(std::memory_order_relaxed);
   }
 
+  // REQUIRES: db_mutex held.
+  void SetID(uint64_t id) { id_ = id; }
+
+  uint64_t GetID() const { return id_; }
+
  private:
   enum FlushStateEnum { FLUSH_NOT_REQUESTED, FLUSH_REQUESTED, FLUSH_SCHEDULED };
 
@@ -437,6 +442,9 @@ class MemTable {
   // Timestamp of oldest key
   std::atomic<uint64_t> oldest_key_time_;
 
+  // Memtable id to track flush.
+  uint64_t id_ = 0;
+
   // Returns a heuristic flush decision
   bool ShouldFlushNow() const;
 
index 1e46642a9d8a5b6aabcfb2281029de7a02cf5f5d..c2ac65a2fc5c3b7c3acc0c2c9eb0d1e1a3aabd6f 100644 (file)
@@ -5,11 +5,12 @@
 //
 #pragma once
 
-#include <string>
+#include <deque>
+#include <limits>
 #include <list>
-#include <vector>
 #include <set>
-#include <deque>
+#include <string>
+#include <vector>
 
 #include "db/dbformat.h"
 #include "db/memtable.h"
@@ -244,6 +245,22 @@ class MemTableList {
 
   uint64_t GetMinLogContainingPrepSection();
 
+  uint64_t GetEarliestMemTableID() const {
+    auto& memlist = current_->memlist_;
+    if (memlist.empty()) {
+      return std::numeric_limits<uint64_t>::max();
+    }
+    return memlist.back()->GetID();
+  }
+
+  uint64_t GetLatestMemTableID() const {
+    auto& memlist = current_->memlist_;
+    if (memlist.empty()) {
+      return 0;
+    }
+    return memlist.front()->GetID();
+  }
+
  private:
   // DB mutex held
   void InstallNewVersion();