]> git-server-git.apps.pok.os.sepia.ceph.com Git - rocksdb.git/commitdiff
Add simple heuristics for experimental mempurge. (#8583)
authorBaptiste Lemaire <blemaire@fb.com>
Mon, 26 Jul 2021 18:55:27 +0000 (11:55 -0700)
committerFacebook GitHub Bot <facebook-github-bot@users.noreply.github.com>
Mon, 26 Jul 2021 18:56:29 +0000 (11:56 -0700)
Summary:
Add `experimental_mempurge_policy` option flag and introduce two new `MemPurge` (Memtable Garbage Collection) policies: 'ALWAYS' and 'ALTERNATE'. Default value: ALTERNATE.
`ALWAYS`: every flush will first go through a `MemPurge` process. If the output is too big to fit into a single memtable, then the mempurge is aborted and a regular flush process carries on. `ALWAYS` is designed for user that need to reduce the number of L0 SST file created to a strict minimum, and can afford a small dent in performance (possibly hits to CPU usage, read efficiency, and maximum burst write throughput).
`ALTERNATE`: a flush is transformed into a `MemPurge` except if one of the memtables being flushed is the product of a previous `MemPurge`. `ALTERNATE` is a good tradeoff between reduction in number of L0 SST files created and performance. `ALTERNATE` perform particularly well for completely random garbage ratios, or garbage ratios anywhere in (0%,50%], and even higher when there is a wild variability in garbage ratios.
This PR also includes support for `experimental_mempurge_policy` in `db_bench`.
Testing was done locally by replacing all the `MemPurge` policies of the unit tests with `ALTERNATE`, as well as local testing with `db_crashtest.py` `whitebox` and `blackbox`. Overall, if an `ALWAYS` mempurge policy passes the tests, there is no reasons why an `ALTERNATE` policy would fail, and therefore the mempurge policy was set to `ALWAYS` for all mempurge unit tests.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/8583

Reviewed By: pdillinger

Differential Revision: D29888050

Pulled By: bjlemaire

fbshipit-source-id: e2cf26646d66679f6f5fb29842624615610759c1

db/db_flush_test.cc
db/flush_job.cc
db/flush_job.h
db/memtable.h
db/memtable_list.h
include/rocksdb/options.h
options/db_options.cc
options/db_options.h
tools/db_bench_tool.cc

index ed1665b68a6b125fe9e687362ab016bf5d996935..b3e435472ad3daec7a090294d44d00de29ee184c 100644 (file)
@@ -695,6 +695,7 @@ TEST_F(DBFlushTest, MemPurgeBasic) {
   options.write_buffer_size = 1 << 20;
   // Activate the MemPurge prototype.
   options.experimental_allow_mempurge = true;
+  options.experimental_mempurge_policy = MemPurgePolicy::kAlways;
   ASSERT_OK(TryReopen(options));
   uint32_t mempurge_count = 0;
   uint32_t sst_count = 0;
@@ -808,7 +809,7 @@ TEST_F(DBFlushTest, MemPurgeBasic) {
   // Assert that at least one flush to storage has been performed
   ASSERT_GT(sst_count, EXPECTED_SST_COUNT);
   // (which will consequently increase the number of mempurges recorded too).
-  ASSERT_EQ(mempurge_count, mempurge_count_record);
+  ASSERT_GE(mempurge_count, mempurge_count_record);
 
   // Assert that there is no data corruption, even with
   // a flush to storage.
@@ -842,6 +843,7 @@ TEST_F(DBFlushTest, MemPurgeDeleteAndDeleteRange) {
   options.write_buffer_size = 1 << 20;
   // Activate the MemPurge prototype.
   options.experimental_allow_mempurge = true;
+  options.experimental_mempurge_policy = MemPurgePolicy::kAlways;
   ASSERT_OK(TryReopen(options));
 
   uint32_t mempurge_count = 0;
@@ -1045,6 +1047,7 @@ TEST_F(DBFlushTest, MemPurgeAndCompactionFilter) {
   options.write_buffer_size = 1 << 20;
   // Activate the MemPurge prototype.
   options.experimental_allow_mempurge = true;
+  options.experimental_mempurge_policy = MemPurgePolicy::kAlways;
   ASSERT_OK(TryReopen(options));
 
   uint32_t mempurge_count = 0;
@@ -1116,10 +1119,11 @@ TEST_F(DBFlushTest, MemPurgeWALSupport) {
   options.inplace_update_support = false;
   options.allow_concurrent_memtable_write = true;
 
-  // Enforce size of a single MemTable to 1MB.
+  // Enforce size of a single MemTable to 128KB.
   options.write_buffer_size = 128 << 10;
   // Activate the MemPurge prototype.
   options.experimental_allow_mempurge = true;
+  options.experimental_mempurge_policy = MemPurgePolicy::kAlways;
   ASSERT_OK(TryReopen(options));
 
   const size_t KVSIZE = 10;
@@ -1158,7 +1162,7 @@ TEST_F(DBFlushTest, MemPurgeWALSupport) {
     // more than would fit in maximum allowed memtables.
     Random rnd(719);
     const size_t NUM_REPEAT = 100;
-    const size_t RAND_KEY_LENGTH = 8192;
+    const size_t RAND_KEY_LENGTH = 4096;
     const size_t RAND_VALUES_LENGTH = 1024;
     std::vector<std::string> values_default(KVSIZE), values_pikachu(KVSIZE);
 
@@ -1235,7 +1239,9 @@ TEST_F(DBFlushTest, MemPurgeWALSupport) {
     const uint32_t EXPECTED_SST_COUNT = 0;
 
     EXPECT_GE(mempurge_count, EXPECTED_MIN_MEMPURGE_COUNT);
-    EXPECT_EQ(sst_count, EXPECTED_SST_COUNT);
+    if (options.experimental_mempurge_policy == MemPurgePolicy::kAlways) {
+      EXPECT_EQ(sst_count, EXPECTED_SST_COUNT);
+    }
 
     ReopenWithColumnFamilies({"default", "pikachu"}, options);
     // Check that there was no data corruption anywhere,
index 09d348cc8c8ef1ec6404e8368eae530a7064a037..9ce20b2b497a9ecc6742acfb1bca7b7e2ddc966e 100644 (file)
@@ -192,6 +192,19 @@ void FlushJob::PickMemTable() {
   // path 0 for level 0 file.
   meta_.fd = FileDescriptor(versions_->NewFileNumber(), 0, 0);
 
+  // If mempurge feature is activated, keep track of any potential
+  // memtables coming from a previous mempurge operation.
+  // Used for mempurge policy.
+  if (db_options_.experimental_allow_mempurge) {
+    contains_mempurge_outcome_ = false;
+    for (MemTable* mt : mems_) {
+      if (cfd_->imm()->IsMemPurgeOutput(mt->GetID())) {
+        contains_mempurge_outcome_ = true;
+        break;
+      }
+    }
+  }
+
   base_ = cfd_->current();
   base_->Ref();  // it is likely that we do not need this reference
 }
@@ -230,7 +243,7 @@ Status FlushJob::Run(LogsWithPrepTracker* prep_tracker,
   Status mempurge_s = Status::NotFound("No MemPurge.");
   if (db_options_.experimental_allow_mempurge &&
       (cfd_->GetFlushReason() == FlushReason::kWriteBufferFull) &&
-      (!mems_.empty())) {
+      (!mems_.empty()) && MemPurgeDecider()) {
     mempurge_s = MemPurge();
     if (!mempurge_s.ok()) {
       // Mempurge is typically aborted when the new_mem output memtable
@@ -339,7 +352,15 @@ Status FlushJob::MemPurge() {
   db_mutex_->Unlock();
   assert(!mems_.empty());
 
+  // Measure purging time.
+  const uint64_t start_micros = clock_->NowMicros();
+  const uint64_t start_cpu_micros = clock_->CPUNanos() / 1000;
+
   MemTable* new_mem = nullptr;
+  // For performance/log investigation purposes:
+  // look at how much useful payload we harvest in the new_mem.
+  // This value is then printed to the DB log.
+  double new_mem_capacity = 0.0;
 
   // Create two iterators, one for the memtable data (contains
   // info from puts + deletes), and one for the memtable
@@ -392,8 +413,8 @@ Status FlushJob::MemPurge() {
   // or at least range tombstones, copy over the info
   // to the new memtable.
   if (iter->Valid() || !range_del_agg->IsEmpty()) {
-    // Arbitrary heuristic: maxSize is 60% cpacity.
-    size_t maxSize = ((mutable_cf_options_.write_buffer_size + 6U) / 10U);
+    // MaxSize is the size of a memtable.
+    size_t maxSize = mutable_cf_options_.write_buffer_size;
     std::unique_ptr<CompactionFilter> compaction_filter;
     if (ioptions->compaction_filter_factory != nullptr &&
         ioptions->compaction_filter_factory->ShouldFilterTableFileCreation(
@@ -480,6 +501,7 @@ Status FlushJob::MemPurge() {
       // and destroy new_mem.
       if (new_mem->ApproximateMemoryUsage() > maxSize) {
         s = Status::Aborted("Mempurge filled more than one memtable.");
+        new_mem_capacity = 1.0;
         break;
       }
     }
@@ -524,6 +546,7 @@ Status FlushJob::MemPurge() {
         // and destroy new_mem.
         if (new_mem->ApproximateMemoryUsage() > maxSize) {
           s = Status::Aborted(Slice("Mempurge filled more than one memtable."));
+          new_mem_capacity = 1.0;
           break;
         }
       }
@@ -538,19 +561,35 @@ Status FlushJob::MemPurge() {
       new_mem->SetFirstSequenceNumber(new_first_seqno);
 
       // The new_mem is added to the list of immutable memtables
-      // only if it filled at less than 60% capacity (arbitrary heuristic).
-      if (new_mem->ApproximateMemoryUsage() < maxSize) {
+      // only if it filled at less than 100% capacity and isn't flagged
+      // as in need of being flushed.
+      if (new_mem->ApproximateMemoryUsage() < maxSize &&
+          !(new_mem->ShouldFlushNow())) {
         db_mutex_->Lock();
+        uint64_t new_mem_id = mems_[0]->GetID();
+        // Copy lowest memtable ID
+        // House keeping work.
+        for (MemTable* mt : mems_) {
+          new_mem_id = mt->GetID() < new_mem_id ? mt->GetID() : new_mem_id;
+          // Note: if m is not a previous mempurge output memtable,
+          // nothing happens.
+          cfd_->imm()->RemoveMemPurgeOutputID(mt->GetID());
+        }
+        new_mem->SetID(new_mem_id);
+        cfd_->imm()->AddMemPurgeOutputID(new_mem_id);
         cfd_->imm()->Add(new_mem,
                          &job_context_->memtables_to_free,
                          false /* -> trigger_flush=false:
                                 * adding this memtable
                                 * will not trigger a flush.
                                 */);
+        new_mem_capacity = (new_mem->ApproximateMemoryUsage()) * 1.0 /
+                           mutable_cf_options_.write_buffer_size;
         new_mem->Ref();
         db_mutex_->Unlock();
       } else {
         s = Status::Aborted(Slice("Mempurge filled more than one memtable."));
+        new_mem_capacity = 1.0;
         if (new_mem) {
           job_context_->memtables_to_free.push_back(new_mem);
         }
@@ -572,10 +611,32 @@ Status FlushJob::MemPurge() {
   } else {
     TEST_SYNC_POINT("DBImpl::FlushJob:MemPurgeUnsuccessful");
   }
+  const uint64_t micros = clock_->NowMicros() - start_micros;
+  const uint64_t cpu_micros = clock_->CPUNanos() / 1000 - start_cpu_micros;
+  ROCKS_LOG_INFO(db_options_.info_log,
+                 "[%s] [JOB %d] Mempurge lasted %" PRIu64
+                 " microseconds, and %" PRIu64
+                 " cpu "
+                 "microseconds. Status is %s ok. Perc capacity: %f\n",
+                 cfd_->GetName().c_str(), job_context_->job_id, micros,
+                 cpu_micros, s.ok() ? "" : "not", new_mem_capacity);
 
   return s;
 }
 
+bool FlushJob::MemPurgeDecider() {
+  MemPurgePolicy policy = db_options_.experimental_mempurge_policy;
+  if (policy == MemPurgePolicy::kAlways) {
+    return true;
+  } else if (policy == MemPurgePolicy::kAlternate) {
+    // Note: if at least one of the flushed memtables is
+    // an output of a previous mempurge process, then flush
+    // to storage.
+    return !(contains_mempurge_outcome_);
+  }
+  return false;
+}
+
 Status FlushJob::WriteLevel0Table() {
   AutoThreadOperationStageUpdater stage_updater(
       ThreadStatus::STAGE_FLUSH_WRITE_L0);
@@ -762,8 +823,16 @@ Status FlushJob::WriteLevel0Table() {
 
   // Note that here we treat flush as level 0 compaction in internal stats
   InternalStats::CompactionStats stats(CompactionReason::kFlush, 1);
-  stats.micros = clock_->NowMicros() - start_micros;
-  stats.cpu_micros = clock_->CPUNanos() / 1000 - start_cpu_micros;
+  const uint64_t micros = clock_->NowMicros() - start_micros;
+  const uint64_t cpu_micros = clock_->CPUNanos() / 1000 - start_cpu_micros;
+  stats.micros = micros;
+  stats.cpu_micros = cpu_micros;
+
+  ROCKS_LOG_INFO(db_options_.info_log,
+                 "[%s] [JOB %d] Flush lasted %" PRIu64
+                 " microseconds, and %" PRIu64 " cpu microseconds.\n",
+                 cfd_->GetName().c_str(), job_context_->job_id, micros,
+                 cpu_micros);
 
   if (has_output) {
     stats.bytes_written = meta_.fd.GetFileSize();
@@ -777,12 +846,22 @@ Status FlushJob::WriteLevel0Table() {
 
   stats.num_output_files_blob = static_cast<int>(blobs.size());
 
+  if (db_options_.experimental_allow_mempurge && s.ok()) {
+    // The db_mutex is held at this point.
+    for (MemTable* mt : mems_) {
+      // Note: if m is not a previous mempurge output memtable,
+      // nothing happens here.
+      cfd_->imm()->RemoveMemPurgeOutputID(mt->GetID());
+    }
+  }
+
   RecordTimeToHistogram(stats_, FLUSH_TIME, stats.micros);
   cfd_->internal_stats()->AddCompactionStats(0 /* level */, thread_pri_, stats);
   cfd_->internal_stats()->AddCFStats(
       InternalStats::BYTES_FLUSHED,
       stats.bytes_written + stats.bytes_written_blob);
   RecordFlushIOStats();
+
   return s;
 }
 
index b0a6bf2de806d15ffe0d4553414b101c14ee7943..694dd71d2cfd719237da776765977435bc98ef5e 100644 (file)
@@ -123,6 +123,7 @@ class FlushJob {
   // recommend all users not to set this flag as true given that the MemPurge
   // process has not matured yet.
   Status MemPurge();
+  bool MemPurgeDecider();
 #ifndef ROCKSDB_LITE
   std::unique_ptr<FlushJobInfo> GetFlushJobInfo() const;
 #endif  // !ROCKSDB_LITE
@@ -190,6 +191,9 @@ class FlushJob {
 
   const std::string full_history_ts_low_;
   BlobFileCompletionCallback* blob_callback_;
+
+  // Used when experimental_allow_mempurge set to true.
+  bool contains_mempurge_outcome_;
 };
 
 }  // namespace ROCKSDB_NAMESPACE
index 6f908ae5b597ab2972617b45927c4f80e60c9428..93060941a9438ff51f52bcc34da8150c394645d3 100644 (file)
@@ -471,6 +471,9 @@ class MemTable {
   }
 #endif  // !ROCKSDB_LITE
 
+  // Returns a heuristic flush decision
+  bool ShouldFlushNow();
+
  private:
   enum FlushStateEnum { FLUSH_NOT_REQUESTED, FLUSH_REQUESTED, FLUSH_SCHEDULED };
 
@@ -558,9 +561,6 @@ class MemTable {
   std::unique_ptr<FlushJobInfo> flush_job_info_;
 #endif  // !ROCKSDB_LITE
 
-  // Returns a heuristic flush decision
-  bool ShouldFlushNow();
-
   // Updates flush_state_ using ShouldFlushNow()
   void UpdateFlushState();
 
index 22967c8c1a6675eda1d914c98e3f71e31bab060d..ada6469a1ea89d64fe64880b148d53f2ab2722c1 100644 (file)
@@ -389,6 +389,24 @@ class MemTableList {
   // not freed, but put into a vector for future deref and reclamation.
   void RemoveOldMemTables(uint64_t log_number,
                           autovector<MemTable*>* to_delete);
+  void AddMemPurgeOutputID(uint64_t mid) {
+    if (mempurged_ids_.find(mid) == mempurged_ids_.end()) {
+      mempurged_ids_.insert(mid);
+    }
+  }
+
+  void RemoveMemPurgeOutputID(uint64_t mid) {
+    if (mempurged_ids_.find(mid) != mempurged_ids_.end()) {
+      mempurged_ids_.erase(mid);
+    }
+  }
+
+  bool IsMemPurgeOutput(uint64_t mid) {
+    if (mempurged_ids_.find(mid) == mempurged_ids_.end()) {
+      return false;
+    }
+    return true;
+  }
 
  private:
   friend Status InstallMemtableAtomicFlushResults(
@@ -433,6 +451,10 @@ class MemTableList {
 
   // Cached value of current_->HasHistory().
   std::atomic<bool> current_has_history_;
+
+  // Store the IDs of the memtables installed in this
+  // list that result from a mempurge operation.
+  std::unordered_set<uint64_t> mempurged_ids_;
 };
 
 // Installs memtable atomic flush results.
index 220f90425abccf453550120123d45ec3b7db206e..d7ecb5b3dc1abfe8dddd7e124492e610ddcc460e 100644 (file)
@@ -369,6 +369,11 @@ struct DbPath {
 
 extern const char* kHostnameForDbHostId;
 
+enum class MemPurgePolicy : char {
+  kAlternate = 0x00,
+  kAlways = 0x01,
+};
+
 enum class CompactionServiceJobStatus : char {
   kSuccess,
   kFailure,
@@ -785,6 +790,11 @@ struct DBOptions {
   // If true, allows for memtable purge instead of flush to storage.
   // (experimental).
   bool experimental_allow_mempurge = false;
+  // If experimental_allow_mempurge is true, will dictate MemPurge
+  // policy.
+  // Default: kAlternate
+  // (experimental).
+  MemPurgePolicy experimental_mempurge_policy = MemPurgePolicy::kAlternate;
 
   // Amount of data to build up in memtables across all column
   // families before writing to disk.
index 53533c2524bc6d27587d18ee799ff5a1ad3bb21c..9a364d57bb256c9b6c9fa3b0368032d263b768fd 100644 (file)
@@ -47,6 +47,11 @@ static std::unordered_map<std::string, InfoLogLevel> info_log_level_string_map =
      {"FATAL_LEVEL", InfoLogLevel::FATAL_LEVEL},
      {"HEADER_LEVEL", InfoLogLevel::HEADER_LEVEL}};
 
+static std::unordered_map<std::string, MemPurgePolicy>
+    experimental_mempurge_policy_string_map = {
+        {"kAlternate", MemPurgePolicy::kAlternate},
+        {"kAlways", MemPurgePolicy::kAlways}};
+
 static std::unordered_map<std::string, OptionTypeInfo>
     db_mutable_options_type_info = {
         {"allow_os_buffer",
@@ -196,6 +201,10 @@ static std::unordered_map<std::string, OptionTypeInfo>
          {offsetof(struct ImmutableDBOptions, experimental_allow_mempurge),
           OptionType::kBoolean, OptionVerificationType::kNormal,
           OptionTypeFlags::kNone}},
+        {"experimental_mempurge_policy",
+         OptionTypeInfo::Enum<MemPurgePolicy>(
+             offsetof(struct ImmutableDBOptions, experimental_mempurge_policy),
+             &experimental_mempurge_policy_string_map)},
         {"is_fd_close_on_exec",
          {offsetof(struct ImmutableDBOptions, is_fd_close_on_exec),
           OptionType::kBoolean, OptionVerificationType::kNormal,
@@ -546,6 +555,7 @@ ImmutableDBOptions::ImmutableDBOptions(const DBOptions& options)
       is_fd_close_on_exec(options.is_fd_close_on_exec),
       advise_random_on_open(options.advise_random_on_open),
       experimental_allow_mempurge(options.experimental_allow_mempurge),
+      experimental_mempurge_policy(options.experimental_mempurge_policy),
       db_write_buffer_size(options.db_write_buffer_size),
       write_buffer_manager(options.write_buffer_manager),
       access_hint_on_compaction_start(options.access_hint_on_compaction_start),
@@ -682,6 +692,9 @@ void ImmutableDBOptions::Dump(Logger* log) const {
   ROCKS_LOG_HEADER(log,
                    "                  Options.experimental_allow_mempurge: %d",
                    experimental_allow_mempurge);
+  ROCKS_LOG_HEADER(log,
+                   "                  Options.experimental_mempurge_policy: %d",
+                   static_cast<int>(experimental_mempurge_policy));
   ROCKS_LOG_HEADER(
       log, "                   Options.db_write_buffer_size: %" ROCKSDB_PRIszt,
       db_write_buffer_size);
index e609563d21f9d60a4d3e94fc752395d7b528d1fc..188e688c3ab4e9f6f1fd38147dd852771ec1c62b 100644 (file)
@@ -55,6 +55,7 @@ struct ImmutableDBOptions {
   bool is_fd_close_on_exec;
   bool advise_random_on_open;
   bool experimental_allow_mempurge;
+  MemPurgePolicy experimental_mempurge_policy;
   size_t db_write_buffer_size;
   std::shared_ptr<WriteBufferManager> write_buffer_manager;
   DBOptions::AccessHint access_hint_on_compaction_start;
index c918146638c9b6cd08d789257dd425ff7bc9b731..354453c231ed13e8f2abd8da1a72ccec5e959458 100644 (file)
@@ -1000,6 +1000,19 @@ static enum ROCKSDB_NAMESPACE::CompressionType StringToCompressionType(
   return ROCKSDB_NAMESPACE::kSnappyCompression;  // default value
 }
 
+static enum ROCKSDB_NAMESPACE::MemPurgePolicy StringToMemPurgePolicy(
+    const char* mpolicy) {
+  assert(mpolicy);
+  if (!strcasecmp(mpolicy, "kAlways")) {
+    return ROCKSDB_NAMESPACE::MemPurgePolicy::kAlways;
+  } else if (!strcasecmp(mpolicy, "kAlternate")) {
+    return ROCKSDB_NAMESPACE::MemPurgePolicy::kAlternate;
+  }
+
+  fprintf(stdout, "Cannot parse mempurge policy '%s'\n", mpolicy);
+  return ROCKSDB_NAMESPACE::MemPurgePolicy::kAlternate;
+}
+
 static std::string ColumnFamilyName(size_t i) {
   if (i == 0) {
     return ROCKSDB_NAMESPACE::kDefaultColumnFamilyName;
@@ -1137,6 +1150,9 @@ DEFINE_bool(allow_concurrent_memtable_write, true,
 DEFINE_bool(experimental_allow_mempurge, false,
             "Allow memtable garbage collection.");
 
+DEFINE_string(experimental_mempurge_policy, "kAlternate",
+              "Specify memtable garbage collection policy.");
+
 DEFINE_bool(inplace_update_support,
             ROCKSDB_NAMESPACE::Options().inplace_update_support,
             "Support in-place memtable update for smaller or same-size values");
@@ -4211,6 +4227,8 @@ class Benchmark {
     options.allow_concurrent_memtable_write =
         FLAGS_allow_concurrent_memtable_write;
     options.experimental_allow_mempurge = FLAGS_experimental_allow_mempurge;
+    options.experimental_mempurge_policy =
+        StringToMemPurgePolicy(FLAGS_experimental_mempurge_policy.c_str());
     options.inplace_update_support = FLAGS_inplace_update_support;
     options.inplace_update_num_locks = FLAGS_inplace_update_num_locks;
     options.enable_write_thread_adaptive_yield =