const Options options,
std::vector<std::pair<std::string, std::string>> data, int file_id = -1,
bool allow_global_seqno = false, bool sort_data = false,
- std::map<std::string, std::string>* true_data = nullptr) {
+ std::map<std::string, std::string>* true_data = nullptr,
+ ColumnFamilyHandle* cfh = nullptr) {
// Generate a file id if not provided
if (file_id == -1) {
file_id = last_file_id_ + 1;
data.resize(uniq_iter - data.begin());
}
std::string file_path = sst_files_dir_ + ToString(file_id);
- SstFileWriter sst_file_writer(EnvOptions(), options, options.comparator);
+ SstFileWriter sst_file_writer(EnvOptions(), options, options.comparator,
+ cfh);
Status s = sst_file_writer.Open(file_path);
if (!s.ok()) {
if (s.ok()) {
IngestExternalFileOptions ifo;
ifo.allow_global_seqno = allow_global_seqno;
- s = db_->IngestExternalFile({file_path}, ifo);
+ if (cfh) {
+ s = db_->IngestExternalFile(cfh, {file_path}, ifo);
+ } else {
+ s = db_->IngestExternalFile({file_path}, ifo);
+ }
}
if (s.ok() && true_data) {
Status GenerateAndAddExternalFile(
const Options options, std::vector<std::pair<int, std::string>> data,
int file_id = -1, bool allow_global_seqno = false, bool sort_data = false,
- std::map<std::string, std::string>* true_data = nullptr) {
+ std::map<std::string, std::string>* true_data = nullptr,
+ ColumnFamilyHandle* cfh = nullptr) {
std::vector<std::pair<std::string, std::string>> file_data;
for (auto& entry : data) {
file_data.emplace_back(Key(entry.first), entry.second);
}
return GenerateAndAddExternalFile(options, file_data, file_id,
- allow_global_seqno, sort_data, true_data);
+ allow_global_seqno, sort_data, true_data,
+ cfh);
}
Status GenerateAndAddExternalFile(
const Options options, std::vector<int> keys, int file_id = -1,
bool allow_global_seqno = false, bool sort_data = false,
- std::map<std::string, std::string>* true_data = nullptr) {
+ std::map<std::string, std::string>* true_data = nullptr,
+ ColumnFamilyHandle* cfh = nullptr) {
std::vector<std::pair<std::string, std::string>> file_data;
for (auto& k : keys) {
file_data.emplace_back(Key(k), Key(k) + ToString(file_id));
}
return GenerateAndAddExternalFile(options, file_data, file_id,
- allow_global_seqno, sort_data, true_data);
+ allow_global_seqno, sort_data, true_data,
+ cfh);
}
Status DeprecatedAddFile(const std::vector<std::string>& files,
ASSERT_OK(db_->IngestExternalFile(handles_[0], {unknown_sst}, ifo));
}
+class TestIngestExternalFileListener : public EventListener {
+ public:
+ void OnExternalFileIngested(DB* db,
+ const ExternalFileIngestionInfo& info) override {
+ ingested_files.push_back(info);
+ }
+
+ std::vector<ExternalFileIngestionInfo> ingested_files;
+};
+
+TEST_F(ExternalSSTFileTest, IngestionListener) {
+ Options options = CurrentOptions();
+ TestIngestExternalFileListener* listener =
+ new TestIngestExternalFileListener();
+ options.listeners.emplace_back(listener);
+ CreateAndReopenWithCF({"koko", "toto"}, options);
+
+ // Ingest into default cf
+ ASSERT_OK(GenerateAndAddExternalFile(options, {1, 2}, -1, true, true, nullptr,
+ handles_[0]));
+ ASSERT_EQ(listener->ingested_files.size(), 1);
+ ASSERT_EQ(listener->ingested_files.back().cf_name, "default");
+ ASSERT_EQ(listener->ingested_files.back().global_seqno, 0);
+ ASSERT_EQ(listener->ingested_files.back().table_properties.column_family_id,
+ 0);
+ ASSERT_EQ(listener->ingested_files.back().table_properties.column_family_name,
+ "default");
+
+ // Ingest into cf1
+ ASSERT_OK(GenerateAndAddExternalFile(options, {1, 2}, -1, true, true, nullptr,
+ handles_[1]));
+ ASSERT_EQ(listener->ingested_files.size(), 2);
+ ASSERT_EQ(listener->ingested_files.back().cf_name, "koko");
+ ASSERT_EQ(listener->ingested_files.back().global_seqno, 0);
+ ASSERT_EQ(listener->ingested_files.back().table_properties.column_family_id,
+ 1);
+ ASSERT_EQ(listener->ingested_files.back().table_properties.column_family_name,
+ "koko");
+
+ // Ingest into cf2
+ ASSERT_OK(GenerateAndAddExternalFile(options, {1, 2}, -1, true, true, nullptr,
+ handles_[2]));
+ ASSERT_EQ(listener->ingested_files.size(), 3);
+ ASSERT_EQ(listener->ingested_files.back().cf_name, "toto");
+ ASSERT_EQ(listener->ingested_files.back().global_seqno, 0);
+ ASSERT_EQ(listener->ingested_files.back().table_properties.column_family_id,
+ 2);
+ ASSERT_EQ(listener->ingested_files.back().table_properties.column_family_name,
+ "toto");
+}
#endif // ROCKSDB_LITE
} // namespace rocksdb
};
+struct ExternalFileIngestionInfo {
+ // the name of the column family
+ std::string cf_name;
+ // Path of the file outside the DB
+ std::string external_file_path;
+ // Path of the file inside the DB
+ std::string internal_file_path;
+ // The global sequence number assigned to keys in this file
+ SequenceNumber global_seqno;
+ // Table properties of the table being flushed
+ TableProperties table_properties;
+};
+
+
// EventListener class contains a set of call-back functions that will
// be called when specific RocksDB event happens such as flush. It can
// be used as a building block for developing custom features such as
virtual void OnColumnFamilyHandleDeletionStarted(ColumnFamilyHandle* handle) {
}
+ // A call-back function for RocksDB which will be called after an external
+ // file is ingested using IngestExternalFile.
+ //
+ // Note that the this function will run on the same thread as
+ // IngestExternalFile(), if this function is blocked, IngestExternalFile()
+ // will be blocked from finishing.
+ virtual void OnExternalFileIngested(
+ DB* /*db*/, const ExternalFileIngestionInfo& /*info*/) {}
+
virtual ~EventListener() {}
};