]> git-server-git.apps.pok.os.sepia.ceph.com Git - rocksdb.git/commitdiff
Memtable sampling for mempurge heuristic. (#8628)
authorBaptiste Lemaire <blemaire@fb.com>
Wed, 11 Aug 2021 01:07:48 +0000 (18:07 -0700)
committerFacebook GitHub Bot <facebook-github-bot@users.noreply.github.com>
Wed, 11 Aug 2021 01:09:03 +0000 (18:09 -0700)
Summary:
Changes the API of the MemPurge process: the `bool experimental_allow_mempurge` and `experimental_mempurge_policy` flags have been replaced by a `double experimental_mempurge_threshold` option.
This change of API reflects another major change introduced in this PR: the MemPurgeDecider() function now works by sampling the memtables being flushed to estimate the overall amount of useful payload (payload minus the garbage), and then compare this useful payload estimate with the `double experimental_mempurge_threshold` value.
Therefore, when the value of this flag is `0.0` (default value), mempurge is simply deactivated. On the other hand, a value of `DBL_MAX` would be equivalent to always going through a mempurge regardless of the garbage ratio estimate.
At the moment, a `double experimental_mempurge_threshold` value else than 0.0 or `DBL_MAX` is opnly supported`with the `SkipList` memtable representation.
Regarding the sampling, this PR includes the introduction of a `MemTable::UniqueRandomSample` function that collects (approximately) random entries from the memtable by using the new `SkipList::Iterator::RandomSeek()` under the hood, or by iterating through each memtable entry, depending on the target sample size and the total number of entries.
The unit tests have been readapted to support this new API.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/8628

Reviewed By: pdillinger

Differential Revision: D30149315

Pulled By: bjlemaire

fbshipit-source-id: 1feef5390c95db6f4480ab4434716533d3947f27

20 files changed:
db/c.cc
db/db_flush_test.cc
db/db_impl/db_impl.cc
db/db_impl/db_impl_compaction_flush.cc
db/flush_job.cc
db/flush_job.h
db/memtable.h
db/memtable_list.h
db_stress_tool/db_stress_common.h
db_stress_tool/db_stress_gflags.cc
db_stress_tool/db_stress_test_base.cc
include/rocksdb/memtablerep.h
include/rocksdb/options.h
memtable/inlineskiplist.h
memtable/skiplistrep.cc
options/db_options.cc
options/db_options.h
options/options_test.cc
tools/db_bench_tool.cc
tools/db_crashtest.py

diff --git a/db/c.cc b/db/c.cc
index 8048cb7c9e2bf3c415cf4413a0411ccf94f33510..a3883c9bf3ce4c0c621b740ff16c3b200b3055b9 100644 (file)
--- a/db/c.cc
+++ b/db/c.cc
@@ -3029,9 +3029,9 @@ unsigned char rocksdb_options_get_advise_random_on_open(
   return opt->rep.advise_random_on_open;
 }
 
-void rocksdb_options_set_experimental_allow_mempurge(rocksdb_options_t* opt,
-                                                     unsigned char v) {
-  opt->rep.experimental_allow_mempurge = v;
+void rocksdb_options_set_experimental_mempurge_threshold(rocksdb_options_t* opt,
+                                                         double v) {
+  opt->rep.experimental_mempurge_threshold = v;
 }
 
 void rocksdb_options_set_access_hint_on_compaction_start(
index b3e435472ad3daec7a090294d44d00de29ee184c..824dc9e550e0f5ac544d4fc830d64f01fefe4f55 100644 (file)
@@ -8,6 +8,7 @@
 // found in the LICENSE file. See the AUTHORS file for names of contributors.
 
 #include <atomic>
+#include <limits>
 
 #include "db/db_impl/db_impl.h"
 #include "db/db_test_util.h"
@@ -694,8 +695,8 @@ TEST_F(DBFlushTest, MemPurgeBasic) {
   // Enforce size of a single MemTable to 64MB (64MB = 67108864 bytes).
   options.write_buffer_size = 1 << 20;
   // Activate the MemPurge prototype.
-  options.experimental_allow_mempurge = true;
-  options.experimental_mempurge_policy = MemPurgePolicy::kAlways;
+  options.experimental_mempurge_threshold =
+      1.0;  // std::numeric_limits<double>::max();
   ASSERT_OK(TryReopen(options));
   uint32_t mempurge_count = 0;
   uint32_t sst_count = 0;
@@ -842,8 +843,8 @@ TEST_F(DBFlushTest, MemPurgeDeleteAndDeleteRange) {
   // Enforce size of a single MemTable to 64MB (64MB = 67108864 bytes).
   options.write_buffer_size = 1 << 20;
   // Activate the MemPurge prototype.
-  options.experimental_allow_mempurge = true;
-  options.experimental_mempurge_policy = MemPurgePolicy::kAlways;
+  options.experimental_mempurge_threshold =
+      1.0;  // std::numeric_limits<double>::max();
   ASSERT_OK(TryReopen(options));
 
   uint32_t mempurge_count = 0;
@@ -1046,8 +1047,8 @@ TEST_F(DBFlushTest, MemPurgeAndCompactionFilter) {
   // Enforce size of a single MemTable to 64MB (64MB = 67108864 bytes).
   options.write_buffer_size = 1 << 20;
   // Activate the MemPurge prototype.
-  options.experimental_allow_mempurge = true;
-  options.experimental_mempurge_policy = MemPurgePolicy::kAlways;
+  options.experimental_mempurge_threshold =
+      1.0;  // std::numeric_limits<double>::max();
   ASSERT_OK(TryReopen(options));
 
   uint32_t mempurge_count = 0;
@@ -1122,8 +1123,8 @@ TEST_F(DBFlushTest, MemPurgeWALSupport) {
   // Enforce size of a single MemTable to 128KB.
   options.write_buffer_size = 128 << 10;
   // Activate the MemPurge prototype.
-  options.experimental_allow_mempurge = true;
-  options.experimental_mempurge_policy = MemPurgePolicy::kAlways;
+  options.experimental_mempurge_threshold =
+      1.0;  // std::numeric_limits<double>::max();
   ASSERT_OK(TryReopen(options));
 
   const size_t KVSIZE = 10;
@@ -1239,7 +1240,8 @@ TEST_F(DBFlushTest, MemPurgeWALSupport) {
     const uint32_t EXPECTED_SST_COUNT = 0;
 
     EXPECT_GE(mempurge_count, EXPECTED_MIN_MEMPURGE_COUNT);
-    if (options.experimental_mempurge_policy == MemPurgePolicy::kAlways) {
+    if (options.experimental_mempurge_threshold ==
+        std::numeric_limits<double>::max()) {
       EXPECT_EQ(sst_count, EXPECTED_SST_COUNT);
     }
 
index e3ca3f52d66f6ffbe9ad23cee67cb93638fa963e..e46092ba4d78aa3a9879587c0cb2ae8e9d6e548a 100644 (file)
@@ -558,7 +558,7 @@ Status DBImpl::CloseHelper() {
   // flushing (but need to implement something
   // else than imm()->IsFlushPending() because the output
   // memtables added to imm() dont trigger flushes).
-  if (immutable_db_options_.experimental_allow_mempurge) {
+  if (immutable_db_options_.experimental_mempurge_threshold > 0.0) {
     Status flush_ret;
     mutex_.Unlock();
     for (ColumnFamilyData* cf : *versions_->GetColumnFamilySet()) {
index 6f5e222586edc60737fe31cbd1b52c6b7e4e1866..7ec42c1fd65aa840fd1bdb76895780cfef757f96 100644 (file)
@@ -2410,7 +2410,7 @@ void DBImpl::SchedulePendingFlush(const FlushRequest& flush_req,
     // future changes. Therefore, we add the following if
     // statement - note that calling it twice (or more)
     // doesn't break anything.
-    if (immutable_db_options_.experimental_allow_mempurge) {
+    if (immutable_db_options_.experimental_mempurge_threshold > 0.0) {
       // If imm() contains silent memtables,
       // requesting a flush will mark the imm_needed as true.
       cfd->imm()->FlushRequested();
@@ -2556,7 +2556,7 @@ Status DBImpl::BackgroundFlush(bool* made_progress, JobContext* job_context,
 
     for (const auto& iter : flush_req) {
       ColumnFamilyData* cfd = iter.first;
-      if (immutable_db_options_.experimental_allow_mempurge) {
+      if (immutable_db_options_.experimental_mempurge_threshold > 0.0) {
         // If imm() contains silent memtables,
         // requesting a flush will mark the imm_needed as true.
         cfd->imm()->FlushRequested();
index 3a258a57e1d0458d94d99d9cdcd308d36aca3187..bc4824af6a73264f098328961382d4ed31e37a23 100644 (file)
@@ -195,7 +195,7 @@ void FlushJob::PickMemTable() {
   // If mempurge feature is activated, keep track of any potential
   // memtables coming from a previous mempurge operation.
   // Used for mempurge policy.
-  if (db_options_.experimental_allow_mempurge) {
+  if (db_options_.experimental_mempurge_threshold > 0.0) {
     contains_mempurge_outcome_ = false;
     for (MemTable* mt : mems_) {
       if (cfd_->imm()->IsMemPurgeOutput(mt->GetID())) {
@@ -241,7 +241,7 @@ Status FlushJob::Run(LogsWithPrepTracker* prep_tracker,
     prev_cpu_read_nanos = IOSTATS(cpu_read_nanos);
   }
   Status mempurge_s = Status::NotFound("No MemPurge.");
-  if (db_options_.experimental_allow_mempurge &&
+  if ((db_options_.experimental_mempurge_threshold > 0.0) &&
       (cfd_->GetFlushReason() == FlushReason::kWriteBufferFull) &&
       (!mems_.empty()) && MemPurgeDecider()) {
     mempurge_s = MemPurge();
@@ -580,8 +580,6 @@ Status FlushJob::MemPurge() {
         // This addition will not trigger another flush, because
         // we do not call SchedulePendingFlush().
         cfd_->imm()->Add(new_mem, &job_context_->memtables_to_free);
-        new_mem_capacity = (new_mem->ApproximateMemoryUsage()) * 1.0 /
-                           mutable_cf_options_.write_buffer_size;
         new_mem->Ref();
         db_mutex_->Unlock();
       } else {
@@ -622,16 +620,129 @@ Status FlushJob::MemPurge() {
 }
 
 bool FlushJob::MemPurgeDecider() {
-  MemPurgePolicy policy = db_options_.experimental_mempurge_policy;
-  if (policy == MemPurgePolicy::kAlways) {
+  double threshold = db_options_.experimental_mempurge_threshold;
+  // Never trigger mempurge if threshold is not a strictly positive value.
+  if (!(threshold > 0.0)) {
+    return false;
+  }
+  if (threshold > (1.0 * mems_.size())) {
     return true;
-  } else if (policy == MemPurgePolicy::kAlternate) {
-    // Note: if at least one of the flushed memtables is
-    // an output of a previous mempurge process, then flush
-    // to storage.
-    return !(contains_mempurge_outcome_);
   }
-  return false;
+  // Payload and useful_payload (in bytes).
+  // The useful payload ratio of a given MemTable
+  // is estimated to be useful_payload/payload.
+  uint64_t payload = 0, useful_payload = 0;
+  // If estimated_useful_payload is > threshold,
+  // then flush to storage, else MemPurge.
+  double estimated_useful_payload = 0.0;
+  // Cochran formula for determining sample size.
+  // 95% confidence interval, 7% precision.
+  //    n0 = (1.96*1.96)*0.25/(0.07*0.07) = 196.0
+  double n0 = 196.0;
+  ReadOptions ro;
+  ro.total_order_seek = true;
+
+  // Iterate over each memtable of the set.
+  for (MemTable* mt : mems_) {
+    // If the memtable is the output of a previous mempurge,
+    // its approximate useful payload ratio is already calculated.
+    if (cfd_->imm()->IsMemPurgeOutput(mt->GetID())) {
+      // We make the assumption that this memtable is already
+      // free of garbage (garbage underestimation).
+      estimated_useful_payload += mt->ApproximateMemoryUsage();
+    } else {
+      // Else sample from the table.
+      uint64_t nentries = mt->num_entries();
+      // Corrected Cochran formula for small populations
+      // (converges to n0 for large populations).
+      uint64_t target_sample_size =
+          static_cast<uint64_t>(ceil(n0 / (1.0 + (n0 / nentries))));
+      std::unordered_set<const char*> sentries = {};
+      // Populate sample entries set.
+      mt->UniqueRandomSample(target_sample_size, &sentries);
+
+      // Estimate the garbage ratio by comparing if
+      // each sample corresponds to a valid entry.
+      for (const char* ss : sentries) {
+        ParsedInternalKey res;
+        Slice entry_slice = GetLengthPrefixedSlice(ss);
+        Status parse_s =
+            ParseInternalKey(entry_slice, &res, true /*log_err_key*/);
+        if (!parse_s.ok()) {
+          ROCKS_LOG_WARN(db_options_.info_log,
+                         "Memtable Decider: ParseInternalKey did not parse "
+                         "entry_slice %s"
+                         "successfully.",
+                         entry_slice.data());
+        }
+        LookupKey lkey(res.user_key, kMaxSequenceNumber);
+        std::string vget;
+        Status s;
+        MergeContext merge_context;
+        SequenceNumber max_covering_tombstone_seq = 0, sqno = 0;
+
+        // Pick the oldest existing snapshot that is more recent
+        // than the sequence number of the sampled entry.
+        SequenceNumber min_seqno_snapshot = kMaxSequenceNumber;
+        SnapshotImpl min_snapshot;
+        for (SequenceNumber seq_num : existing_snapshots_) {
+          if (seq_num > res.sequence && seq_num < min_seqno_snapshot) {
+            min_seqno_snapshot = seq_num;
+          }
+        }
+        min_snapshot.number_ = min_seqno_snapshot;
+        ro.snapshot =
+            min_seqno_snapshot < kMaxSequenceNumber ? &min_snapshot : nullptr;
+
+        // Estimate if the sample entry is valid or not.
+        bool gres = mt->Get(lkey, &vget, nullptr, &s, &merge_context,
+                            &max_covering_tombstone_seq, &sqno, ro);
+        if (!gres) {
+          ROCKS_LOG_WARN(
+              db_options_.info_log,
+              "Memtable Get returned false when Get(sampled entry). "
+              "Yet each sample entry should exist somewhere in the memtable, "
+              "unrelated to whether it has been deleted or not.");
+        }
+        payload += entry_slice.size();
+
+        // TODO(bjlemaire): evaluate typeMerge.
+        // This is where the sampled entry is estimated to be
+        // garbage or not. Note that this is a garbage *estimation*
+        // because we do not include certain items such as
+        // CompactionFitlers triggered at flush, or if the same delete
+        // has been inserted twice or more in the memtable.
+        if (res.type == kTypeValue && gres && s.ok() && sqno == res.sequence) {
+          useful_payload += entry_slice.size();
+        } else if (((res.type == kTypeDeletion) ||
+                    (res.type == kTypeSingleDeletion)) &&
+                   s.IsNotFound() && gres) {
+          useful_payload += entry_slice.size();
+        }
+      }
+      if (payload > 0) {
+        // We used the estimated useful payload ratio
+        // to evaluate how much of the total memtable is useful bytes.
+        estimated_useful_payload +=
+            (mt->ApproximateMemoryUsage()) * (useful_payload * 1.0 / payload);
+        ROCKS_LOG_INFO(
+            db_options_.info_log,
+            "Mempurge sampling - found garbage ratio from sampling: %f.\n",
+            (payload - useful_payload) * 1.0 / payload);
+      } else {
+        ROCKS_LOG_WARN(
+            db_options_.info_log,
+            "Mempurge kSampling policy: null payload measured, and collected "
+            "sample size is %zu\n.",
+            sentries.size());
+      }
+    }
+  }
+  // We convert the total number of useful paylaod bytes
+  // into the proportion of memtable necessary to store all these bytes.
+  // We compare this proportion with the threshold value.
+  return (estimated_useful_payload / mutable_cf_options_.write_buffer_size) <
+         threshold;
 }
 
 Status FlushJob::WriteLevel0Table() {
@@ -843,7 +954,7 @@ Status FlushJob::WriteLevel0Table() {
 
   stats.num_output_files_blob = static_cast<int>(blobs.size());
 
-  if (db_options_.experimental_allow_mempurge && s.ok()) {
+  if ((db_options_.experimental_mempurge_threshold > 0.0) && s.ok()) {
     // The db_mutex is held at this point.
     for (MemTable* mt : mems_) {
       // Note: if m is not a previous mempurge output memtable,
index 694dd71d2cfd719237da776765977435bc98ef5e..9050657de83b87240782c55b4c4386387a9ee60d 100644 (file)
@@ -117,9 +117,9 @@ class FlushJob {
   // of development. At the moment it is only compatible with the Get, Put,
   // Delete operations as well as Iterators and CompactionFilters.
   // For this early version, "MemPurge" is called by setting the
-  // options.experimental_allow_mempurge flag as "true". When this is
+  // options.experimental_mempurge_threshold value as >0.0. When this is
   // the case, ALL automatic flush operations (kWRiteBufferManagerFull) will
-  // first go through the MemPurge process. herefore, we strongly
+  // first go through the MemPurge process. Therefore, we strongly
   // recommend all users not to set this flag as true given that the MemPurge
   // process has not matured yet.
   Status MemPurge();
@@ -192,7 +192,7 @@ class FlushJob {
   const std::string full_history_ts_low_;
   BlobFileCompletionCallback* blob_callback_;
 
-  // Used when experimental_allow_mempurge set to true.
+  // Used when experimental_mempurge_threshold > 0.0.
   bool contains_mempurge_outcome_;
 };
 
index 93060941a9438ff51f52bcc34da8150c394645d3..e6580379ffbfee327ae8d30755ab520caadfaf08 100644 (file)
@@ -14,6 +14,7 @@
 #include <memory>
 #include <string>
 #include <unordered_map>
+#include <unordered_set>
 #include <vector>
 
 #include "db/dbformat.h"
@@ -145,6 +146,26 @@ class MemTable {
     return approximate_memory_usage_.load(std::memory_order_relaxed);
   }
 
+  // Returns a vector of unique random memtable entries of size 'sample_size'.
+  //
+  // Note: the entries are stored in the unordered_set as length-prefixed keys,
+  //       hence their representation in the set as "const char*".
+  // Note2: the size of the output set 'entries' is not enforced to be strictly
+  //        equal to 'target_sample_size'. Its final size might be slightly
+  //        greater or slightly less than 'target_sample_size'
+  //
+  // REQUIRES: external synchronization to prevent simultaneous
+  // operations on the same MemTable (unless this Memtable is immutable).
+  // REQUIRES: SkipList memtable representation. This function is not
+  // implemented for any other type of memtable representation (vectorrep,
+  // hashskiplist,...).
+  void UniqueRandomSample(const uint64_t& target_sample_size,
+                          std::unordered_set<const char*>* entries) {
+    // TODO(bjlemaire): at the moment, only supported by skiplistrep.
+    // Extend it to all other memtable representations.
+    table_->UniqueRandomSample(num_entries(), target_sample_size, entries);
+  }
+
   // This method heuristically determines if the memtable should continue to
   // host more data.
   bool ShouldScheduleFlush() const {
index 17a7aa87f11e53b04371ff9909aefeca26038c32..6dde850163c0a5a0cf8e6dfbf6576d8a01fea0f1 100644 (file)
@@ -390,11 +390,7 @@ class MemTableList {
   // not freed, but put into a vector for future deref and reclamation.
   void RemoveOldMemTables(uint64_t log_number,
                           autovector<MemTable*>* to_delete);
-  void AddMemPurgeOutputID(uint64_t mid) {
-    if (mempurged_ids_.find(mid) == mempurged_ids_.end()) {
-      mempurged_ids_.insert(mid);
-    }
-  }
+  void AddMemPurgeOutputID(uint64_t mid) { mempurged_ids_.insert(mid); }
 
   void RemoveMemPurgeOutputID(uint64_t mid) {
     if (mempurged_ids_.find(mid) != mempurged_ids_.end()) {
index ec3aa212e3cfeaeda7f89aedcf616fb987cbde0c..5db089b161dd424489d129746fcf9b8090c6350f 100644 (file)
@@ -141,8 +141,7 @@ DECLARE_uint64(subcompactions);
 DECLARE_uint64(periodic_compaction_seconds);
 DECLARE_uint64(compaction_ttl);
 DECLARE_bool(allow_concurrent_memtable_write);
-DECLARE_bool(experimental_allow_mempurge);
-DECLARE_string(experimental_mempurge_policy);
+DECLARE_double(experimental_mempurge_threshold);
 DECLARE_bool(enable_write_thread_adaptive_yield);
 DECLARE_int32(reopen);
 DECLARE_double(bloom_bits);
@@ -341,18 +340,6 @@ inline enum ROCKSDB_NAMESPACE::CompressionType StringToCompressionType(
   return ret_compression_type;
 }
 
-inline enum ROCKSDB_NAMESPACE::MemPurgePolicy StringToMemPurgePolicy(
-    const char* mpolicy) {
-  assert(mpolicy);
-  if (!strcasecmp(mpolicy, "kAlways")) {
-    return ROCKSDB_NAMESPACE::MemPurgePolicy::kAlways;
-  } else if (!strcasecmp(mpolicy, "kAlternate")) {
-    return ROCKSDB_NAMESPACE::MemPurgePolicy::kAlternate;
-  }
-  fprintf(stderr, "Cannot parse mempurge policy: '%s'\n", mpolicy);
-  return ROCKSDB_NAMESPACE::MemPurgePolicy::kAlternate;
-}
-
 inline enum ROCKSDB_NAMESPACE::ChecksumType StringToChecksumType(
     const char* ctype) {
   assert(ctype);
index f1589d78ca2a4d8c749d3c8bf727894de1a33d45..adb44084af8df3681772cc9fe005c32511a45d8f 100644 (file)
@@ -326,11 +326,9 @@ DEFINE_uint64(compaction_ttl, 1000,
 DEFINE_bool(allow_concurrent_memtable_write, false,
             "Allow multi-writers to update mem tables in parallel.");
 
-DEFINE_bool(experimental_allow_mempurge, false,
-            "Allow mempurge process to collect memtable garbage bytes.");
-
-DEFINE_string(experimental_mempurge_policy, "kAlternate",
-              "Set mempurge (MemTable Garbage Collection) policy.");
+DEFINE_double(experimental_mempurge_threshold, 0.0,
+              "Maximum estimated useful payload that triggers a "
+              "mempurge process to collect memtable garbage bytes.");
 
 DEFINE_bool(enable_write_thread_adaptive_yield, true,
             "Use a yielding spin loop for brief writer thread waits.");
index df425239879533a32bfcf4e7c9c3be205cf176cf..9d9320ddec6ffb3397e90b1e02f0eac0036e01d8 100644 (file)
@@ -2266,9 +2266,8 @@ void StressTest::Open() {
     options_.max_subcompactions = static_cast<uint32_t>(FLAGS_subcompactions);
     options_.allow_concurrent_memtable_write =
         FLAGS_allow_concurrent_memtable_write;
-    options_.experimental_allow_mempurge = FLAGS_experimental_allow_mempurge;
-    options_.experimental_mempurge_policy =
-        StringToMemPurgePolicy(FLAGS_experimental_mempurge_policy.c_str());
+    options_.experimental_mempurge_threshold =
+        FLAGS_experimental_mempurge_threshold;
     options_.periodic_compaction_seconds = FLAGS_periodic_compaction_seconds;
     options_.ttl = FLAGS_compaction_ttl;
     options_.enable_pipelined_write = FLAGS_enable_pipelined_write;
index b8701135d3eca3455901c48c0e9207031b408bfb..6974831fdcf3a1b98d0121cc2da8a2e978478488 100644 (file)
 #include <rocksdb/slice.h>
 #include <stdint.h>
 #include <stdlib.h>
+
 #include <memory>
 #include <stdexcept>
+#include <unordered_set>
 
 namespace ROCKSDB_NAMESPACE {
 
@@ -194,6 +196,17 @@ class MemTableRep {
     return 0;
   }
 
+  // Returns a vector of unique random memtable entries of approximate
+  // size 'target_sample_size' (this size is not strictly enforced).
+  virtual void UniqueRandomSample(const uint64_t& num_entries,
+                                  const uint64_t& target_sample_size,
+                                  std::unordered_set<const char*>* entries) {
+    (void)num_entries;
+    (void)target_sample_size;
+    (void)entries;
+    assert(false);
+  }
+
   // Report an approximation of how much memory has been used other than memory
   // that was allocated through the allocator.  Safe to call from any thread.
   virtual size_t ApproximateMemoryUsage() = 0;
@@ -230,6 +243,8 @@ class MemTableRep {
     virtual void SeekForPrev(const Slice& internal_key,
                              const char* memtable_key) = 0;
 
+    virtual void RandomSeek() {}
+
     // Position at the first entry in collection.
     // Final state of iterator is Valid() iff collection is not empty.
     virtual void SeekToFirst() = 0;
index d7ecb5b3dc1abfe8dddd7e124492e610ddcc460e..95a9ff46fa948e1bf8bfb9fdc7ab939c8234d900 100644 (file)
@@ -369,11 +369,6 @@ struct DbPath {
 
 extern const char* kHostnameForDbHostId;
 
-enum class MemPurgePolicy : char {
-  kAlternate = 0x00,
-  kAlways = 0x01,
-};
-
 enum class CompactionServiceJobStatus : char {
   kSuccess,
   kFailure,
@@ -787,14 +782,22 @@ struct DBOptions {
   // Default: true
   bool advise_random_on_open = true;
 
-  // If true, allows for memtable purge instead of flush to storage.
-  // (experimental).
-  bool experimental_allow_mempurge = false;
-  // If experimental_allow_mempurge is true, will dictate MemPurge
-  // policy.
-  // Default: kAlternate
-  // (experimental).
-  MemPurgePolicy experimental_mempurge_policy = MemPurgePolicy::kAlternate;
+  // [experimental]
+  // Used to activate or deactive the Mempurge feature (memtable garbage
+  // collection). (deactivated by default). At every flush, the total useful
+  // payload (total entries minus garbage entries) is estimated as a ratio
+  // [useful payload bytes]/[size of a memtable (in bytes)]. This ratio is then
+  // compared to this `threshold` value:
+  //     - if ratio<threshold: the flush is replaced by a mempurge operation
+  //     - else: a regular flush operation takes place.
+  // Threshold values:
+  //   0.0: mempurge deactivated (default).
+  //   1.0: recommended threshold value.
+  //   >1.0 : aggressive mempurge.
+  //   0 < threshold < 1.0: mempurge triggered only for very low useful payload
+  //   ratios.
+  // [experimental]
+  double experimental_mempurge_threshold = 0.0;
 
   // Amount of data to build up in memtables across all column
   // families before writing to disk.
index 1782288f08cc008af38d0185ae50730f4545abf2..028fde3a273203780657ef1552fd67176de6c10b 100644 (file)
@@ -177,6 +177,9 @@ class InlineSkipList {
     // Retreat to the last entry with a key <= target
     void SeekForPrev(const char* target);
 
+    // Advance to a random entry in the list.
+    void RandomSeek();
+
     // Position at the first entry in list.
     // Final state of iterator is Valid() iff list is not empty.
     void SeekToFirst();
@@ -252,6 +255,9 @@ class InlineSkipList {
   // Return head_ if list is empty.
   Node* FindLast() const;
 
+  // Returns a random entry.
+  Node* FindRandomEntry() const;
+
   // Traverses a single level of the list, setting *out_prev to the last
   // node before the key and *out_next to the first node after. Assumes
   // that the key is not present in the skip list. On entry, before should
@@ -412,6 +418,11 @@ inline void InlineSkipList<Comparator>::Iterator::SeekForPrev(
   }
 }
 
+template <class Comparator>
+inline void InlineSkipList<Comparator>::Iterator::RandomSeek() {
+  node_ = list_->FindRandomEntry();
+}
+
 template <class Comparator>
 inline void InlineSkipList<Comparator>::Iterator::SeekToFirst() {
   node_ = list_->head_->Next(0);
@@ -558,6 +569,48 @@ InlineSkipList<Comparator>::FindLast() const {
   }
 }
 
+template <class Comparator>
+typename InlineSkipList<Comparator>::Node*
+InlineSkipList<Comparator>::FindRandomEntry() const {
+  // TODO(bjlemaire): consider adding PREFETCH calls.
+  Node *x = head_, *scan_node = nullptr, *limit_node = nullptr;
+
+  // We start at the max level.
+  // FOr each level, we look at all the nodes at the level, and
+  // we randomly pick one of them. Then decrement the level
+  // and reiterate the process.
+  // eg: assume GetMaxHeight()=5, and there are #100 elements (nodes).
+  // level 4 nodes: lvl_nodes={#1, #15, #67, #84}. Randomly pick #15.
+  // We will consider all the nodes between #15 (inclusive) and #67
+  // (exclusive). #67 is called 'limit_node' here.
+  // level 3 nodes: lvl_nodes={#15, #21, #45, #51}. Randomly choose
+  // #51. #67 remains 'limit_node'.
+  // [...]
+  // level 0 nodes: lvl_nodes={#56,#57,#58,#59}. Randomly pick $57.
+  // Return Node #57.
+  std::vector<Node*> lvl_nodes;
+  Random* rnd = Random::GetTLSInstance();
+  int level = GetMaxHeight() - 1;
+
+  while (level >= 0) {
+    lvl_nodes.clear();
+    scan_node = x;
+    while (scan_node != limit_node) {
+      lvl_nodes.push_back(scan_node);
+      scan_node = scan_node->Next(level);
+    }
+    uint32_t rnd_idx = rnd->Next() % lvl_nodes.size();
+    x = lvl_nodes[rnd_idx];
+    if (rnd_idx + 1 < lvl_nodes.size()) {
+      limit_node = lvl_nodes[rnd_idx + 1];
+    }
+    level--;
+  }
+  // There is a special case where x could still be the head_
+  // (note that the head_ contains no key).
+  return x == head_ ? head_->Next(0) : x;
+}
+
 template <class Comparator>
 uint64_t InlineSkipList<Comparator>::EstimateCount(const char* key) const {
   uint64_t count = 0;
index eec15626c01f5c7ada4d1d84c320b14f73601f5d..abe7144ab9d73c3a44eb3e24a5b9e21dc34be259 100644 (file)
@@ -3,6 +3,8 @@
 //  COPYING file in the root directory) and Apache 2.0 License
 //  (found in the LICENSE.Apache file in the root directory).
 //
+#include <random>
+
 #include "db/memtable.h"
 #include "memory/arena.h"
 #include "memtable/inlineskiplist.h"
@@ -95,6 +97,66 @@ public:
     return (end_count >= start_count) ? (end_count - start_count) : 0;
   }
 
+  void UniqueRandomSample(const uint64_t& num_entries,
+                          const uint64_t& target_sample_size,
+                          std::unordered_set<const char*>* entries) override {
+    entries->clear();
+    // Avoid divide-by-0.
+    assert(target_sample_size > 0);
+    assert(num_entries > 0);
+    // NOTE: the size of entries is not enforced to be exactly
+    // target_sample_size at the end of this function, it might be slightly
+    // greater or smaller.
+    SkipListRep::Iterator iter(&skip_list_);
+    // There are two methods to create the subset of samples (size m)
+    // from the table containing N elements:
+    // 1-Iterate linearly through the N memtable entries. For each entry i,
+    //   add it to the sample set with a probability
+    //   (target_sample_size - entries.size() ) / (N-i).
+    //
+    // 2-Pick m random elements without repetition.
+    // We pick Option 2 when m<sqrt(N) and
+    // Option 1 when m > sqrt(N).
+    if (target_sample_size >
+        static_cast<uint64_t>(std::sqrt(1.0 * num_entries))) {
+      Random* rnd = Random::GetTLSInstance();
+      iter.SeekToFirst();
+      uint64_t counter = 0, num_samples_left = target_sample_size;
+      for (; iter.Valid() && (num_samples_left > 0); iter.Next(), counter++) {
+        // Add entry to sample set with probability
+        // num_samples_left/(num_entries - counter).
+        if (rnd->Next() % (num_entries - counter) < num_samples_left) {
+          entries->insert(iter.key());
+          num_samples_left--;
+        }
+      }
+    } else {
+      // Option 2: pick m random elements with no duplicates.
+      // If Option 2 is picked, then target_sample_size<sqrt(N)
+      // Using a set spares the need to check for duplicates.
+      for (uint64_t i = 0; i < target_sample_size; i++) {
+        // We give it 5 attempts to find a non-duplicate
+        // With 5 attempts, the chances of returning `entries` set
+        // of size target_sample_size is:
+        // PROD_{i=1}^{target_sample_size-1} [1-(i/N)^5]
+        // which is monotonically increasing with N in the worse case
+        // of target_sample_size=sqrt(N), and is always >99.9% for N>4.
+        // At worst, for the final pick , when m=sqrt(N) there is
+        // a probability of p= 1/sqrt(N) chances to find a duplicate.
+        for (uint64_t j = 0; j < 5; j++) {
+          iter.RandomSeek();
+          // unordered_set::insert returns pair<iterator, bool>.
+          // The second element is true if an insert successfully happened.
+          // If element is already in the set, this bool will be false, and
+          // true otherwise.
+          if ((entries->insert(iter.key())).second) {
+            break;
+          }
+        }
+      }
+    }
+  }
+
   ~SkipListRep() override {}
 
   // Iteration over the contents of a skip list
@@ -143,6 +205,8 @@ public:
       }
     }
 
+    void RandomSeek() override { iter_.RandomSeek(); }
+
     // Position at the first entry in list.
     // Final state of iterator is Valid() iff list is not empty.
     void SeekToFirst() override { iter_.SeekToFirst(); }
index 743a0a9e1d09ec518f2d7a73c060479e4f4e7b84..3e2c8bf13e1cd5eab47740f1fe35e094e0bbc013 100644 (file)
@@ -48,11 +48,6 @@ static std::unordered_map<std::string, InfoLogLevel> info_log_level_string_map =
      {"FATAL_LEVEL", InfoLogLevel::FATAL_LEVEL},
      {"HEADER_LEVEL", InfoLogLevel::HEADER_LEVEL}};
 
-static std::unordered_map<std::string, MemPurgePolicy>
-    experimental_mempurge_policy_string_map = {
-        {"kAlternate", MemPurgePolicy::kAlternate},
-        {"kAlways", MemPurgePolicy::kAlways}};
-
 static std::unordered_map<std::string, OptionTypeInfo>
     db_mutable_options_type_info = {
         {"allow_os_buffer",
@@ -197,14 +192,10 @@ static std::unordered_map<std::string, OptionTypeInfo>
          {offsetof(struct ImmutableDBOptions, error_if_exists),
           OptionType::kBoolean, OptionVerificationType::kNormal,
           OptionTypeFlags::kNone}},
-        {"experimental_allow_mempurge",
-         {offsetof(struct ImmutableDBOptions, experimental_allow_mempurge),
-          OptionType::kBoolean, OptionVerificationType::kNormal,
+        {"experimental_mempurge_threshold",
+         {offsetof(struct ImmutableDBOptions, experimental_mempurge_threshold),
+          OptionType::kDouble, OptionVerificationType::kNormal,
           OptionTypeFlags::kNone}},
-        {"experimental_mempurge_policy",
-         OptionTypeInfo::Enum<MemPurgePolicy>(
-             offsetof(struct ImmutableDBOptions, experimental_mempurge_policy),
-             &experimental_mempurge_policy_string_map)},
         {"is_fd_close_on_exec",
          {offsetof(struct ImmutableDBOptions, is_fd_close_on_exec),
           OptionType::kBoolean, OptionVerificationType::kNormal,
@@ -615,8 +606,7 @@ ImmutableDBOptions::ImmutableDBOptions(const DBOptions& options)
       allow_fallocate(options.allow_fallocate),
       is_fd_close_on_exec(options.is_fd_close_on_exec),
       advise_random_on_open(options.advise_random_on_open),
-      experimental_allow_mempurge(options.experimental_allow_mempurge),
-      experimental_mempurge_policy(options.experimental_mempurge_policy),
+      experimental_mempurge_threshold(options.experimental_mempurge_threshold),
       db_write_buffer_size(options.db_write_buffer_size),
       write_buffer_manager(options.write_buffer_manager),
       access_hint_on_compaction_start(options.access_hint_on_compaction_start),
@@ -750,12 +740,9 @@ void ImmutableDBOptions::Dump(Logger* log) const {
                    is_fd_close_on_exec);
   ROCKS_LOG_HEADER(log, "                  Options.advise_random_on_open: %d",
                    advise_random_on_open);
-  ROCKS_LOG_HEADER(log,
-                   "                  Options.experimental_allow_mempurge: %d",
-                   experimental_allow_mempurge);
-  ROCKS_LOG_HEADER(log,
-                   "                  Options.experimental_mempurge_policy: %d",
-                   static_cast<int>(experimental_mempurge_policy));
+  ROCKS_LOG_HEADER(
+      log, "                  Options.experimental_mempurge_threshold: %f",
+      experimental_mempurge_threshold);
   ROCKS_LOG_HEADER(
       log, "                   Options.db_write_buffer_size: %" ROCKSDB_PRIszt,
       db_write_buffer_size);
index 50ec521f04cd997ff42ec4f50f434e6868b67961..d2b0568026f164c03fa22824d52db298171a7cc6 100644 (file)
@@ -57,8 +57,7 @@ struct ImmutableDBOptions {
   bool allow_fallocate;
   bool is_fd_close_on_exec;
   bool advise_random_on_open;
-  bool experimental_allow_mempurge;
-  MemPurgePolicy experimental_mempurge_policy;
+  double experimental_mempurge_threshold;
   size_t db_write_buffer_size;
   std::shared_ptr<WriteBufferManager> write_buffer_manager;
   DBOptions::AccessHint access_hint_on_compaction_start;
index 3905a957761d7e1a9603abb5344c037e519c8108..73cbedd6b233876c8a8f7266dc72d697c112ec3c 100644 (file)
@@ -143,7 +143,7 @@ TEST_F(OptionsTest, GetOptionsFromMapTest) {
       {"persist_stats_to_disk", "false"},
       {"stats_history_buffer_size", "69"},
       {"advise_random_on_open", "true"},
-      {"experimental_allow_mempurge", "false"},
+      {"experimental_mempurge_threshold", "0.0"},
       {"use_adaptive_mutex", "false"},
       {"new_table_reader_for_compaction_inputs", "true"},
       {"compaction_readahead_size", "100"},
@@ -302,7 +302,7 @@ TEST_F(OptionsTest, GetOptionsFromMapTest) {
   ASSERT_EQ(new_db_opt.persist_stats_to_disk, false);
   ASSERT_EQ(new_db_opt.stats_history_buffer_size, 69U);
   ASSERT_EQ(new_db_opt.advise_random_on_open, true);
-  ASSERT_EQ(new_db_opt.experimental_allow_mempurge, false);
+  ASSERT_EQ(new_db_opt.experimental_mempurge_threshold, 0.0);
   ASSERT_EQ(new_db_opt.use_adaptive_mutex, false);
   ASSERT_EQ(new_db_opt.new_table_reader_for_compaction_inputs, true);
   ASSERT_EQ(new_db_opt.compaction_readahead_size, 100);
@@ -2047,7 +2047,7 @@ TEST_F(OptionsOldApiTest, GetOptionsFromMapTest) {
       {"persist_stats_to_disk", "false"},
       {"stats_history_buffer_size", "69"},
       {"advise_random_on_open", "true"},
-      {"experimental_allow_mempurge", "false"},
+      {"experimental_mempurge_threshold", "0.0"},
       {"use_adaptive_mutex", "false"},
       {"new_table_reader_for_compaction_inputs", "true"},
       {"compaction_readahead_size", "100"},
@@ -2200,7 +2200,7 @@ TEST_F(OptionsOldApiTest, GetOptionsFromMapTest) {
   ASSERT_EQ(new_db_opt.persist_stats_to_disk, false);
   ASSERT_EQ(new_db_opt.stats_history_buffer_size, 69U);
   ASSERT_EQ(new_db_opt.advise_random_on_open, true);
-  ASSERT_EQ(new_db_opt.experimental_allow_mempurge, false);
+  ASSERT_EQ(new_db_opt.experimental_mempurge_threshold, 0.0);
   ASSERT_EQ(new_db_opt.use_adaptive_mutex, false);
   ASSERT_EQ(new_db_opt.new_table_reader_for_compaction_inputs, true);
   ASSERT_EQ(new_db_opt.compaction_readahead_size, 100);
index 17be134cb712fdc34435e9e3286290981aee5ae9..c7c254735f9c29cc406ab7877dbc5533fd6da6e1 100644 (file)
@@ -1039,19 +1039,6 @@ static enum ROCKSDB_NAMESPACE::CompressionType StringToCompressionType(
   return ROCKSDB_NAMESPACE::kSnappyCompression;  // default value
 }
 
-static enum ROCKSDB_NAMESPACE::MemPurgePolicy StringToMemPurgePolicy(
-    const char* mpolicy) {
-  assert(mpolicy);
-  if (!strcasecmp(mpolicy, "kAlways")) {
-    return ROCKSDB_NAMESPACE::MemPurgePolicy::kAlways;
-  } else if (!strcasecmp(mpolicy, "kAlternate")) {
-    return ROCKSDB_NAMESPACE::MemPurgePolicy::kAlternate;
-  }
-
-  fprintf(stdout, "Cannot parse mempurge policy '%s'\n", mpolicy);
-  return ROCKSDB_NAMESPACE::MemPurgePolicy::kAlternate;
-}
-
 static std::string ColumnFamilyName(size_t i) {
   if (i == 0) {
     return ROCKSDB_NAMESPACE::kDefaultColumnFamilyName;
@@ -1186,11 +1173,9 @@ DEFINE_bool(
 DEFINE_bool(allow_concurrent_memtable_write, true,
             "Allow multi-writers to update mem tables in parallel.");
 
-DEFINE_bool(experimental_allow_mempurge, false,
-            "Allow memtable garbage collection.");
-
-DEFINE_string(experimental_mempurge_policy, "kAlternate",
-              "Specify memtable garbage collection policy.");
+DEFINE_double(experimental_mempurge_threshold, 0.0,
+              "Maximum useful payload ratio estimate that triggers a mempurge "
+              "(memtable garbage collection).");
 
 DEFINE_bool(inplace_update_support,
             ROCKSDB_NAMESPACE::Options().inplace_update_support,
@@ -4275,9 +4260,8 @@ class Benchmark {
     options.delayed_write_rate = FLAGS_delayed_write_rate;
     options.allow_concurrent_memtable_write =
         FLAGS_allow_concurrent_memtable_write;
-    options.experimental_allow_mempurge = FLAGS_experimental_allow_mempurge;
-    options.experimental_mempurge_policy =
-        StringToMemPurgePolicy(FLAGS_experimental_mempurge_policy.c_str());
+    options.experimental_mempurge_threshold =
+        FLAGS_experimental_mempurge_threshold;
     options.inplace_update_support = FLAGS_inplace_update_support;
     options.inplace_update_num_locks = FLAGS_inplace_update_num_locks;
     options.enable_write_thread_adaptive_yield =
index 3215bec9fec98c32e88995f507acd2c2add809b4..a133f3529657a4f71e74f32bb67f88edbef7599c 100644 (file)
@@ -220,8 +220,7 @@ whitebox_default_params = {
 simple_default_params = {
     "allow_concurrent_memtable_write": lambda: random.randint(0, 1),
     "column_families": 1,
-    "experimental_allow_mempurge": lambda: random.randint(0, 1),
-    "experimental_mempurge_policy": lambda: random.choice(["kAlways", "kAlternate"]),
+    "experimental_mempurge_threshold": lambda: 10.0*random.random(),
     "max_background_compactions": 1,
     "max_bytes_for_level_base": 67108864,
     "memtablerep": "skip_list",