// path 0 for level 0 file.
meta_.fd = FileDescriptor(versions_->NewFileNumber(), 0, 0);
- // If mempurge feature is activated, keep track of any potential
- // memtables coming from a previous mempurge operation.
- // Used for mempurge policy.
- if (db_options_.experimental_mempurge_threshold > 0.0) {
- contains_mempurge_outcome_ = false;
- for (MemTable* mt : mems_) {
- if (cfd_->imm()->IsMemPurgeOutput(mt->GetID())) {
- contains_mempurge_outcome_ = true;
- break;
- }
- }
- }
-
base_ = cfd_->current();
base_->Ref(); // it is likely that we do not need this reference
}
(!mems_.empty()) && MemPurgeDecider()) {
mempurge_s = MemPurge();
if (!mempurge_s.ok()) {
- // Mempurge is typically aborted when the new_mem output memtable
- // is filled at more than XX % capacity (currently: 60%).
+ // Mempurge is typically aborted when the output
+ // bytes cannot be contained onto a single output memtable.
if (mempurge_s.IsAborted()) {
ROCKS_LOG_INFO(db_options_.info_log, "Mempurge process aborted: %s\n",
mempurge_s.ToString().c_str());
!(new_mem->ShouldFlushNow())) {
db_mutex_->Lock();
uint64_t new_mem_id = mems_[0]->GetID();
- // Copy lowest memtable ID
- // House keeping work.
- for (MemTable* mt : mems_) {
- new_mem_id = mt->GetID() < new_mem_id ? mt->GetID() : new_mem_id;
- // Note: if m is not a previous mempurge output memtable,
- // nothing happens.
- cfd_->imm()->RemoveMemPurgeOutputID(mt->GetID());
- }
+
new_mem->SetID(new_mem_id);
- cfd_->imm()->AddMemPurgeOutputID(new_mem_id);
+
// This addition will not trigger another flush, because
// we do not call SchedulePendingFlush().
cfd_->imm()->Add(new_mem, &job_context_->memtables_to_free);
// Payload and useful_payload (in bytes).
// The useful payload ratio of a given MemTable
// is estimated to be useful_payload/payload.
- uint64_t payload = 0, useful_payload = 0;
+ uint64_t payload = 0, useful_payload = 0, entry_size = 0;
+
+ // Local variables used repetitively inside the for-loop
+ // when iterating over the sampled entries.
+ Slice key_slice, value_slice;
+ ParsedInternalKey res;
+ SnapshotImpl min_snapshot;
+ std::string vget;
+ Status mget_s, parse_s;
+ MergeContext merge_context;
+ SequenceNumber max_covering_tombstone_seq = 0, sqno = 0,
+ min_seqno_snapshot = 0;
+ bool get_res, can_be_useful_payload, not_in_next_mems;
+
// If estimated_useful_payload is > threshold,
// then flush to storage, else MemPurge.
double estimated_useful_payload = 0.0;
ro.total_order_seek = true;
// Iterate over each memtable of the set.
- for (MemTable* mt : mems_) {
- // If the memtable is the output of a previous mempurge,
- // its approximate useful payload ratio is already calculated.
- if (cfd_->imm()->IsMemPurgeOutput(mt->GetID())) {
- // We make the assumption that this memtable is already
- // free of garbage (garbage underestimation).
- estimated_useful_payload += mt->ApproximateMemoryUsage();
- } else {
- // Else sample from the table.
- uint64_t nentries = mt->num_entries();
- // Corrected Cochran formula for small populations
- // (converges to n0 for large populations).
- uint64_t target_sample_size =
- static_cast<uint64_t>(ceil(n0 / (1.0 + (n0 / nentries))));
- std::unordered_set<const char*> sentries = {};
- // Populate sample entries set.
- mt->UniqueRandomSample(target_sample_size, &sentries);
-
- // Estimate the garbage ratio by comparing if
- // each sample corresponds to a valid entry.
- for (const char* ss : sentries) {
- ParsedInternalKey res;
- Slice entry_slice = GetLengthPrefixedSlice(ss);
- Status parse_s =
- ParseInternalKey(entry_slice, &res, true /*log_err_key*/);
- if (!parse_s.ok()) {
- ROCKS_LOG_WARN(db_options_.info_log,
- "Memtable Decider: ParseInternalKey did not parse "
- "entry_slice %s"
- "successfully.",
- entry_slice.data());
- }
- LookupKey lkey(res.user_key, kMaxSequenceNumber);
- std::string vget;
- Status s;
- MergeContext merge_context;
- SequenceNumber max_covering_tombstone_seq = 0, sqno = 0;
-
- // Pick the oldest existing snapshot that is more recent
- // than the sequence number of the sampled entry.
- SequenceNumber min_seqno_snapshot = kMaxSequenceNumber;
- SnapshotImpl min_snapshot;
- for (SequenceNumber seq_num : existing_snapshots_) {
- if (seq_num > res.sequence && seq_num < min_seqno_snapshot) {
- min_seqno_snapshot = seq_num;
- }
- }
- min_snapshot.number_ = min_seqno_snapshot;
- ro.snapshot =
- min_seqno_snapshot < kMaxSequenceNumber ? &min_snapshot : nullptr;
-
- // Estimate if the sample entry is valid or not.
- bool gres = mt->Get(lkey, &vget, nullptr, &s, &merge_context,
- &max_covering_tombstone_seq, &sqno, ro);
- if (!gres) {
- ROCKS_LOG_WARN(
- db_options_.info_log,
- "Memtable Get returned false when Get(sampled entry). "
- "Yet each sample entry should exist somewhere in the memtable, "
- "unrelated to whether it has been deleted or not.");
- }
- payload += entry_slice.size();
-
- // TODO(bjlemaire): evaluate typeMerge.
- // This is where the sampled entry is estimated to be
- // garbage or not. Note that this is a garbage *estimation*
- // because we do not include certain items such as
- // CompactionFitlers triggered at flush, or if the same delete
- // has been inserted twice or more in the memtable.
- if (res.type == kTypeValue && gres && s.ok() && sqno == res.sequence) {
- useful_payload += entry_slice.size();
- } else if (((res.type == kTypeDeletion) ||
- (res.type == kTypeSingleDeletion)) &&
- s.IsNotFound() && gres) {
- useful_payload += entry_slice.size();
+ for (auto mem_iter = std::begin(mems_); mem_iter != std::end(mems_);
+ mem_iter++) {
+ MemTable* mt = *mem_iter;
+
+ // Else sample from the table.
+ uint64_t nentries = mt->num_entries();
+ // Corrected Cochran formula for small populations
+ // (converges to n0 for large populations).
+ uint64_t target_sample_size =
+ static_cast<uint64_t>(ceil(n0 / (1.0 + (n0 / nentries))));
+ std::unordered_set<const char*> sentries = {};
+ // Populate sample entries set.
+ mt->UniqueRandomSample(target_sample_size, &sentries);
+
+ // Estimate the garbage ratio by comparing if
+ // each sample corresponds to a valid entry.
+ for (const char* ss : sentries) {
+ key_slice = GetLengthPrefixedSlice(ss);
+ parse_s = ParseInternalKey(key_slice, &res, true /*log_err_key*/);
+ if (!parse_s.ok()) {
+ ROCKS_LOG_WARN(db_options_.info_log,
+ "Memtable Decider: ParseInternalKey did not parse "
+ "key_slice %s successfully.",
+ key_slice.data());
+ }
+
+ // Size of the entry is "key size (+ value size if KV entry)"
+ entry_size = key_slice.size();
+ if (res.type == kTypeValue) {
+ value_slice =
+ GetLengthPrefixedSlice(key_slice.data() + key_slice.size());
+ entry_size += value_slice.size();
+ }
+
+ // Count entry bytes as payload.
+ payload += entry_size;
+
+ LookupKey lkey(res.user_key, kMaxSequenceNumber);
+
+ // Paranoia: zero out these values just in case.
+ max_covering_tombstone_seq = 0;
+ sqno = 0;
+
+ // Pick the oldest existing snapshot that is more recent
+ // than the sequence number of the sampled entry.
+ min_seqno_snapshot = kMaxSequenceNumber;
+ for (SequenceNumber seq_num : existing_snapshots_) {
+ if (seq_num > res.sequence && seq_num < min_seqno_snapshot) {
+ min_seqno_snapshot = seq_num;
}
}
- if (payload > 0) {
- // We used the estimated useful payload ratio
- // to evaluate how much of the total memtable is useful bytes.
- estimated_useful_payload +=
- (mt->ApproximateMemoryUsage()) * (useful_payload * 1.0 / payload);
- ROCKS_LOG_INFO(
- db_options_.info_log,
- "Mempurge sampling - found garbage ratio from sampling: %f.\n",
- (payload - useful_payload) * 1.0 / payload);
- } else {
+ min_snapshot.number_ = min_seqno_snapshot;
+ ro.snapshot =
+ min_seqno_snapshot < kMaxSequenceNumber ? &min_snapshot : nullptr;
+
+ // Estimate if the sample entry is valid or not.
+ get_res = mt->Get(lkey, &vget, nullptr, &mget_s, &merge_context,
+ &max_covering_tombstone_seq, &sqno, ro);
+ if (!get_res) {
ROCKS_LOG_WARN(
db_options_.info_log,
- "Mempurge kSampling policy: null payload measured, and collected "
- "sample size is %zu\n.",
- sentries.size());
+ "Memtable Get returned false when Get(sampled entry). "
+ "Yet each sample entry should exist somewhere in the memtable, "
+ "unrelated to whether it has been deleted or not.");
+ }
+
+ // TODO(bjlemaire): evaluate typeMerge.
+ // This is where the sampled entry is estimated to be
+ // garbage or not. Note that this is a garbage *estimation*
+ // because we do not include certain items such as
+ // CompactionFitlers triggered at flush, or if the same delete
+ // has been inserted twice or more in the memtable.
+
+ // Evaluate if the entry can be useful payload
+ // Situation #1: entry is a KV entry, was found in the memtable mt
+ // and the sequence numbers match.
+ can_be_useful_payload = (res.type == kTypeValue) && get_res &&
+ mget_s.ok() && (sqno == res.sequence);
+
+ // Situation #2: entry is a delete entry, was found in the memtable mt
+ // (because gres==true) and no valid KV entry is found.
+ // (note: duplicate delete entries are also taken into
+ // account here, because the sequence number 'sqno'
+ // in memtable->Get(&sqno) operation is set to be equal
+ // to the most recent delete entry as well).
+ can_be_useful_payload |=
+ ((res.type == kTypeDeletion) || (res.type == kTypeSingleDeletion)) &&
+ mget_s.IsNotFound() && get_res && (sqno == res.sequence);
+
+ // If there is a chance that the entry is useful payload
+ // Verify that the entry does not appear in the following memtables
+ // (memtables with greater memtable ID/larger sequence numbers).
+ if (can_be_useful_payload) {
+ not_in_next_mems = true;
+ for (auto next_mem_iter = mem_iter + 1;
+ next_mem_iter != std::end(mems_); next_mem_iter++) {
+ if ((*next_mem_iter)
+ ->Get(lkey, &vget, nullptr, &mget_s, &merge_context,
+ &max_covering_tombstone_seq, &sqno, ro)) {
+ not_in_next_mems = false;
+ break;
+ }
+ }
+ if (not_in_next_mems) {
+ useful_payload += entry_size;
+ }
}
}
+ if (payload > 0) {
+ // We use the estimated useful payload ratio to
+ // evaluate how many of the memtable bytes are useful bytes.
+ estimated_useful_payload +=
+ (mt->ApproximateMemoryUsage()) * (useful_payload * 1.0 / payload);
+
+ ROCKS_LOG_INFO(
+ db_options_.info_log,
+ "Mempurge sampling - found garbage ratio from sampling: %f.\n",
+ (payload - useful_payload) * 1.0 / payload);
+ } else {
+ ROCKS_LOG_WARN(db_options_.info_log,
+ "Mempurge sampling: null payload measured, and collected "
+ "sample size is %zu\n.",
+ sentries.size());
+ }
}
- // We convert the total number of useful paylaod bytes
+ // We convert the total number of useful payload bytes
// into the proportion of memtable necessary to store all these bytes.
// We compare this proportion with the threshold value.
- return (estimated_useful_payload / mutable_cf_options_.write_buffer_size) <
- threshold;
+ return ((estimated_useful_payload / mutable_cf_options_.write_buffer_size) <
+ threshold);
}
Status FlushJob::WriteLevel0Table() {
stats.num_output_files_blob = static_cast<int>(blobs.size());
- if ((db_options_.experimental_mempurge_threshold > 0.0) && s.ok()) {
- // The db_mutex is held at this point.
- for (MemTable* mt : mems_) {
- // Note: if m is not a previous mempurge output memtable,
- // nothing happens here.
- cfd_->imm()->RemoveMemPurgeOutputID(mt->GetID());
- }
- }
-
RecordTimeToHistogram(stats_, FLUSH_TIME, stats.micros);
cfd_->internal_stats()->AddCompactionStats(0 /* level */, thread_pri_, stats);
cfd_->internal_stats()->AddCFStats(