Close();
}
+#ifndef ROCKSDB_LITE
+// This simple Listener can only handle one flush at a time.
+class TestFlushListener : public EventListener {
+ public:
+ TestFlushListener(Env* env, DBFlushTest* test)
+ : slowdown_count(0), stop_count(0), db_closed(), env_(env), test_(test) {
+ db_closed = false;
+ }
+
+ ~TestFlushListener() override {
+ prev_fc_info_.status.PermitUncheckedError(); // Ignore the status
+ }
+ void OnTableFileCreated(const TableFileCreationInfo& info) override {
+ // remember the info for later checking the FlushJobInfo.
+ prev_fc_info_ = info;
+ ASSERT_GT(info.db_name.size(), 0U);
+ ASSERT_GT(info.cf_name.size(), 0U);
+ ASSERT_GT(info.file_path.size(), 0U);
+ ASSERT_GT(info.job_id, 0);
+ ASSERT_GT(info.table_properties.data_size, 0U);
+ ASSERT_GT(info.table_properties.raw_key_size, 0U);
+ ASSERT_GT(info.table_properties.raw_value_size, 0U);
+ ASSERT_GT(info.table_properties.num_data_blocks, 0U);
+ ASSERT_GT(info.table_properties.num_entries, 0U);
+ ASSERT_EQ(info.file_checksum, kUnknownFileChecksum);
+ ASSERT_EQ(info.file_checksum_func_name, kUnknownFileChecksumFuncName);
+ }
+
+ void OnFlushCompleted(DB* db, const FlushJobInfo& info) override {
+ flushed_dbs_.push_back(db);
+ flushed_column_family_names_.push_back(info.cf_name);
+ if (info.triggered_writes_slowdown) {
+ slowdown_count++;
+ }
+ if (info.triggered_writes_stop) {
+ stop_count++;
+ }
+ // verify whether the previously created file matches the flushed file.
+ ASSERT_EQ(prev_fc_info_.db_name, db->GetName());
+ ASSERT_EQ(prev_fc_info_.cf_name, info.cf_name);
+ ASSERT_EQ(prev_fc_info_.job_id, info.job_id);
+ ASSERT_EQ(prev_fc_info_.file_path, info.file_path);
+ ASSERT_EQ(TableFileNameToNumber(info.file_path), info.file_number);
+
+ // Note: the following chunk relies on the notification pertaining to the
+ // database pointed to by DBTestBase::db_, and is thus bypassed when
+ // that assumption does not hold (see the test case MultiDBMultiListeners
+ // below).
+ ASSERT_TRUE(test_);
+ if (db == test_->db_) {
+ std::vector<std::vector<FileMetaData>> files_by_level;
+ test_->dbfull()->TEST_GetFilesMetaData(db->DefaultColumnFamily(),
+ &files_by_level);
+
+ ASSERT_FALSE(files_by_level.empty());
+ auto it = std::find_if(files_by_level[0].begin(), files_by_level[0].end(),
+ [&](const FileMetaData& meta) {
+ return meta.fd.GetNumber() == info.file_number;
+ });
+ ASSERT_NE(it, files_by_level[0].end());
+ ASSERT_EQ(info.oldest_blob_file_number, it->oldest_blob_file_number);
+ }
+
+ ASSERT_EQ(db->GetEnv()->GetThreadID(), info.thread_id);
+ ASSERT_GT(info.thread_id, 0U);
+ }
+
+ std::vector<std::string> flushed_column_family_names_;
+ std::vector<DB*> flushed_dbs_;
+ int slowdown_count;
+ int stop_count;
+ bool db_closing;
+ std::atomic_bool db_closed;
+ TableFileCreationInfo prev_fc_info_;
+
+ protected:
+ Env* env_;
+ DBFlushTest* test_;
+};
+#endif // !ROCKSDB_LITE
+
TEST_F(DBFlushTest, MemPurgeBasic) {
Options options = CurrentOptions();
// Enforce size of a single MemTable to 64MB (64MB = 67108864 bytes).
options.write_buffer_size = 1 << 20;
// Activate the MemPurge prototype.
- options.experimental_mempurge_threshold =
- 1.0; // std::numeric_limits<double>::max();
+ options.experimental_mempurge_threshold = 1.0;
+#ifndef ROCKSDB_LITE
+ TestFlushListener* listener = new TestFlushListener(options.env, this);
+ options.listeners.emplace_back(listener);
+#endif // !ROCKSDB_LITE
ASSERT_OK(TryReopen(options));
uint32_t mempurge_count = 0;
uint32_t sst_count = 0;
options.compression = kNoCompression;
options.inplace_update_support = false;
options.allow_concurrent_memtable_write = true;
-
+#ifndef ROCKSDB_LITE
+ TestFlushListener* listener = new TestFlushListener(options.env, this);
+ options.listeners.emplace_back(listener);
+#endif // !ROCKSDB_LITE
// Enforce size of a single MemTable to 64MB (64MB = 67108864 bytes).
options.write_buffer_size = 1 << 20;
// Activate the MemPurge prototype.
- options.experimental_mempurge_threshold =
- 1.0; // std::numeric_limits<double>::max();
+ options.experimental_mempurge_threshold = 1.0;
+
ASSERT_OK(TryReopen(options));
uint32_t mempurge_count = 0;
options.compression = kNoCompression;
options.inplace_update_support = false;
options.allow_concurrent_memtable_write = true;
-
+#ifndef ROCKSDB_LITE
+ TestFlushListener* listener = new TestFlushListener(options.env, this);
+ options.listeners.emplace_back(listener);
+#endif // !ROCKSDB_LITE
// Create a ConditionalUpdate compaction filter
// that will update all the values of the KV pairs
// where the keys are "lower" than KEY4.
// Enforce size of a single MemTable to 64MB (64MB = 67108864 bytes).
options.write_buffer_size = 1 << 20;
// Activate the MemPurge prototype.
- options.experimental_mempurge_threshold =
- 1.0; // std::numeric_limits<double>::max();
+ options.experimental_mempurge_threshold = 1.0;
+
ASSERT_OK(TryReopen(options));
uint32_t mempurge_count = 0;
// Enforce size of a single MemTable to 128KB.
options.write_buffer_size = 128 << 10;
// Activate the MemPurge prototype.
- options.experimental_mempurge_threshold =
- 1.0; // std::numeric_limits<double>::max();
+ options.experimental_mempurge_threshold = 1.0;
+
ASSERT_OK(TryReopen(options));
const size_t KVSIZE = 10;
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include <cinttypes>
+#include <deque>
#include "db/builder.h"
#include "db/db_impl/db_impl.h"
need_cancel = true;
}
TEST_SYNC_POINT("DBImpl::FlushMemTableToOutputFile:AfterPickMemtables");
-
+ bool switched_to_mempurge = false;
// Within flush_job.Run, rocksdb may call event listener to notify
// file creation and deletion.
//
// and EventListener callback will be called when the db_mutex
// is unlocked by the current thread.
if (s.ok()) {
- s = flush_job.Run(&logs_with_prep_tracker_, &file_meta);
+ s = flush_job.Run(&logs_with_prep_tracker_, &file_meta,
+ &switched_to_mempurge);
need_cancel = false;
}
// from never needing it or ignoring the flush job status
io_s.PermitUncheckedError();
}
- if (s.ok()) {
+ // If flush ran smoothly and no mempurge happened
+ // install new SST file path.
+ if (s.ok() && (!switched_to_mempurge)) {
#ifndef ROCKSDB_LITE
// may temporarily unlock and lock the mutex.
NotifyOnFlushCompleted(cfd, mutable_cf_options,
}
std::vector<FileMetaData> file_meta(num_cfs);
+ // Use of deque<bool> because vector<bool>
+ // is specific and doesn't allow &v[i].
+ std::deque<bool> switched_to_mempurge(num_cfs, false);
Status s;
IOStatus log_io_s = IOStatus::OK();
assert(num_cfs == static_cast<int>(jobs.size()));
}
if (s.ok()) {
+ assert(switched_to_mempurge.size() ==
+ static_cast<long unsigned int>(num_cfs));
// TODO (yanqin): parallelize jobs with threads.
for (int i = 1; i != num_cfs; ++i) {
exec_status[i].second =
- jobs[i]->Run(&logs_with_prep_tracker_, &file_meta[i]);
+ jobs[i]->Run(&logs_with_prep_tracker_, &file_meta[i],
+ &(switched_to_mempurge.at(i)));
exec_status[i].first = true;
io_status[i] = jobs[i]->io_status();
}
}
assert(exec_status.size() > 0);
assert(!file_meta.empty());
- exec_status[0].second =
- jobs[0]->Run(&logs_with_prep_tracker_, &file_meta[0]);
+ exec_status[0].second = jobs[0]->Run(
+ &logs_with_prep_tracker_, file_meta.data() /* &file_meta[0] */,
+ switched_to_mempurge.empty() ? nullptr : &(switched_to_mempurge.at(0)));
exec_status[0].first = true;
io_status[0] = jobs[0]->io_status();
immutable_db_options_.sst_file_manager.get());
assert(all_mutable_cf_options.size() == static_cast<size_t>(num_cfs));
for (int i = 0; s.ok() && i != num_cfs; ++i) {
+ // If mempurge happened instead of Flush,
+ // no NotifyOnFlushCompleted call (no SST file created).
+ if (switched_to_mempurge[i]) {
+ continue;
+ }
if (cfds[i]->IsDropped()) {
continue;
}
base_->Ref(); // it is likely that we do not need this reference
}
-Status FlushJob::Run(LogsWithPrepTracker* prep_tracker,
- FileMetaData* file_meta) {
+Status FlushJob::Run(LogsWithPrepTracker* prep_tracker, FileMetaData* file_meta,
+ bool* switched_to_mempurge) {
TEST_SYNC_POINT("FlushJob::Start");
db_mutex_->AssertHeld();
assert(pick_memtable_called);
ROCKS_LOG_WARN(db_options_.info_log, "Mempurge process failed: %s\n",
mempurge_s.ToString().c_str());
}
+ } else {
+ if (switched_to_mempurge) {
+ *switched_to_mempurge = true;
+ } else {
+ // The mempurge process was successful, but no switch_to_mempurge
+ // pointer provided so no way to propagate the state of flush job.
+ ROCKS_LOG_WARN(db_options_.info_log,
+ "Mempurge process succeeded"
+ "but no 'switched_to_mempurge' ptr provided.\n");
+ }
}
}
Status s;
// we do not call SchedulePendingFlush().
cfd_->imm()->Add(new_mem, &job_context_->memtables_to_free);
new_mem->Ref();
+#ifndef ROCKSDB_LITE
+ // Piggyback FlushJobInfo on the first flushed memtable.
+ db_mutex_->AssertHeld();
+ meta_.fd.file_size = 0;
+ mems_[0]->SetFlushJobInfo(GetFlushJobInfo());
+#endif // !ROCKSDB_LITE
db_mutex_->Unlock();
} else {
s = Status::Aborted(Slice("Mempurge filled more than one memtable."));