]> git-server-git.apps.pok.os.sepia.ceph.com Git - rocksdb.git/commitdiff
Fix for 2PC causing WAL to grow too large
authorReid Horuff <horuff@fb.com>
Thu, 19 Jan 2017 23:21:07 +0000 (15:21 -0800)
committerYi Wu <yiwu@fb.com>
Fri, 20 Jan 2017 18:46:47 +0000 (10:46 -0800)
Summary:
Consider the following single column family scenario:
prepare in log A
commit in log B
*WAL is too large, flush all CFs to releast log A*
*CFA is on log B so we do not see CFA is depending on log A so no flush is requested*

To fix this we must also consider the log containing the prepare section when determining what log a CF is dependent on.
Closes https://github.com/facebook/rocksdb/pull/1768

Differential Revision: D4403265

Pulled By: reidHoruff

fbshipit-source-id: ce800ff

db/column_family.cc
db/column_family.h
db/db_impl.cc
db/db_impl.h
db/db_impl_debug.cc
db/memtable_list.h
utilities/transactions/transaction_test.cc

index 7dfd0b37ba9113d6b5b518b9d6d4b96bfd499b8d..797018077621983c8ac495da51e939528ef5dba1 100644 (file)
@@ -370,7 +370,8 @@ ColumnFamilyData::ColumnFamilyData(
       column_family_set_(column_family_set),
       pending_flush_(false),
       pending_compaction_(false),
-      prev_compaction_needed_bytes_(0) {
+      prev_compaction_needed_bytes_(0),
+      allow_2pc_(db_options.allow_2pc) {
   Ref();
 
   // Convert user defined table properties collector factories to internal ones.
@@ -492,6 +493,25 @@ ColumnFamilyOptions ColumnFamilyData::GetLatestCFOptions() const {
   return BuildColumnFamilyOptions(initial_cf_options_, mutable_cf_options_);
 }
 
+uint64_t ColumnFamilyData::OldestLogToKeep() {
+  auto current_log = GetLogNumber();
+
+  if (allow_2pc_) {
+    auto imm_prep_log = imm()->GetMinLogContainingPrepSection();
+    auto mem_prep_log = mem()->GetMinLogContainingPrepSection();
+
+    if (imm_prep_log > 0 && imm_prep_log < current_log) {
+      current_log = imm_prep_log;
+    }
+
+    if (mem_prep_log > 0 && mem_prep_log < current_log) {
+      current_log = mem_prep_log;
+    }
+  }
+
+  return current_log;
+}
+
 const double kIncSlowdownRatio = 0.8;
 const double kDecSlowdownRatio = 1 / kIncSlowdownRatio;
 const double kNearStopSlowdownRatio = 0.6;
index 29d297157ac3ca1a27f3c42d3b2f23b763b72f77..2bf579a4a870673c52bc818ae6601401d3a77cb9 100644 (file)
@@ -239,6 +239,9 @@ class ColumnFamilyData {
   uint64_t GetTotalSstFilesSize() const;  // REQUIRE: DB mutex held
   void SetMemtable(MemTable* new_mem) { mem_ = new_mem; }
 
+  // calculate the oldest log needed for the durability of this column family
+  uint64_t OldestLogToKeep();
+
   // See Memtable constructor for explanation of earliest_seq param.
   MemTable* ConstructNewMemtable(const MutableCFOptions& mutable_cf_options,
                                  SequenceNumber earliest_seq);
@@ -404,6 +407,9 @@ class ColumnFamilyData {
   bool pending_compaction_;
 
   uint64_t prev_compaction_needed_bytes_;
+
+  // if the database was opened with 2pc enabled
+  bool allow_2pc_;
 };
 
 // ColumnFamilySet has interesting thread-safety requirements
index a084d29d0cf5a47b73e16773cc19a1c9239474ee..f884701cdaf5277e23a9581b67410b9b7ab0a794 100644 (file)
@@ -342,6 +342,7 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname)
       last_stats_dump_time_microsec_(0),
       next_job_id_(1),
       has_unpersisted_data_(false),
+      unable_to_flush_oldest_log_(false),
       env_options_(BuildDBOptions(immutable_db_options_, mutable_db_options_)),
       num_running_ingest_file_(0),
 #ifndef ROCKSDB_LITE
@@ -663,6 +664,10 @@ void DBImpl::MaybeDumpStats() {
 }
 
 uint64_t DBImpl::FindMinPrepLogReferencedByMemTable() {
+  if (!allow_2pc()) {
+    return 0;
+  }
+
   uint64_t min_log = 0;
 
   // we must look through the memtables for two phase transactions
@@ -707,6 +712,11 @@ void DBImpl::MarkLogAsContainingPrepSection(uint64_t log) {
 }
 
 uint64_t DBImpl::FindMinLogContainingOutstandingPrep() {
+
+  if (!allow_2pc()) {
+    return 0;
+  }
+
   std::lock_guard<std::mutex> lock(prep_heap_mutex_);
   uint64_t min_log = 0;
 
@@ -2505,7 +2515,7 @@ Status DBImpl::SetDBOptions(
       mutable_db_options_ = new_options;
 
       if (total_log_size_ > GetMaxTotalWalSize()) {
-        FlushColumnFamilies();
+        MaybeFlushColumnFamilies();
       }
 
       persist_options_status = PersistOptions();
@@ -4698,9 +4708,8 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
          versions_->GetColumnFamilySet()->NumberOfColumnFamilies() == 1);
 
   if (UNLIKELY(!single_column_family_mode_ &&
-               !alive_log_files_.begin()->getting_flushed &&
                total_log_size_ > GetMaxTotalWalSize())) {
-    FlushColumnFamilies();
+    MaybeFlushColumnFamilies();
   } else if (UNLIKELY(write_buffer_manager_->ShouldFlush())) {
     // Before a new memtable is added in SwitchMemtable(),
     // write_buffer_manager_->ShouldFlush() will keep returning true. If another
@@ -5018,28 +5027,40 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
   return status;
 }
 
-void DBImpl::FlushColumnFamilies() {
+void DBImpl::MaybeFlushColumnFamilies() {
   mutex_.AssertHeld();
 
-  WriteContext context;
-
   if (alive_log_files_.begin()->getting_flushed) {
     return;
   }
 
-  uint64_t flush_column_family_if_log_file = alive_log_files_.begin()->number;
-  alive_log_files_.begin()->getting_flushed = true;
+  auto oldest_alive_log = alive_log_files_.begin()->number;
+  auto oldest_log_with_uncommited_prep = FindMinLogContainingOutstandingPrep();
+
+  if (allow_2pc() &&
+      unable_to_flush_oldest_log_ &&
+      oldest_log_with_uncommited_prep > 0 &&
+      oldest_log_with_uncommited_prep <= oldest_alive_log) {
+    // we already attempted to flush all column families dependent on
+    // the oldest alive log but the log still contained uncommited transactions.
+    // the oldest alive log STILL contains uncommited transaction so there
+    // is still nothing that we can do.
+    return;
+  }
+
+  WriteContext context;
+
   Log(InfoLogLevel::INFO_LEVEL, immutable_db_options_.info_log,
       "Flushing all column families with data in WAL number %" PRIu64
       ". Total log size is %" PRIu64 " while max_total_wal_size is %" PRIu64,
-      flush_column_family_if_log_file, total_log_size_, GetMaxTotalWalSize());
+      oldest_alive_log, total_log_size_, GetMaxTotalWalSize());
   // no need to refcount because drop is happening in write thread, so can't
   // happen while we're in the write thread
   for (auto cfd : *versions_->GetColumnFamilySet()) {
     if (cfd->IsDropped()) {
       continue;
     }
-    if (cfd->GetLogNumber() <= flush_column_family_if_log_file) {
+    if (cfd->OldestLogToKeep() <= oldest_alive_log) {
       auto status = SwitchMemtable(cfd, &context);
       if (!status.ok()) {
         break;
@@ -5049,6 +5070,26 @@ void DBImpl::FlushColumnFamilies() {
     }
   }
   MaybeScheduleFlushOrCompaction();
+
+  // we only mark this log as getting flushed if we have successfully
+  // flushed all data in this log. If this log contains outstanding prepred
+  // transactions then we cannot flush this log until those transactions are commited.
+
+  unable_to_flush_oldest_log_ = false;
+
+  if (allow_2pc()) {
+    if (oldest_log_with_uncommited_prep == 0 ||
+        oldest_log_with_uncommited_prep > oldest_alive_log) {
+      // this log contains no outstanding prepared transactions
+      alive_log_files_.begin()->getting_flushed = true;
+    } else {
+      Log(InfoLogLevel::WARN_LEVEL, immutable_db_options_.info_log,
+          "Unable to release oldest log due to uncommited transaction");
+      unable_to_flush_oldest_log_ = true;
+    }
+  } else {
+    alive_log_files_.begin()->getting_flushed = true;
+  }
 }
 
 uint64_t DBImpl::GetMaxTotalWalSize() const {
index dfd5f14825e360afb69e2f3090ec6b774578b52d..ddc6799d679b12f473f903a7e8fa4a9a58f7c17f 100644 (file)
@@ -308,6 +308,16 @@ class DBImpl : public DB {
                            ColumnFamilyHandle* column_family = nullptr,
                            bool disallow_trivial_move = false);
 
+  void TEST_MaybeFlushColumnFamilies();
+
+  bool TEST_UnableToFlushOldestLog() {
+    return unable_to_flush_oldest_log_;
+  }
+
+  bool TEST_IsLogGettingFlushed() {
+    return alive_log_files_.begin()->getting_flushed;
+  }
+
   // Force current memtable contents to be flushed.
   Status TEST_FlushMemTable(bool wait = true,
                             ColumnFamilyHandle* cfh = nullptr);
@@ -734,7 +744,7 @@ class DBImpl : public DB {
   // REQUIRES: mutex locked
   Status PersistOptions();
 
-  void FlushColumnFamilies();
+  void MaybeFlushColumnFamilies();
 
   uint64_t GetMaxTotalWalSize() const;
 
@@ -994,6 +1004,15 @@ class DBImpl : public DB {
   // Used when disableWAL is true.
   bool has_unpersisted_data_;
 
+
+  // if an attempt was made to flush all column families that
+  // the oldest log depends on but uncommited data in the oldest
+  // log prevents the log from being released.
+  // We must attempt to free the dependent memtables again
+  // at a later time after the transaction in the oldest
+  // log is fully commited.
+  bool unable_to_flush_oldest_log_;
+
   static const int KEEP_LOG_FILE_NUM = 1000;
   // MSVC version 1800 still does not have constexpr for ::max()
   static const uint64_t kNoTimeOut = port::kMaxUint64;
index dbf2391bdaa291cf8dc6b4235d655784fcd42e33..a69d2658d47060db3cf4403d044c1e852c4d37b0 100644 (file)
@@ -19,6 +19,11 @@ uint64_t DBImpl::TEST_GetLevel0TotalSize() {
   return default_cf_handle_->cfd()->current()->storage_info()->NumLevelBytes(0);
 }
 
+void DBImpl::TEST_MaybeFlushColumnFamilies() {
+  InstrumentedMutexLock l(&mutex_);
+  MaybeFlushColumnFamilies();
+}
+
 int64_t DBImpl::TEST_MaxNextLevelOverlappingBytes(
     ColumnFamilyHandle* column_family) {
   ColumnFamilyData* cfd;
index 67ef95bd3d2487bc99ca3364d28c6d5ca23a3eb1..97438d7549e51fc491f6e9c123a263b9e352fd6c 100644 (file)
@@ -221,6 +221,8 @@ class MemTableList {
   // PickMemtablesToFlush() is called.
   void FlushRequested() { flush_requested_ = true; }
 
+  bool HasFlushRequested() { return flush_requested_; }
+
   // Copying allowed
   // MemTableList(const MemTableList&);
   // void operator=(const MemTableList&);
index 585a9c1e996743335c6518cecf9df7bf4825af34..39efa1ccc01e620a47502ef94e65622f1ff13684 100644 (file)
@@ -1355,6 +1355,108 @@ TEST_P(TransactionTest, TwoPhaseLogRollingTest) {
   delete cfb;
 }
 
+TEST_P(TransactionTest, TwoPhaseLogRollingTest2) {
+  DBImpl* db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
+
+  Status s;
+  ColumnFamilyHandle *cfa, *cfb;
+
+  ColumnFamilyOptions cf_options;
+  s = db->CreateColumnFamily(cf_options, "CFA", &cfa);
+  ASSERT_OK(s);
+  s = db->CreateColumnFamily(cf_options, "CFB", &cfb);
+  ASSERT_OK(s);
+
+  WriteOptions wopts;
+  wopts.disableWAL = false;
+  wopts.sync = true;
+
+  auto cfh_a = reinterpret_cast<ColumnFamilyHandleImpl*>(cfa);
+  auto cfh_b = reinterpret_cast<ColumnFamilyHandleImpl*>(cfb);
+
+  TransactionOptions topts1;
+  Transaction* txn1 = db->BeginTransaction(wopts, topts1);
+  s = txn1->SetName("xid1");
+  ASSERT_OK(s);
+  s = txn1->Put(cfa, "boys", "girls1");
+  ASSERT_OK(s);
+
+  Transaction* txn2 = db->BeginTransaction(wopts, topts1);
+  s = txn2->SetName("xid2");
+  ASSERT_OK(s);
+  s = txn2->Put(cfb, "up", "down1");
+  ASSERT_OK(s);
+
+  // prepre transaction in LOG A
+  s = txn1->Prepare();
+  ASSERT_OK(s);
+
+  // prepre transaction in LOG A
+  s = txn2->Prepare();
+  ASSERT_OK(s);
+
+  // regular put so that mem table can actually be flushed for log rolling
+  s = db->Put(wopts, "cats", "dogs1");
+  ASSERT_OK(s);
+
+  auto prepare_log_no = txn1->GetLogNumber();
+
+  // roll to LOG B
+  s = db_impl->TEST_FlushMemTable(true);
+  ASSERT_OK(s);
+
+  // now we pause background work so that
+  // imm()s are not flushed before we can check their status
+  s = db_impl->PauseBackgroundWork();
+  ASSERT_OK(s);
+
+  ASSERT_GT(db_impl->TEST_LogfileNumber(), prepare_log_no);
+  ASSERT_GT(cfh_a->cfd()->GetLogNumber(), prepare_log_no);
+  ASSERT_EQ(cfh_a->cfd()->GetLogNumber(), db_impl->TEST_LogfileNumber());
+  ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(),
+            prepare_log_no);
+  ASSERT_EQ(db_impl->TEST_FindMinPrepLogReferencedByMemTable(), 0);
+
+  // commit in LOG B
+  s = txn1->Commit();
+  ASSERT_OK(s);
+
+  ASSERT_EQ(db_impl->TEST_FindMinPrepLogReferencedByMemTable(), prepare_log_no);
+
+  ASSERT_TRUE(!db_impl->TEST_UnableToFlushOldestLog());
+
+  // request a flush for all column families such that the earliest
+  // alive log file can be killed
+  db_impl->TEST_MaybeFlushColumnFamilies();
+  // log cannot be flushed because txn2 has not been commited
+  ASSERT_TRUE(!db_impl->TEST_IsLogGettingFlushed());
+  ASSERT_TRUE(db_impl->TEST_UnableToFlushOldestLog());
+
+  // assert that cfa has a flush requested
+  ASSERT_TRUE(cfh_a->cfd()->imm()->HasFlushRequested());
+
+  // cfb should not be flushed becuse it has no data from LOG A
+  ASSERT_TRUE(!cfh_b->cfd()->imm()->HasFlushRequested());
+
+  // cfb now has data from LOG A
+  s = txn2->Commit();
+  ASSERT_OK(s);
+
+  db_impl->TEST_MaybeFlushColumnFamilies();
+  ASSERT_TRUE(!db_impl->TEST_UnableToFlushOldestLog());
+
+  // we should see that cfb now has a flush requested
+  ASSERT_TRUE(cfh_b->cfd()->imm()->HasFlushRequested());
+
+  // all data in LOG A resides in a memtable that has been
+  // requested for a flush
+  ASSERT_TRUE(db_impl->TEST_IsLogGettingFlushed());
+
+  delete txn1;
+  delete txn2;
+  delete cfa;
+  delete cfb;
+}
 /*
  * 1) use prepare to keep first log around to determine starting sequence
  * during recovery.