return opt->rep.advise_random_on_open;
}
-void rocksdb_options_set_experimental_allow_mempurge(rocksdb_options_t* opt,
- unsigned char v) {
- opt->rep.experimental_allow_mempurge = v;
+void rocksdb_options_set_experimental_mempurge_threshold(rocksdb_options_t* opt,
+ double v) {
+ opt->rep.experimental_mempurge_threshold = v;
}
void rocksdb_options_set_access_hint_on_compaction_start(
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include <atomic>
+#include <limits>
#include "db/db_impl/db_impl.h"
#include "db/db_test_util.h"
// Enforce size of a single MemTable to 64MB (64MB = 67108864 bytes).
options.write_buffer_size = 1 << 20;
// Activate the MemPurge prototype.
- options.experimental_allow_mempurge = true;
- options.experimental_mempurge_policy = MemPurgePolicy::kAlways;
+ options.experimental_mempurge_threshold =
+ 1.0; // std::numeric_limits<double>::max();
ASSERT_OK(TryReopen(options));
uint32_t mempurge_count = 0;
uint32_t sst_count = 0;
// Enforce size of a single MemTable to 64MB (64MB = 67108864 bytes).
options.write_buffer_size = 1 << 20;
// Activate the MemPurge prototype.
- options.experimental_allow_mempurge = true;
- options.experimental_mempurge_policy = MemPurgePolicy::kAlways;
+ options.experimental_mempurge_threshold =
+ 1.0; // std::numeric_limits<double>::max();
ASSERT_OK(TryReopen(options));
uint32_t mempurge_count = 0;
// Enforce size of a single MemTable to 64MB (64MB = 67108864 bytes).
options.write_buffer_size = 1 << 20;
// Activate the MemPurge prototype.
- options.experimental_allow_mempurge = true;
- options.experimental_mempurge_policy = MemPurgePolicy::kAlways;
+ options.experimental_mempurge_threshold =
+ 1.0; // std::numeric_limits<double>::max();
ASSERT_OK(TryReopen(options));
uint32_t mempurge_count = 0;
// Enforce size of a single MemTable to 128KB.
options.write_buffer_size = 128 << 10;
// Activate the MemPurge prototype.
- options.experimental_allow_mempurge = true;
- options.experimental_mempurge_policy = MemPurgePolicy::kAlways;
+ options.experimental_mempurge_threshold =
+ 1.0; // std::numeric_limits<double>::max();
ASSERT_OK(TryReopen(options));
const size_t KVSIZE = 10;
const uint32_t EXPECTED_SST_COUNT = 0;
EXPECT_GE(mempurge_count, EXPECTED_MIN_MEMPURGE_COUNT);
- if (options.experimental_mempurge_policy == MemPurgePolicy::kAlways) {
+ if (options.experimental_mempurge_threshold ==
+ std::numeric_limits<double>::max()) {
EXPECT_EQ(sst_count, EXPECTED_SST_COUNT);
}
// flushing (but need to implement something
// else than imm()->IsFlushPending() because the output
// memtables added to imm() dont trigger flushes).
- if (immutable_db_options_.experimental_allow_mempurge) {
+ if (immutable_db_options_.experimental_mempurge_threshold > 0.0) {
Status flush_ret;
mutex_.Unlock();
for (ColumnFamilyData* cf : *versions_->GetColumnFamilySet()) {
// future changes. Therefore, we add the following if
// statement - note that calling it twice (or more)
// doesn't break anything.
- if (immutable_db_options_.experimental_allow_mempurge) {
+ if (immutable_db_options_.experimental_mempurge_threshold > 0.0) {
// If imm() contains silent memtables,
// requesting a flush will mark the imm_needed as true.
cfd->imm()->FlushRequested();
for (const auto& iter : flush_req) {
ColumnFamilyData* cfd = iter.first;
- if (immutable_db_options_.experimental_allow_mempurge) {
+ if (immutable_db_options_.experimental_mempurge_threshold > 0.0) {
// If imm() contains silent memtables,
// requesting a flush will mark the imm_needed as true.
cfd->imm()->FlushRequested();
// If mempurge feature is activated, keep track of any potential
// memtables coming from a previous mempurge operation.
// Used for mempurge policy.
- if (db_options_.experimental_allow_mempurge) {
+ if (db_options_.experimental_mempurge_threshold > 0.0) {
contains_mempurge_outcome_ = false;
for (MemTable* mt : mems_) {
if (cfd_->imm()->IsMemPurgeOutput(mt->GetID())) {
prev_cpu_read_nanos = IOSTATS(cpu_read_nanos);
}
Status mempurge_s = Status::NotFound("No MemPurge.");
- if (db_options_.experimental_allow_mempurge &&
+ if ((db_options_.experimental_mempurge_threshold > 0.0) &&
(cfd_->GetFlushReason() == FlushReason::kWriteBufferFull) &&
(!mems_.empty()) && MemPurgeDecider()) {
mempurge_s = MemPurge();
// This addition will not trigger another flush, because
// we do not call SchedulePendingFlush().
cfd_->imm()->Add(new_mem, &job_context_->memtables_to_free);
- new_mem_capacity = (new_mem->ApproximateMemoryUsage()) * 1.0 /
- mutable_cf_options_.write_buffer_size;
new_mem->Ref();
db_mutex_->Unlock();
} else {
}
bool FlushJob::MemPurgeDecider() {
- MemPurgePolicy policy = db_options_.experimental_mempurge_policy;
- if (policy == MemPurgePolicy::kAlways) {
+ double threshold = db_options_.experimental_mempurge_threshold;
+ // Never trigger mempurge if threshold is not a strictly positive value.
+ if (!(threshold > 0.0)) {
+ return false;
+ }
+ if (threshold > (1.0 * mems_.size())) {
return true;
- } else if (policy == MemPurgePolicy::kAlternate) {
- // Note: if at least one of the flushed memtables is
- // an output of a previous mempurge process, then flush
- // to storage.
- return !(contains_mempurge_outcome_);
}
- return false;
+ // Payload and useful_payload (in bytes).
+ // The useful payload ratio of a given MemTable
+ // is estimated to be useful_payload/payload.
+ uint64_t payload = 0, useful_payload = 0;
+ // If estimated_useful_payload is > threshold,
+ // then flush to storage, else MemPurge.
+ double estimated_useful_payload = 0.0;
+ // Cochran formula for determining sample size.
+ // 95% confidence interval, 7% precision.
+ // n0 = (1.96*1.96)*0.25/(0.07*0.07) = 196.0
+ double n0 = 196.0;
+ ReadOptions ro;
+ ro.total_order_seek = true;
+
+ // Iterate over each memtable of the set.
+ for (MemTable* mt : mems_) {
+ // If the memtable is the output of a previous mempurge,
+ // its approximate useful payload ratio is already calculated.
+ if (cfd_->imm()->IsMemPurgeOutput(mt->GetID())) {
+ // We make the assumption that this memtable is already
+ // free of garbage (garbage underestimation).
+ estimated_useful_payload += mt->ApproximateMemoryUsage();
+ } else {
+ // Else sample from the table.
+ uint64_t nentries = mt->num_entries();
+ // Corrected Cochran formula for small populations
+ // (converges to n0 for large populations).
+ uint64_t target_sample_size =
+ static_cast<uint64_t>(ceil(n0 / (1.0 + (n0 / nentries))));
+ std::unordered_set<const char*> sentries = {};
+ // Populate sample entries set.
+ mt->UniqueRandomSample(target_sample_size, &sentries);
+
+ // Estimate the garbage ratio by comparing if
+ // each sample corresponds to a valid entry.
+ for (const char* ss : sentries) {
+ ParsedInternalKey res;
+ Slice entry_slice = GetLengthPrefixedSlice(ss);
+ Status parse_s =
+ ParseInternalKey(entry_slice, &res, true /*log_err_key*/);
+ if (!parse_s.ok()) {
+ ROCKS_LOG_WARN(db_options_.info_log,
+ "Memtable Decider: ParseInternalKey did not parse "
+ "entry_slice %s"
+ "successfully.",
+ entry_slice.data());
+ }
+ LookupKey lkey(res.user_key, kMaxSequenceNumber);
+ std::string vget;
+ Status s;
+ MergeContext merge_context;
+ SequenceNumber max_covering_tombstone_seq = 0, sqno = 0;
+
+ // Pick the oldest existing snapshot that is more recent
+ // than the sequence number of the sampled entry.
+ SequenceNumber min_seqno_snapshot = kMaxSequenceNumber;
+ SnapshotImpl min_snapshot;
+ for (SequenceNumber seq_num : existing_snapshots_) {
+ if (seq_num > res.sequence && seq_num < min_seqno_snapshot) {
+ min_seqno_snapshot = seq_num;
+ }
+ }
+ min_snapshot.number_ = min_seqno_snapshot;
+ ro.snapshot =
+ min_seqno_snapshot < kMaxSequenceNumber ? &min_snapshot : nullptr;
+
+ // Estimate if the sample entry is valid or not.
+ bool gres = mt->Get(lkey, &vget, nullptr, &s, &merge_context,
+ &max_covering_tombstone_seq, &sqno, ro);
+ if (!gres) {
+ ROCKS_LOG_WARN(
+ db_options_.info_log,
+ "Memtable Get returned false when Get(sampled entry). "
+ "Yet each sample entry should exist somewhere in the memtable, "
+ "unrelated to whether it has been deleted or not.");
+ }
+ payload += entry_slice.size();
+
+ // TODO(bjlemaire): evaluate typeMerge.
+ // This is where the sampled entry is estimated to be
+ // garbage or not. Note that this is a garbage *estimation*
+ // because we do not include certain items such as
+ // CompactionFitlers triggered at flush, or if the same delete
+ // has been inserted twice or more in the memtable.
+ if (res.type == kTypeValue && gres && s.ok() && sqno == res.sequence) {
+ useful_payload += entry_slice.size();
+ } else if (((res.type == kTypeDeletion) ||
+ (res.type == kTypeSingleDeletion)) &&
+ s.IsNotFound() && gres) {
+ useful_payload += entry_slice.size();
+ }
+ }
+ if (payload > 0) {
+ // We used the estimated useful payload ratio
+ // to evaluate how much of the total memtable is useful bytes.
+ estimated_useful_payload +=
+ (mt->ApproximateMemoryUsage()) * (useful_payload * 1.0 / payload);
+ ROCKS_LOG_INFO(
+ db_options_.info_log,
+ "Mempurge sampling - found garbage ratio from sampling: %f.\n",
+ (payload - useful_payload) * 1.0 / payload);
+ } else {
+ ROCKS_LOG_WARN(
+ db_options_.info_log,
+ "Mempurge kSampling policy: null payload measured, and collected "
+ "sample size is %zu\n.",
+ sentries.size());
+ }
+ }
+ }
+ // We convert the total number of useful paylaod bytes
+ // into the proportion of memtable necessary to store all these bytes.
+ // We compare this proportion with the threshold value.
+ return (estimated_useful_payload / mutable_cf_options_.write_buffer_size) <
+ threshold;
}
Status FlushJob::WriteLevel0Table() {
stats.num_output_files_blob = static_cast<int>(blobs.size());
- if (db_options_.experimental_allow_mempurge && s.ok()) {
+ if ((db_options_.experimental_mempurge_threshold > 0.0) && s.ok()) {
// The db_mutex is held at this point.
for (MemTable* mt : mems_) {
// Note: if m is not a previous mempurge output memtable,
// of development. At the moment it is only compatible with the Get, Put,
// Delete operations as well as Iterators and CompactionFilters.
// For this early version, "MemPurge" is called by setting the
- // options.experimental_allow_mempurge flag as "true". When this is
+ // options.experimental_mempurge_threshold value as >0.0. When this is
// the case, ALL automatic flush operations (kWRiteBufferManagerFull) will
- // first go through the MemPurge process. herefore, we strongly
+ // first go through the MemPurge process. Therefore, we strongly
// recommend all users not to set this flag as true given that the MemPurge
// process has not matured yet.
Status MemPurge();
const std::string full_history_ts_low_;
BlobFileCompletionCallback* blob_callback_;
- // Used when experimental_allow_mempurge set to true.
+ // Used when experimental_mempurge_threshold > 0.0.
bool contains_mempurge_outcome_;
};
#include <memory>
#include <string>
#include <unordered_map>
+#include <unordered_set>
#include <vector>
#include "db/dbformat.h"
return approximate_memory_usage_.load(std::memory_order_relaxed);
}
+ // Returns a vector of unique random memtable entries of size 'sample_size'.
+ //
+ // Note: the entries are stored in the unordered_set as length-prefixed keys,
+ // hence their representation in the set as "const char*".
+ // Note2: the size of the output set 'entries' is not enforced to be strictly
+ // equal to 'target_sample_size'. Its final size might be slightly
+ // greater or slightly less than 'target_sample_size'
+ //
+ // REQUIRES: external synchronization to prevent simultaneous
+ // operations on the same MemTable (unless this Memtable is immutable).
+ // REQUIRES: SkipList memtable representation. This function is not
+ // implemented for any other type of memtable representation (vectorrep,
+ // hashskiplist,...).
+ void UniqueRandomSample(const uint64_t& target_sample_size,
+ std::unordered_set<const char*>* entries) {
+ // TODO(bjlemaire): at the moment, only supported by skiplistrep.
+ // Extend it to all other memtable representations.
+ table_->UniqueRandomSample(num_entries(), target_sample_size, entries);
+ }
+
// This method heuristically determines if the memtable should continue to
// host more data.
bool ShouldScheduleFlush() const {
// not freed, but put into a vector for future deref and reclamation.
void RemoveOldMemTables(uint64_t log_number,
autovector<MemTable*>* to_delete);
- void AddMemPurgeOutputID(uint64_t mid) {
- if (mempurged_ids_.find(mid) == mempurged_ids_.end()) {
- mempurged_ids_.insert(mid);
- }
- }
+ void AddMemPurgeOutputID(uint64_t mid) { mempurged_ids_.insert(mid); }
void RemoveMemPurgeOutputID(uint64_t mid) {
if (mempurged_ids_.find(mid) != mempurged_ids_.end()) {
DECLARE_uint64(periodic_compaction_seconds);
DECLARE_uint64(compaction_ttl);
DECLARE_bool(allow_concurrent_memtable_write);
-DECLARE_bool(experimental_allow_mempurge);
-DECLARE_string(experimental_mempurge_policy);
+DECLARE_double(experimental_mempurge_threshold);
DECLARE_bool(enable_write_thread_adaptive_yield);
DECLARE_int32(reopen);
DECLARE_double(bloom_bits);
return ret_compression_type;
}
-inline enum ROCKSDB_NAMESPACE::MemPurgePolicy StringToMemPurgePolicy(
- const char* mpolicy) {
- assert(mpolicy);
- if (!strcasecmp(mpolicy, "kAlways")) {
- return ROCKSDB_NAMESPACE::MemPurgePolicy::kAlways;
- } else if (!strcasecmp(mpolicy, "kAlternate")) {
- return ROCKSDB_NAMESPACE::MemPurgePolicy::kAlternate;
- }
- fprintf(stderr, "Cannot parse mempurge policy: '%s'\n", mpolicy);
- return ROCKSDB_NAMESPACE::MemPurgePolicy::kAlternate;
-}
-
inline enum ROCKSDB_NAMESPACE::ChecksumType StringToChecksumType(
const char* ctype) {
assert(ctype);
DEFINE_bool(allow_concurrent_memtable_write, false,
"Allow multi-writers to update mem tables in parallel.");
-DEFINE_bool(experimental_allow_mempurge, false,
- "Allow mempurge process to collect memtable garbage bytes.");
-
-DEFINE_string(experimental_mempurge_policy, "kAlternate",
- "Set mempurge (MemTable Garbage Collection) policy.");
+DEFINE_double(experimental_mempurge_threshold, 0.0,
+ "Maximum estimated useful payload that triggers a "
+ "mempurge process to collect memtable garbage bytes.");
DEFINE_bool(enable_write_thread_adaptive_yield, true,
"Use a yielding spin loop for brief writer thread waits.");
options_.max_subcompactions = static_cast<uint32_t>(FLAGS_subcompactions);
options_.allow_concurrent_memtable_write =
FLAGS_allow_concurrent_memtable_write;
- options_.experimental_allow_mempurge = FLAGS_experimental_allow_mempurge;
- options_.experimental_mempurge_policy =
- StringToMemPurgePolicy(FLAGS_experimental_mempurge_policy.c_str());
+ options_.experimental_mempurge_threshold =
+ FLAGS_experimental_mempurge_threshold;
options_.periodic_compaction_seconds = FLAGS_periodic_compaction_seconds;
options_.ttl = FLAGS_compaction_ttl;
options_.enable_pipelined_write = FLAGS_enable_pipelined_write;
#include <rocksdb/slice.h>
#include <stdint.h>
#include <stdlib.h>
+
#include <memory>
#include <stdexcept>
+#include <unordered_set>
namespace ROCKSDB_NAMESPACE {
return 0;
}
+ // Returns a vector of unique random memtable entries of approximate
+ // size 'target_sample_size' (this size is not strictly enforced).
+ virtual void UniqueRandomSample(const uint64_t& num_entries,
+ const uint64_t& target_sample_size,
+ std::unordered_set<const char*>* entries) {
+ (void)num_entries;
+ (void)target_sample_size;
+ (void)entries;
+ assert(false);
+ }
+
// Report an approximation of how much memory has been used other than memory
// that was allocated through the allocator. Safe to call from any thread.
virtual size_t ApproximateMemoryUsage() = 0;
virtual void SeekForPrev(const Slice& internal_key,
const char* memtable_key) = 0;
+ virtual void RandomSeek() {}
+
// Position at the first entry in collection.
// Final state of iterator is Valid() iff collection is not empty.
virtual void SeekToFirst() = 0;
extern const char* kHostnameForDbHostId;
-enum class MemPurgePolicy : char {
- kAlternate = 0x00,
- kAlways = 0x01,
-};
-
enum class CompactionServiceJobStatus : char {
kSuccess,
kFailure,
// Default: true
bool advise_random_on_open = true;
- // If true, allows for memtable purge instead of flush to storage.
- // (experimental).
- bool experimental_allow_mempurge = false;
- // If experimental_allow_mempurge is true, will dictate MemPurge
- // policy.
- // Default: kAlternate
- // (experimental).
- MemPurgePolicy experimental_mempurge_policy = MemPurgePolicy::kAlternate;
+ // [experimental]
+ // Used to activate or deactive the Mempurge feature (memtable garbage
+ // collection). (deactivated by default). At every flush, the total useful
+ // payload (total entries minus garbage entries) is estimated as a ratio
+ // [useful payload bytes]/[size of a memtable (in bytes)]. This ratio is then
+ // compared to this `threshold` value:
+ // - if ratio<threshold: the flush is replaced by a mempurge operation
+ // - else: a regular flush operation takes place.
+ // Threshold values:
+ // 0.0: mempurge deactivated (default).
+ // 1.0: recommended threshold value.
+ // >1.0 : aggressive mempurge.
+ // 0 < threshold < 1.0: mempurge triggered only for very low useful payload
+ // ratios.
+ // [experimental]
+ double experimental_mempurge_threshold = 0.0;
// Amount of data to build up in memtables across all column
// families before writing to disk.
// Retreat to the last entry with a key <= target
void SeekForPrev(const char* target);
+ // Advance to a random entry in the list.
+ void RandomSeek();
+
// Position at the first entry in list.
// Final state of iterator is Valid() iff list is not empty.
void SeekToFirst();
// Return head_ if list is empty.
Node* FindLast() const;
+ // Returns a random entry.
+ Node* FindRandomEntry() const;
+
// Traverses a single level of the list, setting *out_prev to the last
// node before the key and *out_next to the first node after. Assumes
// that the key is not present in the skip list. On entry, before should
}
}
+template <class Comparator>
+inline void InlineSkipList<Comparator>::Iterator::RandomSeek() {
+ node_ = list_->FindRandomEntry();
+}
+
template <class Comparator>
inline void InlineSkipList<Comparator>::Iterator::SeekToFirst() {
node_ = list_->head_->Next(0);
}
}
+template <class Comparator>
+typename InlineSkipList<Comparator>::Node*
+InlineSkipList<Comparator>::FindRandomEntry() const {
+ // TODO(bjlemaire): consider adding PREFETCH calls.
+ Node *x = head_, *scan_node = nullptr, *limit_node = nullptr;
+
+ // We start at the max level.
+ // FOr each level, we look at all the nodes at the level, and
+ // we randomly pick one of them. Then decrement the level
+ // and reiterate the process.
+ // eg: assume GetMaxHeight()=5, and there are #100 elements (nodes).
+ // level 4 nodes: lvl_nodes={#1, #15, #67, #84}. Randomly pick #15.
+ // We will consider all the nodes between #15 (inclusive) and #67
+ // (exclusive). #67 is called 'limit_node' here.
+ // level 3 nodes: lvl_nodes={#15, #21, #45, #51}. Randomly choose
+ // #51. #67 remains 'limit_node'.
+ // [...]
+ // level 0 nodes: lvl_nodes={#56,#57,#58,#59}. Randomly pick $57.
+ // Return Node #57.
+ std::vector<Node*> lvl_nodes;
+ Random* rnd = Random::GetTLSInstance();
+ int level = GetMaxHeight() - 1;
+
+ while (level >= 0) {
+ lvl_nodes.clear();
+ scan_node = x;
+ while (scan_node != limit_node) {
+ lvl_nodes.push_back(scan_node);
+ scan_node = scan_node->Next(level);
+ }
+ uint32_t rnd_idx = rnd->Next() % lvl_nodes.size();
+ x = lvl_nodes[rnd_idx];
+ if (rnd_idx + 1 < lvl_nodes.size()) {
+ limit_node = lvl_nodes[rnd_idx + 1];
+ }
+ level--;
+ }
+ // There is a special case where x could still be the head_
+ // (note that the head_ contains no key).
+ return x == head_ ? head_->Next(0) : x;
+}
+
template <class Comparator>
uint64_t InlineSkipList<Comparator>::EstimateCount(const char* key) const {
uint64_t count = 0;
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
+#include <random>
+
#include "db/memtable.h"
#include "memory/arena.h"
#include "memtable/inlineskiplist.h"
return (end_count >= start_count) ? (end_count - start_count) : 0;
}
+ void UniqueRandomSample(const uint64_t& num_entries,
+ const uint64_t& target_sample_size,
+ std::unordered_set<const char*>* entries) override {
+ entries->clear();
+ // Avoid divide-by-0.
+ assert(target_sample_size > 0);
+ assert(num_entries > 0);
+ // NOTE: the size of entries is not enforced to be exactly
+ // target_sample_size at the end of this function, it might be slightly
+ // greater or smaller.
+ SkipListRep::Iterator iter(&skip_list_);
+ // There are two methods to create the subset of samples (size m)
+ // from the table containing N elements:
+ // 1-Iterate linearly through the N memtable entries. For each entry i,
+ // add it to the sample set with a probability
+ // (target_sample_size - entries.size() ) / (N-i).
+ //
+ // 2-Pick m random elements without repetition.
+ // We pick Option 2 when m<sqrt(N) and
+ // Option 1 when m > sqrt(N).
+ if (target_sample_size >
+ static_cast<uint64_t>(std::sqrt(1.0 * num_entries))) {
+ Random* rnd = Random::GetTLSInstance();
+ iter.SeekToFirst();
+ uint64_t counter = 0, num_samples_left = target_sample_size;
+ for (; iter.Valid() && (num_samples_left > 0); iter.Next(), counter++) {
+ // Add entry to sample set with probability
+ // num_samples_left/(num_entries - counter).
+ if (rnd->Next() % (num_entries - counter) < num_samples_left) {
+ entries->insert(iter.key());
+ num_samples_left--;
+ }
+ }
+ } else {
+ // Option 2: pick m random elements with no duplicates.
+ // If Option 2 is picked, then target_sample_size<sqrt(N)
+ // Using a set spares the need to check for duplicates.
+ for (uint64_t i = 0; i < target_sample_size; i++) {
+ // We give it 5 attempts to find a non-duplicate
+ // With 5 attempts, the chances of returning `entries` set
+ // of size target_sample_size is:
+ // PROD_{i=1}^{target_sample_size-1} [1-(i/N)^5]
+ // which is monotonically increasing with N in the worse case
+ // of target_sample_size=sqrt(N), and is always >99.9% for N>4.
+ // At worst, for the final pick , when m=sqrt(N) there is
+ // a probability of p= 1/sqrt(N) chances to find a duplicate.
+ for (uint64_t j = 0; j < 5; j++) {
+ iter.RandomSeek();
+ // unordered_set::insert returns pair<iterator, bool>.
+ // The second element is true if an insert successfully happened.
+ // If element is already in the set, this bool will be false, and
+ // true otherwise.
+ if ((entries->insert(iter.key())).second) {
+ break;
+ }
+ }
+ }
+ }
+ }
+
~SkipListRep() override {}
// Iteration over the contents of a skip list
}
}
+ void RandomSeek() override { iter_.RandomSeek(); }
+
// Position at the first entry in list.
// Final state of iterator is Valid() iff list is not empty.
void SeekToFirst() override { iter_.SeekToFirst(); }
{"FATAL_LEVEL", InfoLogLevel::FATAL_LEVEL},
{"HEADER_LEVEL", InfoLogLevel::HEADER_LEVEL}};
-static std::unordered_map<std::string, MemPurgePolicy>
- experimental_mempurge_policy_string_map = {
- {"kAlternate", MemPurgePolicy::kAlternate},
- {"kAlways", MemPurgePolicy::kAlways}};
-
static std::unordered_map<std::string, OptionTypeInfo>
db_mutable_options_type_info = {
{"allow_os_buffer",
{offsetof(struct ImmutableDBOptions, error_if_exists),
OptionType::kBoolean, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
- {"experimental_allow_mempurge",
- {offsetof(struct ImmutableDBOptions, experimental_allow_mempurge),
- OptionType::kBoolean, OptionVerificationType::kNormal,
+ {"experimental_mempurge_threshold",
+ {offsetof(struct ImmutableDBOptions, experimental_mempurge_threshold),
+ OptionType::kDouble, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
- {"experimental_mempurge_policy",
- OptionTypeInfo::Enum<MemPurgePolicy>(
- offsetof(struct ImmutableDBOptions, experimental_mempurge_policy),
- &experimental_mempurge_policy_string_map)},
{"is_fd_close_on_exec",
{offsetof(struct ImmutableDBOptions, is_fd_close_on_exec),
OptionType::kBoolean, OptionVerificationType::kNormal,
allow_fallocate(options.allow_fallocate),
is_fd_close_on_exec(options.is_fd_close_on_exec),
advise_random_on_open(options.advise_random_on_open),
- experimental_allow_mempurge(options.experimental_allow_mempurge),
- experimental_mempurge_policy(options.experimental_mempurge_policy),
+ experimental_mempurge_threshold(options.experimental_mempurge_threshold),
db_write_buffer_size(options.db_write_buffer_size),
write_buffer_manager(options.write_buffer_manager),
access_hint_on_compaction_start(options.access_hint_on_compaction_start),
is_fd_close_on_exec);
ROCKS_LOG_HEADER(log, " Options.advise_random_on_open: %d",
advise_random_on_open);
- ROCKS_LOG_HEADER(log,
- " Options.experimental_allow_mempurge: %d",
- experimental_allow_mempurge);
- ROCKS_LOG_HEADER(log,
- " Options.experimental_mempurge_policy: %d",
- static_cast<int>(experimental_mempurge_policy));
+ ROCKS_LOG_HEADER(
+ log, " Options.experimental_mempurge_threshold: %f",
+ experimental_mempurge_threshold);
ROCKS_LOG_HEADER(
log, " Options.db_write_buffer_size: %" ROCKSDB_PRIszt,
db_write_buffer_size);
bool allow_fallocate;
bool is_fd_close_on_exec;
bool advise_random_on_open;
- bool experimental_allow_mempurge;
- MemPurgePolicy experimental_mempurge_policy;
+ double experimental_mempurge_threshold;
size_t db_write_buffer_size;
std::shared_ptr<WriteBufferManager> write_buffer_manager;
DBOptions::AccessHint access_hint_on_compaction_start;
{"persist_stats_to_disk", "false"},
{"stats_history_buffer_size", "69"},
{"advise_random_on_open", "true"},
- {"experimental_allow_mempurge", "false"},
+ {"experimental_mempurge_threshold", "0.0"},
{"use_adaptive_mutex", "false"},
{"new_table_reader_for_compaction_inputs", "true"},
{"compaction_readahead_size", "100"},
ASSERT_EQ(new_db_opt.persist_stats_to_disk, false);
ASSERT_EQ(new_db_opt.stats_history_buffer_size, 69U);
ASSERT_EQ(new_db_opt.advise_random_on_open, true);
- ASSERT_EQ(new_db_opt.experimental_allow_mempurge, false);
+ ASSERT_EQ(new_db_opt.experimental_mempurge_threshold, 0.0);
ASSERT_EQ(new_db_opt.use_adaptive_mutex, false);
ASSERT_EQ(new_db_opt.new_table_reader_for_compaction_inputs, true);
ASSERT_EQ(new_db_opt.compaction_readahead_size, 100);
{"persist_stats_to_disk", "false"},
{"stats_history_buffer_size", "69"},
{"advise_random_on_open", "true"},
- {"experimental_allow_mempurge", "false"},
+ {"experimental_mempurge_threshold", "0.0"},
{"use_adaptive_mutex", "false"},
{"new_table_reader_for_compaction_inputs", "true"},
{"compaction_readahead_size", "100"},
ASSERT_EQ(new_db_opt.persist_stats_to_disk, false);
ASSERT_EQ(new_db_opt.stats_history_buffer_size, 69U);
ASSERT_EQ(new_db_opt.advise_random_on_open, true);
- ASSERT_EQ(new_db_opt.experimental_allow_mempurge, false);
+ ASSERT_EQ(new_db_opt.experimental_mempurge_threshold, 0.0);
ASSERT_EQ(new_db_opt.use_adaptive_mutex, false);
ASSERT_EQ(new_db_opt.new_table_reader_for_compaction_inputs, true);
ASSERT_EQ(new_db_opt.compaction_readahead_size, 100);
return ROCKSDB_NAMESPACE::kSnappyCompression; // default value
}
-static enum ROCKSDB_NAMESPACE::MemPurgePolicy StringToMemPurgePolicy(
- const char* mpolicy) {
- assert(mpolicy);
- if (!strcasecmp(mpolicy, "kAlways")) {
- return ROCKSDB_NAMESPACE::MemPurgePolicy::kAlways;
- } else if (!strcasecmp(mpolicy, "kAlternate")) {
- return ROCKSDB_NAMESPACE::MemPurgePolicy::kAlternate;
- }
-
- fprintf(stdout, "Cannot parse mempurge policy '%s'\n", mpolicy);
- return ROCKSDB_NAMESPACE::MemPurgePolicy::kAlternate;
-}
-
static std::string ColumnFamilyName(size_t i) {
if (i == 0) {
return ROCKSDB_NAMESPACE::kDefaultColumnFamilyName;
DEFINE_bool(allow_concurrent_memtable_write, true,
"Allow multi-writers to update mem tables in parallel.");
-DEFINE_bool(experimental_allow_mempurge, false,
- "Allow memtable garbage collection.");
-
-DEFINE_string(experimental_mempurge_policy, "kAlternate",
- "Specify memtable garbage collection policy.");
+DEFINE_double(experimental_mempurge_threshold, 0.0,
+ "Maximum useful payload ratio estimate that triggers a mempurge "
+ "(memtable garbage collection).");
DEFINE_bool(inplace_update_support,
ROCKSDB_NAMESPACE::Options().inplace_update_support,
options.delayed_write_rate = FLAGS_delayed_write_rate;
options.allow_concurrent_memtable_write =
FLAGS_allow_concurrent_memtable_write;
- options.experimental_allow_mempurge = FLAGS_experimental_allow_mempurge;
- options.experimental_mempurge_policy =
- StringToMemPurgePolicy(FLAGS_experimental_mempurge_policy.c_str());
+ options.experimental_mempurge_threshold =
+ FLAGS_experimental_mempurge_threshold;
options.inplace_update_support = FLAGS_inplace_update_support;
options.inplace_update_num_locks = FLAGS_inplace_update_num_locks;
options.enable_write_thread_adaptive_yield =
simple_default_params = {
"allow_concurrent_memtable_write": lambda: random.randint(0, 1),
"column_families": 1,
- "experimental_allow_mempurge": lambda: random.randint(0, 1),
- "experimental_mempurge_policy": lambda: random.choice(["kAlways", "kAlternate"]),
+ "experimental_mempurge_threshold": lambda: 10.0*random.random(),
"max_background_compactions": 1,
"max_bytes_for_level_base": 67108864,
"memtablerep": "skip_list",