options.write_buffer_size = 1 << 20;
// Activate the MemPurge prototype.
options.experimental_allow_mempurge = true;
+ options.experimental_mempurge_policy = MemPurgePolicy::kAlways;
ASSERT_OK(TryReopen(options));
uint32_t mempurge_count = 0;
uint32_t sst_count = 0;
// Assert that at least one flush to storage has been performed
ASSERT_GT(sst_count, EXPECTED_SST_COUNT);
// (which will consequently increase the number of mempurges recorded too).
- ASSERT_EQ(mempurge_count, mempurge_count_record);
+ ASSERT_GE(mempurge_count, mempurge_count_record);
// Assert that there is no data corruption, even with
// a flush to storage.
options.write_buffer_size = 1 << 20;
// Activate the MemPurge prototype.
options.experimental_allow_mempurge = true;
+ options.experimental_mempurge_policy = MemPurgePolicy::kAlways;
ASSERT_OK(TryReopen(options));
uint32_t mempurge_count = 0;
options.write_buffer_size = 1 << 20;
// Activate the MemPurge prototype.
options.experimental_allow_mempurge = true;
+ options.experimental_mempurge_policy = MemPurgePolicy::kAlways;
ASSERT_OK(TryReopen(options));
uint32_t mempurge_count = 0;
options.inplace_update_support = false;
options.allow_concurrent_memtable_write = true;
- // Enforce size of a single MemTable to 1MB.
+ // Enforce size of a single MemTable to 128KB.
options.write_buffer_size = 128 << 10;
// Activate the MemPurge prototype.
options.experimental_allow_mempurge = true;
+ options.experimental_mempurge_policy = MemPurgePolicy::kAlways;
ASSERT_OK(TryReopen(options));
const size_t KVSIZE = 10;
// more than would fit in maximum allowed memtables.
Random rnd(719);
const size_t NUM_REPEAT = 100;
- const size_t RAND_KEY_LENGTH = 8192;
+ const size_t RAND_KEY_LENGTH = 4096;
const size_t RAND_VALUES_LENGTH = 1024;
std::vector<std::string> values_default(KVSIZE), values_pikachu(KVSIZE);
const uint32_t EXPECTED_SST_COUNT = 0;
EXPECT_GE(mempurge_count, EXPECTED_MIN_MEMPURGE_COUNT);
- EXPECT_EQ(sst_count, EXPECTED_SST_COUNT);
+ if (options.experimental_mempurge_policy == MemPurgePolicy::kAlways) {
+ EXPECT_EQ(sst_count, EXPECTED_SST_COUNT);
+ }
ReopenWithColumnFamilies({"default", "pikachu"}, options);
// Check that there was no data corruption anywhere,
// path 0 for level 0 file.
meta_.fd = FileDescriptor(versions_->NewFileNumber(), 0, 0);
+ // If mempurge feature is activated, keep track of any potential
+ // memtables coming from a previous mempurge operation.
+ // Used for mempurge policy.
+ if (db_options_.experimental_allow_mempurge) {
+ contains_mempurge_outcome_ = false;
+ for (MemTable* mt : mems_) {
+ if (cfd_->imm()->IsMemPurgeOutput(mt->GetID())) {
+ contains_mempurge_outcome_ = true;
+ break;
+ }
+ }
+ }
+
base_ = cfd_->current();
base_->Ref(); // it is likely that we do not need this reference
}
Status mempurge_s = Status::NotFound("No MemPurge.");
if (db_options_.experimental_allow_mempurge &&
(cfd_->GetFlushReason() == FlushReason::kWriteBufferFull) &&
- (!mems_.empty())) {
+ (!mems_.empty()) && MemPurgeDecider()) {
mempurge_s = MemPurge();
if (!mempurge_s.ok()) {
// Mempurge is typically aborted when the new_mem output memtable
db_mutex_->Unlock();
assert(!mems_.empty());
+ // Measure purging time.
+ const uint64_t start_micros = clock_->NowMicros();
+ const uint64_t start_cpu_micros = clock_->CPUNanos() / 1000;
+
MemTable* new_mem = nullptr;
+ // For performance/log investigation purposes:
+ // look at how much useful payload we harvest in the new_mem.
+ // This value is then printed to the DB log.
+ double new_mem_capacity = 0.0;
// Create two iterators, one for the memtable data (contains
// info from puts + deletes), and one for the memtable
// or at least range tombstones, copy over the info
// to the new memtable.
if (iter->Valid() || !range_del_agg->IsEmpty()) {
- // Arbitrary heuristic: maxSize is 60% cpacity.
- size_t maxSize = ((mutable_cf_options_.write_buffer_size + 6U) / 10U);
+ // MaxSize is the size of a memtable.
+ size_t maxSize = mutable_cf_options_.write_buffer_size;
std::unique_ptr<CompactionFilter> compaction_filter;
if (ioptions->compaction_filter_factory != nullptr &&
ioptions->compaction_filter_factory->ShouldFilterTableFileCreation(
// and destroy new_mem.
if (new_mem->ApproximateMemoryUsage() > maxSize) {
s = Status::Aborted("Mempurge filled more than one memtable.");
+ new_mem_capacity = 1.0;
break;
}
}
// and destroy new_mem.
if (new_mem->ApproximateMemoryUsage() > maxSize) {
s = Status::Aborted(Slice("Mempurge filled more than one memtable."));
+ new_mem_capacity = 1.0;
break;
}
}
new_mem->SetFirstSequenceNumber(new_first_seqno);
// The new_mem is added to the list of immutable memtables
- // only if it filled at less than 60% capacity (arbitrary heuristic).
- if (new_mem->ApproximateMemoryUsage() < maxSize) {
+ // only if it filled at less than 100% capacity and isn't flagged
+ // as in need of being flushed.
+ if (new_mem->ApproximateMemoryUsage() < maxSize &&
+ !(new_mem->ShouldFlushNow())) {
db_mutex_->Lock();
+ uint64_t new_mem_id = mems_[0]->GetID();
+ // Copy lowest memtable ID
+ // House keeping work.
+ for (MemTable* mt : mems_) {
+ new_mem_id = mt->GetID() < new_mem_id ? mt->GetID() : new_mem_id;
+ // Note: if m is not a previous mempurge output memtable,
+ // nothing happens.
+ cfd_->imm()->RemoveMemPurgeOutputID(mt->GetID());
+ }
+ new_mem->SetID(new_mem_id);
+ cfd_->imm()->AddMemPurgeOutputID(new_mem_id);
cfd_->imm()->Add(new_mem,
&job_context_->memtables_to_free,
false /* -> trigger_flush=false:
* adding this memtable
* will not trigger a flush.
*/);
+ new_mem_capacity = (new_mem->ApproximateMemoryUsage()) * 1.0 /
+ mutable_cf_options_.write_buffer_size;
new_mem->Ref();
db_mutex_->Unlock();
} else {
s = Status::Aborted(Slice("Mempurge filled more than one memtable."));
+ new_mem_capacity = 1.0;
if (new_mem) {
job_context_->memtables_to_free.push_back(new_mem);
}
} else {
TEST_SYNC_POINT("DBImpl::FlushJob:MemPurgeUnsuccessful");
}
+ const uint64_t micros = clock_->NowMicros() - start_micros;
+ const uint64_t cpu_micros = clock_->CPUNanos() / 1000 - start_cpu_micros;
+ ROCKS_LOG_INFO(db_options_.info_log,
+ "[%s] [JOB %d] Mempurge lasted %" PRIu64
+ " microseconds, and %" PRIu64
+ " cpu "
+ "microseconds. Status is %s ok. Perc capacity: %f\n",
+ cfd_->GetName().c_str(), job_context_->job_id, micros,
+ cpu_micros, s.ok() ? "" : "not", new_mem_capacity);
return s;
}
+bool FlushJob::MemPurgeDecider() {
+ MemPurgePolicy policy = db_options_.experimental_mempurge_policy;
+ if (policy == MemPurgePolicy::kAlways) {
+ return true;
+ } else if (policy == MemPurgePolicy::kAlternate) {
+ // Note: if at least one of the flushed memtables is
+ // an output of a previous mempurge process, then flush
+ // to storage.
+ return !(contains_mempurge_outcome_);
+ }
+ return false;
+}
+
Status FlushJob::WriteLevel0Table() {
AutoThreadOperationStageUpdater stage_updater(
ThreadStatus::STAGE_FLUSH_WRITE_L0);
// Note that here we treat flush as level 0 compaction in internal stats
InternalStats::CompactionStats stats(CompactionReason::kFlush, 1);
- stats.micros = clock_->NowMicros() - start_micros;
- stats.cpu_micros = clock_->CPUNanos() / 1000 - start_cpu_micros;
+ const uint64_t micros = clock_->NowMicros() - start_micros;
+ const uint64_t cpu_micros = clock_->CPUNanos() / 1000 - start_cpu_micros;
+ stats.micros = micros;
+ stats.cpu_micros = cpu_micros;
+
+ ROCKS_LOG_INFO(db_options_.info_log,
+ "[%s] [JOB %d] Flush lasted %" PRIu64
+ " microseconds, and %" PRIu64 " cpu microseconds.\n",
+ cfd_->GetName().c_str(), job_context_->job_id, micros,
+ cpu_micros);
if (has_output) {
stats.bytes_written = meta_.fd.GetFileSize();
stats.num_output_files_blob = static_cast<int>(blobs.size());
+ if (db_options_.experimental_allow_mempurge && s.ok()) {
+ // The db_mutex is held at this point.
+ for (MemTable* mt : mems_) {
+ // Note: if m is not a previous mempurge output memtable,
+ // nothing happens here.
+ cfd_->imm()->RemoveMemPurgeOutputID(mt->GetID());
+ }
+ }
+
RecordTimeToHistogram(stats_, FLUSH_TIME, stats.micros);
cfd_->internal_stats()->AddCompactionStats(0 /* level */, thread_pri_, stats);
cfd_->internal_stats()->AddCFStats(
InternalStats::BYTES_FLUSHED,
stats.bytes_written + stats.bytes_written_blob);
RecordFlushIOStats();
+
return s;
}
// recommend all users not to set this flag as true given that the MemPurge
// process has not matured yet.
Status MemPurge();
+ bool MemPurgeDecider();
#ifndef ROCKSDB_LITE
std::unique_ptr<FlushJobInfo> GetFlushJobInfo() const;
#endif // !ROCKSDB_LITE
const std::string full_history_ts_low_;
BlobFileCompletionCallback* blob_callback_;
+
+ // Used when experimental_allow_mempurge set to true.
+ bool contains_mempurge_outcome_;
};
} // namespace ROCKSDB_NAMESPACE
}
#endif // !ROCKSDB_LITE
+ // Returns a heuristic flush decision
+ bool ShouldFlushNow();
+
private:
enum FlushStateEnum { FLUSH_NOT_REQUESTED, FLUSH_REQUESTED, FLUSH_SCHEDULED };
std::unique_ptr<FlushJobInfo> flush_job_info_;
#endif // !ROCKSDB_LITE
- // Returns a heuristic flush decision
- bool ShouldFlushNow();
-
// Updates flush_state_ using ShouldFlushNow()
void UpdateFlushState();
// not freed, but put into a vector for future deref and reclamation.
void RemoveOldMemTables(uint64_t log_number,
autovector<MemTable*>* to_delete);
+ void AddMemPurgeOutputID(uint64_t mid) {
+ if (mempurged_ids_.find(mid) == mempurged_ids_.end()) {
+ mempurged_ids_.insert(mid);
+ }
+ }
+
+ void RemoveMemPurgeOutputID(uint64_t mid) {
+ if (mempurged_ids_.find(mid) != mempurged_ids_.end()) {
+ mempurged_ids_.erase(mid);
+ }
+ }
+
+ bool IsMemPurgeOutput(uint64_t mid) {
+ if (mempurged_ids_.find(mid) == mempurged_ids_.end()) {
+ return false;
+ }
+ return true;
+ }
private:
friend Status InstallMemtableAtomicFlushResults(
// Cached value of current_->HasHistory().
std::atomic<bool> current_has_history_;
+
+ // Store the IDs of the memtables installed in this
+ // list that result from a mempurge operation.
+ std::unordered_set<uint64_t> mempurged_ids_;
};
// Installs memtable atomic flush results.
extern const char* kHostnameForDbHostId;
+enum class MemPurgePolicy : char {
+ kAlternate = 0x00,
+ kAlways = 0x01,
+};
+
enum class CompactionServiceJobStatus : char {
kSuccess,
kFailure,
// If true, allows for memtable purge instead of flush to storage.
// (experimental).
bool experimental_allow_mempurge = false;
+ // If experimental_allow_mempurge is true, will dictate MemPurge
+ // policy.
+ // Default: kAlternate
+ // (experimental).
+ MemPurgePolicy experimental_mempurge_policy = MemPurgePolicy::kAlternate;
// Amount of data to build up in memtables across all column
// families before writing to disk.
{"FATAL_LEVEL", InfoLogLevel::FATAL_LEVEL},
{"HEADER_LEVEL", InfoLogLevel::HEADER_LEVEL}};
+static std::unordered_map<std::string, MemPurgePolicy>
+ experimental_mempurge_policy_string_map = {
+ {"kAlternate", MemPurgePolicy::kAlternate},
+ {"kAlways", MemPurgePolicy::kAlways}};
+
static std::unordered_map<std::string, OptionTypeInfo>
db_mutable_options_type_info = {
{"allow_os_buffer",
{offsetof(struct ImmutableDBOptions, experimental_allow_mempurge),
OptionType::kBoolean, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
+ {"experimental_mempurge_policy",
+ OptionTypeInfo::Enum<MemPurgePolicy>(
+ offsetof(struct ImmutableDBOptions, experimental_mempurge_policy),
+ &experimental_mempurge_policy_string_map)},
{"is_fd_close_on_exec",
{offsetof(struct ImmutableDBOptions, is_fd_close_on_exec),
OptionType::kBoolean, OptionVerificationType::kNormal,
is_fd_close_on_exec(options.is_fd_close_on_exec),
advise_random_on_open(options.advise_random_on_open),
experimental_allow_mempurge(options.experimental_allow_mempurge),
+ experimental_mempurge_policy(options.experimental_mempurge_policy),
db_write_buffer_size(options.db_write_buffer_size),
write_buffer_manager(options.write_buffer_manager),
access_hint_on_compaction_start(options.access_hint_on_compaction_start),
ROCKS_LOG_HEADER(log,
" Options.experimental_allow_mempurge: %d",
experimental_allow_mempurge);
+ ROCKS_LOG_HEADER(log,
+ " Options.experimental_mempurge_policy: %d",
+ static_cast<int>(experimental_mempurge_policy));
ROCKS_LOG_HEADER(
log, " Options.db_write_buffer_size: %" ROCKSDB_PRIszt,
db_write_buffer_size);
bool is_fd_close_on_exec;
bool advise_random_on_open;
bool experimental_allow_mempurge;
+ MemPurgePolicy experimental_mempurge_policy;
size_t db_write_buffer_size;
std::shared_ptr<WriteBufferManager> write_buffer_manager;
DBOptions::AccessHint access_hint_on_compaction_start;
return ROCKSDB_NAMESPACE::kSnappyCompression; // default value
}
+static enum ROCKSDB_NAMESPACE::MemPurgePolicy StringToMemPurgePolicy(
+ const char* mpolicy) {
+ assert(mpolicy);
+ if (!strcasecmp(mpolicy, "kAlways")) {
+ return ROCKSDB_NAMESPACE::MemPurgePolicy::kAlways;
+ } else if (!strcasecmp(mpolicy, "kAlternate")) {
+ return ROCKSDB_NAMESPACE::MemPurgePolicy::kAlternate;
+ }
+
+ fprintf(stdout, "Cannot parse mempurge policy '%s'\n", mpolicy);
+ return ROCKSDB_NAMESPACE::MemPurgePolicy::kAlternate;
+}
+
static std::string ColumnFamilyName(size_t i) {
if (i == 0) {
return ROCKSDB_NAMESPACE::kDefaultColumnFamilyName;
DEFINE_bool(experimental_allow_mempurge, false,
"Allow memtable garbage collection.");
+DEFINE_string(experimental_mempurge_policy, "kAlternate",
+ "Specify memtable garbage collection policy.");
+
DEFINE_bool(inplace_update_support,
ROCKSDB_NAMESPACE::Options().inplace_update_support,
"Support in-place memtable update for smaller or same-size values");
options.allow_concurrent_memtable_write =
FLAGS_allow_concurrent_memtable_write;
options.experimental_allow_mempurge = FLAGS_experimental_allow_mempurge;
+ options.experimental_mempurge_policy =
+ StringToMemPurgePolicy(FLAGS_experimental_mempurge_policy.c_str());
options.inplace_update_support = FLAGS_inplace_update_support;
options.inplace_update_num_locks = FLAGS_inplace_update_num_locks;
options.enable_write_thread_adaptive_yield =