]> git-server-git.apps.pok.os.sepia.ceph.com Git - rocksdb.git/commitdiff
Improve MemPurge sampling (#8656)
authorBaptiste Lemaire <blemaire@fb.com>
Fri, 13 Aug 2021 21:34:43 +0000 (14:34 -0700)
committerFacebook GitHub Bot <facebook-github-bot@users.noreply.github.com>
Fri, 13 Aug 2021 21:35:41 +0000 (14:35 -0700)
Summary:
Previously, the `MemPurge` sampling function was assessing whether a random entry from a memtable was garbage or not by simply querying the given memtable (see https://github.com/facebook/rocksdb/issues/8628 for more details).
In this diff, I am updating the sampling function by querying not only the memtable the entry was drawn from, but also all subsequent memtables that have a greater memtable ID.
I also added the size of the value for KV entries in the payload/useful payload estimates (which was also one of the reasons why sampling was not as good as mempurging all the time in terms of L0 SST files reduction).
Once these changes were made, I was able to clean obsolete objects and functions from the `MemtableList` struct, and did a bit of cleanup everywhere.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/8656

Reviewed By: pdillinger

Differential Revision: D30288583

Pulled By: bjlemaire

fbshipit-source-id: 7646a545ec56f4715949daa59ab5eee74540feb3

db/flush_job.cc
db/flush_job.h
db/memtable_list.cc
db/memtable_list.h
include/rocksdb/memtablerep.h
memtable/skiplistrep.cc

index bc4824af6a73264f098328961382d4ed31e37a23..43dc87d68b20807101855c6856e4a923ce57142b 100644 (file)
@@ -192,19 +192,6 @@ void FlushJob::PickMemTable() {
   // path 0 for level 0 file.
   meta_.fd = FileDescriptor(versions_->NewFileNumber(), 0, 0);
 
-  // If mempurge feature is activated, keep track of any potential
-  // memtables coming from a previous mempurge operation.
-  // Used for mempurge policy.
-  if (db_options_.experimental_mempurge_threshold > 0.0) {
-    contains_mempurge_outcome_ = false;
-    for (MemTable* mt : mems_) {
-      if (cfd_->imm()->IsMemPurgeOutput(mt->GetID())) {
-        contains_mempurge_outcome_ = true;
-        break;
-      }
-    }
-  }
-
   base_ = cfd_->current();
   base_->Ref();  // it is likely that we do not need this reference
 }
@@ -246,8 +233,8 @@ Status FlushJob::Run(LogsWithPrepTracker* prep_tracker,
       (!mems_.empty()) && MemPurgeDecider()) {
     mempurge_s = MemPurge();
     if (!mempurge_s.ok()) {
-      // Mempurge is typically aborted when the new_mem output memtable
-      // is filled at more than XX % capacity (currently: 60%).
+      // Mempurge is typically aborted when the output
+      // bytes cannot be contained onto a single output memtable.
       if (mempurge_s.IsAborted()) {
         ROCKS_LOG_INFO(db_options_.info_log, "Mempurge process aborted: %s\n",
                        mempurge_s.ToString().c_str());
@@ -567,16 +554,9 @@ Status FlushJob::MemPurge() {
           !(new_mem->ShouldFlushNow())) {
         db_mutex_->Lock();
         uint64_t new_mem_id = mems_[0]->GetID();
-        // Copy lowest memtable ID
-        // House keeping work.
-        for (MemTable* mt : mems_) {
-          new_mem_id = mt->GetID() < new_mem_id ? mt->GetID() : new_mem_id;
-          // Note: if m is not a previous mempurge output memtable,
-          // nothing happens.
-          cfd_->imm()->RemoveMemPurgeOutputID(mt->GetID());
-        }
+
         new_mem->SetID(new_mem_id);
-        cfd_->imm()->AddMemPurgeOutputID(new_mem_id);
+
         // This addition will not trigger another flush, because
         // we do not call SchedulePendingFlush().
         cfd_->imm()->Add(new_mem, &job_context_->memtables_to_free);
@@ -631,7 +611,20 @@ bool FlushJob::MemPurgeDecider() {
   // Payload and useful_payload (in bytes).
   // The useful payload ratio of a given MemTable
   // is estimated to be useful_payload/payload.
-  uint64_t payload = 0, useful_payload = 0;
+  uint64_t payload = 0, useful_payload = 0, entry_size = 0;
+
+  // Local variables used repetitively inside the for-loop
+  // when iterating over the sampled entries.
+  Slice key_slice, value_slice;
+  ParsedInternalKey res;
+  SnapshotImpl min_snapshot;
+  std::string vget;
+  Status mget_s, parse_s;
+  MergeContext merge_context;
+  SequenceNumber max_covering_tombstone_seq = 0, sqno = 0,
+                 min_seqno_snapshot = 0;
+  bool get_res, can_be_useful_payload, not_in_next_mems;
+
   // If estimated_useful_payload is > threshold,
   // then flush to storage, else MemPurge.
   double estimated_useful_payload = 0.0;
@@ -643,106 +636,136 @@ bool FlushJob::MemPurgeDecider() {
   ro.total_order_seek = true;
 
   // Iterate over each memtable of the set.
-  for (MemTable* mt : mems_) {
-    // If the memtable is the output of a previous mempurge,
-    // its approximate useful payload ratio is already calculated.
-    if (cfd_->imm()->IsMemPurgeOutput(mt->GetID())) {
-      // We make the assumption that this memtable is already
-      // free of garbage (garbage underestimation).
-      estimated_useful_payload += mt->ApproximateMemoryUsage();
-    } else {
-      // Else sample from the table.
-      uint64_t nentries = mt->num_entries();
-      // Corrected Cochran formula for small populations
-      // (converges to n0 for large populations).
-      uint64_t target_sample_size =
-          static_cast<uint64_t>(ceil(n0 / (1.0 + (n0 / nentries))));
-      std::unordered_set<const char*> sentries = {};
-      // Populate sample entries set.
-      mt->UniqueRandomSample(target_sample_size, &sentries);
-
-      // Estimate the garbage ratio by comparing if
-      // each sample corresponds to a valid entry.
-      for (const char* ss : sentries) {
-        ParsedInternalKey res;
-        Slice entry_slice = GetLengthPrefixedSlice(ss);
-        Status parse_s =
-            ParseInternalKey(entry_slice, &res, true /*log_err_key*/);
-        if (!parse_s.ok()) {
-          ROCKS_LOG_WARN(db_options_.info_log,
-                         "Memtable Decider: ParseInternalKey did not parse "
-                         "entry_slice %s"
-                         "successfully.",
-                         entry_slice.data());
-        }
-        LookupKey lkey(res.user_key, kMaxSequenceNumber);
-        std::string vget;
-        Status s;
-        MergeContext merge_context;
-        SequenceNumber max_covering_tombstone_seq = 0, sqno = 0;
-
-        // Pick the oldest existing snapshot that is more recent
-        // than the sequence number of the sampled entry.
-        SequenceNumber min_seqno_snapshot = kMaxSequenceNumber;
-        SnapshotImpl min_snapshot;
-        for (SequenceNumber seq_num : existing_snapshots_) {
-          if (seq_num > res.sequence && seq_num < min_seqno_snapshot) {
-            min_seqno_snapshot = seq_num;
-          }
-        }
-        min_snapshot.number_ = min_seqno_snapshot;
-        ro.snapshot =
-            min_seqno_snapshot < kMaxSequenceNumber ? &min_snapshot : nullptr;
-
-        // Estimate if the sample entry is valid or not.
-        bool gres = mt->Get(lkey, &vget, nullptr, &s, &merge_context,
-                            &max_covering_tombstone_seq, &sqno, ro);
-        if (!gres) {
-          ROCKS_LOG_WARN(
-              db_options_.info_log,
-              "Memtable Get returned false when Get(sampled entry). "
-              "Yet each sample entry should exist somewhere in the memtable, "
-              "unrelated to whether it has been deleted or not.");
-        }
-        payload += entry_slice.size();
-
-        // TODO(bjlemaire): evaluate typeMerge.
-        // This is where the sampled entry is estimated to be
-        // garbage or not. Note that this is a garbage *estimation*
-        // because we do not include certain items such as
-        // CompactionFitlers triggered at flush, or if the same delete
-        // has been inserted twice or more in the memtable.
-        if (res.type == kTypeValue && gres && s.ok() && sqno == res.sequence) {
-          useful_payload += entry_slice.size();
-        } else if (((res.type == kTypeDeletion) ||
-                    (res.type == kTypeSingleDeletion)) &&
-                   s.IsNotFound() && gres) {
-          useful_payload += entry_slice.size();
+  for (auto mem_iter = std::begin(mems_); mem_iter != std::end(mems_);
+       mem_iter++) {
+    MemTable* mt = *mem_iter;
+
+    // Else sample from the table.
+    uint64_t nentries = mt->num_entries();
+    // Corrected Cochran formula for small populations
+    // (converges to n0 for large populations).
+    uint64_t target_sample_size =
+        static_cast<uint64_t>(ceil(n0 / (1.0 + (n0 / nentries))));
+    std::unordered_set<const char*> sentries = {};
+    // Populate sample entries set.
+    mt->UniqueRandomSample(target_sample_size, &sentries);
+
+    // Estimate the garbage ratio by comparing if
+    // each sample corresponds to a valid entry.
+    for (const char* ss : sentries) {
+      key_slice = GetLengthPrefixedSlice(ss);
+      parse_s = ParseInternalKey(key_slice, &res, true /*log_err_key*/);
+      if (!parse_s.ok()) {
+        ROCKS_LOG_WARN(db_options_.info_log,
+                       "Memtable Decider: ParseInternalKey did not parse "
+                       "key_slice %s successfully.",
+                       key_slice.data());
+      }
+
+      // Size of the entry is "key size (+ value size if KV entry)"
+      entry_size = key_slice.size();
+      if (res.type == kTypeValue) {
+        value_slice =
+            GetLengthPrefixedSlice(key_slice.data() + key_slice.size());
+        entry_size += value_slice.size();
+      }
+
+      // Count entry bytes as payload.
+      payload += entry_size;
+
+      LookupKey lkey(res.user_key, kMaxSequenceNumber);
+
+      // Paranoia: zero out these values just in case.
+      max_covering_tombstone_seq = 0;
+      sqno = 0;
+
+      // Pick the oldest existing snapshot that is more recent
+      // than the sequence number of the sampled entry.
+      min_seqno_snapshot = kMaxSequenceNumber;
+      for (SequenceNumber seq_num : existing_snapshots_) {
+        if (seq_num > res.sequence && seq_num < min_seqno_snapshot) {
+          min_seqno_snapshot = seq_num;
         }
       }
-      if (payload > 0) {
-        // We used the estimated useful payload ratio
-        // to evaluate how much of the total memtable is useful bytes.
-        estimated_useful_payload +=
-            (mt->ApproximateMemoryUsage()) * (useful_payload * 1.0 / payload);
-        ROCKS_LOG_INFO(
-            db_options_.info_log,
-            "Mempurge sampling - found garbage ratio from sampling: %f.\n",
-            (payload - useful_payload) * 1.0 / payload);
-      } else {
+      min_snapshot.number_ = min_seqno_snapshot;
+      ro.snapshot =
+          min_seqno_snapshot < kMaxSequenceNumber ? &min_snapshot : nullptr;
+
+      // Estimate if the sample entry is valid or not.
+      get_res = mt->Get(lkey, &vget, nullptr, &mget_s, &merge_context,
+                        &max_covering_tombstone_seq, &sqno, ro);
+      if (!get_res) {
         ROCKS_LOG_WARN(
             db_options_.info_log,
-            "Mempurge kSampling policy: null payload measured, and collected "
-            "sample size is %zu\n.",
-            sentries.size());
+            "Memtable Get returned false when Get(sampled entry). "
+            "Yet each sample entry should exist somewhere in the memtable, "
+            "unrelated to whether it has been deleted or not.");
+      }
+
+      // TODO(bjlemaire): evaluate typeMerge.
+      // This is where the sampled entry is estimated to be
+      // garbage or not. Note that this is a garbage *estimation*
+      // because we do not include certain items such as
+      // CompactionFitlers triggered at flush, or if the same delete
+      // has been inserted twice or more in the memtable.
+
+      // Evaluate if the entry can be useful payload
+      // Situation #1: entry is a KV entry, was found in the memtable mt
+      //               and the sequence numbers match.
+      can_be_useful_payload = (res.type == kTypeValue) && get_res &&
+                              mget_s.ok() && (sqno == res.sequence);
+
+      // Situation #2: entry is a delete entry, was found in the memtable mt
+      //               (because gres==true) and no valid KV entry is found.
+      //               (note: duplicate delete entries are also taken into
+      //               account here, because the sequence number 'sqno'
+      //               in memtable->Get(&sqno) operation is set to be equal
+      //               to the most recent delete entry as well).
+      can_be_useful_payload |=
+          ((res.type == kTypeDeletion) || (res.type == kTypeSingleDeletion)) &&
+          mget_s.IsNotFound() && get_res && (sqno == res.sequence);
+
+      // If there is a chance that the entry is useful payload
+      // Verify that the entry does not appear in the following memtables
+      // (memtables with greater memtable ID/larger sequence numbers).
+      if (can_be_useful_payload) {
+        not_in_next_mems = true;
+        for (auto next_mem_iter = mem_iter + 1;
+             next_mem_iter != std::end(mems_); next_mem_iter++) {
+          if ((*next_mem_iter)
+                  ->Get(lkey, &vget, nullptr, &mget_s, &merge_context,
+                        &max_covering_tombstone_seq, &sqno, ro)) {
+            not_in_next_mems = false;
+            break;
+          }
+        }
+        if (not_in_next_mems) {
+          useful_payload += entry_size;
+        }
       }
     }
+    if (payload > 0) {
+      // We use the estimated useful payload ratio to
+      // evaluate how many of the memtable bytes are useful bytes.
+      estimated_useful_payload +=
+          (mt->ApproximateMemoryUsage()) * (useful_payload * 1.0 / payload);
+
+      ROCKS_LOG_INFO(
+          db_options_.info_log,
+          "Mempurge sampling - found garbage ratio from sampling: %f.\n",
+          (payload - useful_payload) * 1.0 / payload);
+    } else {
+      ROCKS_LOG_WARN(db_options_.info_log,
+                     "Mempurge sampling: null payload measured, and collected "
+                     "sample size is %zu\n.",
+                     sentries.size());
+    }
   }
-  // We convert the total number of useful paylaod bytes
+  // We convert the total number of useful payload bytes
   // into the proportion of memtable necessary to store all these bytes.
   // We compare this proportion with the threshold value.
-  return (estimated_useful_payload / mutable_cf_options_.write_buffer_size) <
-         threshold;
+  return ((estimated_useful_payload / mutable_cf_options_.write_buffer_size) <
+          threshold);
 }
 
 Status FlushJob::WriteLevel0Table() {
@@ -954,15 +977,6 @@ Status FlushJob::WriteLevel0Table() {
 
   stats.num_output_files_blob = static_cast<int>(blobs.size());
 
-  if ((db_options_.experimental_mempurge_threshold > 0.0) && s.ok()) {
-    // The db_mutex is held at this point.
-    for (MemTable* mt : mems_) {
-      // Note: if m is not a previous mempurge output memtable,
-      // nothing happens here.
-      cfd_->imm()->RemoveMemPurgeOutputID(mt->GetID());
-    }
-  }
-
   RecordTimeToHistogram(stats_, FLUSH_TIME, stats.micros);
   cfd_->internal_stats()->AddCompactionStats(0 /* level */, thread_pri_, stats);
   cfd_->internal_stats()->AddCFStats(
index 9050657de83b87240782c55b4c4386387a9ee60d..81b4e86dd12a30147c7960130dabf61ba0c4ebd8 100644 (file)
@@ -191,9 +191,6 @@ class FlushJob {
 
   const std::string full_history_ts_low_;
   BlobFileCompletionCallback* blob_callback_;
-
-  // Used when experimental_mempurge_threshold > 0.0.
-  bool contains_mempurge_outcome_;
 };
 
 }  // namespace ROCKSDB_NAMESPACE
index 3927a3f034aeed590fcdee4b1c668af317c03586..e522d22076df46e6b7813442c99a10c48c45a731 100644 (file)
@@ -5,10 +5,12 @@
 //
 #include "db/memtable_list.h"
 
+#include <algorithm>
 #include <cinttypes>
 #include <limits>
 #include <queue>
 #include <string>
+
 #include "db/db_impl/db_impl.h"
 #include "db/memtable.h"
 #include "db/range_tombstone_fragmenter.h"
@@ -340,6 +342,14 @@ void MemTableList::PickMemtablesToFlush(uint64_t max_memtable_id,
       ThreadStatus::STAGE_PICK_MEMTABLES_TO_FLUSH);
   const auto& memlist = current_->memlist_;
   bool atomic_flush = false;
+
+  // Note: every time MemTableList::Add(mem) is called, it adds the new mem
+  // at the FRONT of the memlist (memlist.push_front(mem)). Therefore, by
+  // iterating through the memlist starting at the end, the vector<MemTable*>
+  // ret is filled with memtables already sorted in increasing MemTable ID.
+  // However, when the mempurge feature is activated, new memtables with older
+  // IDs will be added to the memlist. Therefore we std::sort(ret) at the end to
+  // return a vector of memtables sorted by increasing memtable ID.
   for (auto it = memlist.rbegin(); it != memlist.rend(); ++it) {
     MemTable* m = *it;
     if (!atomic_flush && m->atomic_flush_seqno_ != kMaxSequenceNumber) {
@@ -361,6 +371,15 @@ void MemTableList::PickMemtablesToFlush(uint64_t max_memtable_id,
   if (!atomic_flush || num_flush_not_started_ == 0) {
     flush_requested_ = false;  // start-flush request is complete
   }
+
+  // Sort the list of memtables by increasing memtable ID.
+  // This is useful when the mempurge feature is activated
+  // and the memtables are not guaranteed to be sorted in
+  // the memlist vector.
+  std::sort(ret->begin(), ret->end(),
+            [](const MemTable* m1, const MemTable* m2) -> bool {
+              return m1->GetID() < m2->GetID();
+            });
 }
 
 void MemTableList::RollbackMemtableFlush(const autovector<MemTable*>& mems,
index 6dde850163c0a5a0cf8e6dfbf6576d8a01fea0f1..b73b7c2038f0a17d29f055cc153a706f22c3bd4a 100644 (file)
@@ -390,20 +390,6 @@ class MemTableList {
   // not freed, but put into a vector for future deref and reclamation.
   void RemoveOldMemTables(uint64_t log_number,
                           autovector<MemTable*>* to_delete);
-  void AddMemPurgeOutputID(uint64_t mid) { mempurged_ids_.insert(mid); }
-
-  void RemoveMemPurgeOutputID(uint64_t mid) {
-    if (mempurged_ids_.find(mid) != mempurged_ids_.end()) {
-      mempurged_ids_.erase(mid);
-    }
-  }
-
-  bool IsMemPurgeOutput(uint64_t mid) {
-    if (mempurged_ids_.find(mid) == mempurged_ids_.end()) {
-      return false;
-    }
-    return true;
-  }
 
  private:
   friend Status InstallMemtableAtomicFlushResults(
@@ -450,10 +436,6 @@ class MemTableList {
 
   // Cached value of current_->HasHistory().
   std::atomic<bool> current_has_history_;
-
-  // Store the IDs of the memtables installed in this
-  // list that result from a mempurge operation.
-  std::unordered_set<uint64_t> mempurged_ids_;
 };
 
 // Installs memtable atomic flush results.
index 6974831fdcf3a1b98d0121cc2da8a2e978478488..934a0085a2ba7bb5258b2bd2c1d48ef5d4e49a3e 100644 (file)
@@ -198,8 +198,8 @@ class MemTableRep {
 
   // Returns a vector of unique random memtable entries of approximate
   // size 'target_sample_size' (this size is not strictly enforced).
-  virtual void UniqueRandomSample(const uint64_t& num_entries,
-                                  const uint64_t& target_sample_size,
+  virtual void UniqueRandomSample(const uint64_t num_entries,
+                                  const uint64_t target_sample_size,
                                   std::unordered_set<const char*>* entries) {
     (void)num_entries;
     (void)target_sample_size;
index abe7144ab9d73c3a44eb3e24a5b9e21dc34be259..d7f78672fc1b5f8d34e76d0722c0e093016610fe 100644 (file)
@@ -97,8 +97,8 @@ public:
     return (end_count >= start_count) ? (end_count - start_count) : 0;
   }
 
-  void UniqueRandomSample(const uint64_t& num_entries,
-                          const uint64_t& target_sample_size,
+  void UniqueRandomSample(const uint64_t num_entries,
+                          const uint64_t target_sample_size,
                           std::unordered_set<const char*>* entries) override {
     entries->clear();
     // Avoid divide-by-0.