autovector<const autovector<MemTable*>*> mems_list;
autovector<const MutableCFOptions*> mutable_cf_options_list;
autovector<FileMetaData*> tmp_file_meta;
+ autovector<std::list<std::unique_ptr<FlushJobInfo>>*>
+ committed_flush_jobs_info;
for (int i = 0; i != num_cfs; ++i) {
const auto& mems = jobs[i]->GetMemTables();
if (!cfds[i]->IsDropped() && !mems.empty()) {
mems_list.emplace_back(&mems);
mutable_cf_options_list.emplace_back(&all_mutable_cf_options[i]);
tmp_file_meta.emplace_back(&file_meta[i]);
+#ifndef ROCKSDB_LITE
+ committed_flush_jobs_info.emplace_back(
+ jobs[i]->GetCommittedFlushJobsInfo());
+#endif //! ROCKSDB_LITE
}
}
s = InstallMemtableAtomicFlushResults(
nullptr /* imm_lists */, tmp_cfds, mutable_cf_options_list, mems_list,
versions_.get(), &logs_with_prep_tracker_, &mutex_, tmp_file_meta,
- &job_context->memtables_to_free, directories_.GetDbDir(), log_buffer);
+ committed_flush_jobs_info, &job_context->memtables_to_free,
+ directories_.GetDbDir(), log_buffer);
}
if (s.ok()) {
for (auto cfd : all_cfds) {
mutable_cf_options_list.push_back(cfd->GetLatestMutableCFOptions());
}
+ autovector<std::list<std::unique_ptr<FlushJobInfo>>*>
+ committed_flush_jobs_info;
+#ifndef ROCKSDB_LITE
+ for (auto& job : flush_jobs) {
+ committed_flush_jobs_info.push_back(job->GetCommittedFlushJobsInfo());
+ }
+#endif //! ROCKSDB_LITE
Status s = InstallMemtableAtomicFlushResults(
nullptr /* imm_lists */, all_cfds, mutable_cf_options_list, mems_list,
versions_.get(), nullptr /* prep_tracker */, &mutex_, file_meta_ptrs,
- &job_context.memtables_to_free, nullptr /* db_directory */,
- nullptr /* log_buffer */);
+ committed_flush_jobs_info, &job_context.memtables_to_free,
+ nullptr /* db_directory */, nullptr /* log_buffer */);
ASSERT_OK(s);
mutex_.Unlock();
#ifdef ROCKSDB_USING_THREAD_STATUS
options.enable_thread_tracking = true;
#endif // ROCKSDB_USING_THREAD_STATUS
- TestFlushListener* listener = new TestFlushListener(options.env, this);
- options.listeners.emplace_back(listener);
- options.table_properties_collector_factories.push_back(
- std::make_shared<TestPropertiesCollectorFactory>());
- std::vector<std::string> cf_names = {
- "pikachu", "ilya", "muromec", "dobrynia",
- "nikitich", "alyosha", "popovich"};
- CreateAndReopenWithCF(cf_names, options);
-
- ASSERT_OK(Put(1, "pikachu", std::string(90000, 'p')));
- ASSERT_OK(Put(2, "ilya", std::string(90000, 'i')));
- ASSERT_OK(Put(3, "muromec", std::string(90000, 'm')));
- ASSERT_OK(Put(4, "dobrynia", std::string(90000, 'd')));
- ASSERT_OK(Put(5, "nikitich", std::string(90000, 'n')));
- ASSERT_OK(Put(6, "alyosha", std::string(90000, 'a')));
- ASSERT_OK(Put(7, "popovich", std::string(90000, 'p')));
- for (int i = 1; i < 8; ++i) {
- ASSERT_OK(Flush(i));
- ASSERT_EQ(listener->flushed_dbs_.size(), i);
- ASSERT_EQ(listener->flushed_column_family_names_.size(), i);
- }
+ for (auto atomic_flush : {false, true}) {
+ options.atomic_flush = atomic_flush;
+ options.create_if_missing = true;
+ DestroyAndReopen(options);
+ TestFlushListener* listener = new TestFlushListener(options.env, this);
+ options.listeners.emplace_back(listener);
+ options.table_properties_collector_factories.push_back(
+ std::make_shared<TestPropertiesCollectorFactory>());
+ std::vector<std::string> cf_names = {"pikachu", "ilya", "muromec",
+ "dobrynia", "nikitich", "alyosha",
+ "popovich"};
+ CreateAndReopenWithCF(cf_names, options);
+
+ ASSERT_OK(Put(1, "pikachu", std::string(90000, 'p')));
+ ASSERT_OK(Put(2, "ilya", std::string(90000, 'i')));
+ ASSERT_OK(Put(3, "muromec", std::string(90000, 'm')));
+ ASSERT_OK(Put(4, "dobrynia", std::string(90000, 'd')));
+ ASSERT_OK(Put(5, "nikitich", std::string(90000, 'n')));
+ ASSERT_OK(Put(6, "alyosha", std::string(90000, 'a')));
+ ASSERT_OK(Put(7, "popovich", std::string(90000, 'p')));
+ for (int i = 1; i < 8; ++i) {
+ ASSERT_OK(Flush(i));
+ ASSERT_EQ(listener->flushed_dbs_.size(), i);
+ ASSERT_EQ(listener->flushed_column_family_names_.size(), i);
+ }
- // make sure callback functions are called in the right order
- for (size_t i = 0; i < cf_names.size(); i++) {
- ASSERT_EQ(listener->flushed_dbs_[i], db_);
- ASSERT_EQ(listener->flushed_column_family_names_[i], cf_names[i]);
+ // make sure callback functions are called in the right order
+ for (size_t i = 0; i < cf_names.size(); i++) {
+ ASSERT_EQ(listener->flushed_dbs_[i], db_);
+ ASSERT_EQ(listener->flushed_column_family_names_[i], cf_names[i]);
+ }
+ Close();
}
}
const autovector<const autovector<MemTable*>*>& mems_list, VersionSet* vset,
LogsWithPrepTracker* prep_tracker, InstrumentedMutex* mu,
const autovector<FileMetaData*>& file_metas,
+ const autovector<std::list<std::unique_ptr<FlushJobInfo>>*>&
+ committed_flush_jobs_info,
autovector<MemTable*>* to_delete, FSDirectory* db_directory,
LogBuffer* log_buffer) {
AutoThreadOperationStageUpdater stage_updater(
(*mems_list[k])[i]->SetFlushCompleted(true);
(*mems_list[k])[i]->SetFileNumber(file_metas[k]->fd.GetNumber());
}
+#ifndef ROCKSDB_LITE
+ if (committed_flush_jobs_info[k]) {
+ assert(!mems_list[k]->empty());
+ assert((*mems_list[k])[0]);
+ std::unique_ptr<FlushJobInfo> flush_job_info =
+ (*mems_list[k])[0]->ReleaseFlushJobInfo();
+ committed_flush_jobs_info[k]->push_back(std::move(flush_job_info));
+ }
+#else //! ROCKSDB_LITE
+ (void)committed_flush_jobs_info;
+#endif // ROCKSDB_LITE
}
Status s;
const autovector<const autovector<MemTable*>*>& mems_list,
VersionSet* vset, LogsWithPrepTracker* prep_tracker,
InstrumentedMutex* mu, const autovector<FileMetaData*>& file_meta,
+ const autovector<std::list<std::unique_ptr<FlushJobInfo>>*>&
+ committed_flush_jobs_info,
autovector<MemTable*>* to_delete, FSDirectory* db_directory,
LogBuffer* log_buffer);
const autovector<const autovector<MemTable*>*>& mems_list,
VersionSet* vset, LogsWithPrepTracker* prep_tracker,
InstrumentedMutex* mu, const autovector<FileMetaData*>& file_meta,
+ const autovector<std::list<std::unique_ptr<FlushJobInfo>>*>&
+ committed_flush_jobs_info,
autovector<MemTable*>* to_delete, FSDirectory* db_directory,
LogBuffer* log_buffer);
const autovector<const autovector<MemTable*>*>& mems_list, VersionSet* vset,
LogsWithPrepTracker* prep_tracker, InstrumentedMutex* mu,
const autovector<FileMetaData*>& file_meta,
+ const autovector<std::list<std::unique_ptr<FlushJobInfo>>*>&
+ committed_flush_jobs_info,
autovector<MemTable*>* to_delete, FSDirectory* db_directory,
LogBuffer* log_buffer);
} // namespace ROCKSDB_NAMESPACE
for (auto& meta : file_metas) {
file_meta_ptrs.push_back(&meta);
}
+ std::vector<std::list<std::unique_ptr<FlushJobInfo>>>
+ committed_flush_jobs_info_storage(cf_ids.size());
+ autovector<std::list<std::unique_ptr<FlushJobInfo>>*>
+ committed_flush_jobs_info;
+ for (int i = 0; i < static_cast<int>(cf_ids.size()); ++i) {
+ committed_flush_jobs_info.push_back(
+ &committed_flush_jobs_info_storage[i]);
+ }
+
InstrumentedMutex mutex;
InstrumentedMutexLock l(&mutex);
return InstallMemtableAtomicFlushResults(
&lists, cfds, mutable_cf_options_list, mems_list, &versions,
- nullptr /* prep_tracker */, &mutex, file_meta_ptrs, to_delete, nullptr,
- &log_buffer);
+ nullptr /* prep_tracker */, &mutex, file_meta_ptrs,
+ committed_flush_jobs_info, to_delete, nullptr, &log_buffer);
}
};