]> git-server-git.apps.pok.os.sepia.ceph.com Git - rocksdb.git/commitdiff
Flush was hanging because the configured options specified that more than 1 memtable...
authorDhruba Borthakur <dhruba@fb.com>
Thu, 5 Sep 2013 00:24:35 +0000 (17:24 -0700)
committerDhruba Borthakur <dhruba@fb.com>
Fri, 6 Sep 2013 23:28:33 +0000 (16:28 -0700)
Summary:
There is an config option called Options.min_write_buffer_number_to_merge
that specifies the minimum number of write buffers to merge in memory
before flushing to a file in L0. But in the the case when the db is
being closed, we should not be using this config, instead we should
flush whatever write buffers were available at that time.

Test Plan: Unit test attached.

Reviewers: haobo, emayanke

Reviewed By: haobo

CC: leveldb
Differential Revision: https://reviews.facebook.net/D12717

db/db_impl.cc
db/db_test.cc
db/memtablelist.cc
db/memtablelist.h

index 66e44a61135988ab1ad6a15e84eb5bcdc9ed96a7..69e153bddba6e8e5bd8424daa3639f081d0ab969 100644 (file)
@@ -2745,6 +2745,9 @@ Status DBImpl::MakeRoomForWrite(bool force) {
       log_.reset(new log::Writer(std::move(lfile)));
       mem_->SetLogNumber(logfile_number_);
       imm_.Add(mem_);
+      if (force) {
+        imm_.FlushRequested();
+      }
       mem_ = new MemTable(internal_comparator_, mem_rep_factory_,
         NumberLevels(), options_);
       mem_->Ref();
index 3f2879027b4192f9343df593f4e0038e5aa360d8..5f11d72038293c3b13ba7904e21f04be26a95022 100644 (file)
@@ -1366,6 +1366,24 @@ TEST(DBTest, CheckLock) {
   } while (ChangeCompactOptions());
 }
 
+TEST(DBTest, FlushMultipleMemtable) {
+  do {
+    Options options = CurrentOptions();
+    WriteOptions writeOpt = WriteOptions();
+    writeOpt.disableWAL = true;
+    options.max_write_buffer_number = 4;
+    options.min_write_buffer_number_to_merge = 3;
+    Reopen(&options);
+    ASSERT_OK(dbfull()->Put(writeOpt, "foo", "v1"));
+    dbfull()->Flush(FlushOptions());
+    ASSERT_OK(dbfull()->Put(writeOpt, "bar", "v1"));
+
+    ASSERT_EQ("v1", Get("foo"));
+    ASSERT_EQ("v1", Get("bar"));
+    dbfull()->Flush(FlushOptions());
+  } while (ChangeCompactOptions());
+}
+
 TEST(DBTest, FLUSH) {
   do {
     Options options = CurrentOptions();
index a1e6c9674f9c6ca4090700d935917eebcd76ae89..fa10a740e856249fab33ab31ca4e468edc6d9075 100644 (file)
@@ -42,7 +42,8 @@ int MemTableList::size() {
 // Returns true if there is at least one memtable on which flush has
 // not yet started.
 bool MemTableList::IsFlushPending(int min_write_buffer_number_to_merge) {
-  if (num_flush_not_started_ >= min_write_buffer_number_to_merge) {
+  if ((flush_requested_ && num_flush_not_started_ >= 1) ||
+      (num_flush_not_started_ >= min_write_buffer_number_to_merge)) {
     assert(imm_flush_needed.NoBarrier_Load() != nullptr);
     return true;
   }
@@ -63,6 +64,7 @@ void MemTableList::PickMemtablesToFlush(std::vector<MemTable*>* ret) {
       ret->push_back(m);
     }
   }
+  flush_requested_ = false; // start-flush request is complete
 }
 
 // Record a successful flush in the manifest file
index d97187b0de5202891f99b05d0e3130551382889e..327a2e0f77c35e45c86d4e93ad3421558bab2b5a 100644 (file)
@@ -29,7 +29,8 @@ class MemTableList {
  public:
   // A list of memtables.
   MemTableList() : size_(0), num_flush_not_started_(0),
-    commit_in_progress_(false) {
+    commit_in_progress_(false),
+    flush_requested_(false) {
     imm_flush_needed.Release_Store(nullptr);
   }
   ~MemTableList() {};
@@ -76,6 +77,9 @@ class MemTableList {
   // Returns the list of underlying memtables.
   void GetMemTables(std::vector<MemTable*>* list);
 
+  // Request a flush of all existing memtables to storage
+  void FlushRequested() { flush_requested_ = true; }
+
   // Copying allowed
   // MemTableList(const MemTableList&);
   // void operator=(const MemTableList&);
@@ -90,6 +94,9 @@ class MemTableList {
   // committing in progress
   bool commit_in_progress_;
 
+  // Requested a flush of all memtables to storage
+  bool flush_requested_;
+
 };
 
 }  // namespace leveldb