} else if (parsed_params.cmd == RestoreCommand::Name()) {
return new RestoreCommand(parsed_params.cmd_params,
parsed_params.option_map, parsed_params.flags);
+ } else if (parsed_params.cmd == WriteExternalSstFilesCommand::Name()) {
+ return new WriteExternalSstFilesCommand(parsed_params.cmd_params,
+ parsed_params.option_map,
+ parsed_params.flags);
+ } else if (parsed_params.cmd == IngestExternalSstFilesCommand::Name()) {
+ return new IngestExternalSstFilesCommand(parsed_params.cmd_params,
+ parsed_params.option_map,
+ parsed_params.flags);
}
return nullptr;
}
}
}
+void WriteExternalSstFilesCommand::Help(std::string& ret) {
+ ret.append(" ");
+ ret.append(WriteExternalSstFilesCommand::Name());
+ ret.append(" <output_sst_path>");
+ ret.append("\n");
+}
+
+WriteExternalSstFilesCommand::WriteExternalSstFilesCommand(
+ const std::vector<std::string>& params,
+ const std::map<std::string, std::string>& options,
+ const std::vector<std::string>& flags)
+ : LDBCommand(
+ options, flags, false /* is_read_only */,
+ BuildCmdLineOptions({ARG_HEX, ARG_KEY_HEX, ARG_VALUE_HEX, ARG_FROM,
+ ARG_TO, ARG_CREATE_IF_MISSING})) {
+ create_if_missing_ =
+ IsFlagPresent(flags, ARG_CREATE_IF_MISSING) ||
+ ParseBooleanOption(options, ARG_CREATE_IF_MISSING, false);
+ if (params.size() != 1) {
+ exec_state_ = LDBCommandExecuteResult::Failed(
+ "output SST file path must be specified");
+ } else {
+ output_sst_path_ = params.at(0);
+ }
+}
+
+void WriteExternalSstFilesCommand::DoCommand() {
+ if (!db_) {
+ assert(GetExecuteState().IsFailed());
+ return;
+ }
+ ColumnFamilyHandle* cfh = GetCfHandle();
+ SstFileWriter sst_file_writer(EnvOptions(), db_->GetOptions(), cfh);
+ Status status = sst_file_writer.Open(output_sst_path_);
+ if (!status.ok()) {
+ exec_state_ = LDBCommandExecuteResult::Failed("failed to open SST file: " +
+ status.ToString());
+ return;
+ }
+
+ int bad_lines = 0;
+ std::string line;
+ std::ifstream ifs_stdin("/dev/stdin");
+ std::istream* istream_p = ifs_stdin.is_open() ? &ifs_stdin : &std::cin;
+ while (getline(*istream_p, line, '\n')) {
+ std::string key;
+ std::string value;
+ if (ParseKeyValue(line, &key, &value, is_key_hex_, is_value_hex_)) {
+ status = sst_file_writer.Put(key, value);
+ if (!status.ok()) {
+ exec_state_ = LDBCommandExecuteResult::Failed(
+ "failed to write record to file: " + status.ToString());
+ return;
+ }
+ } else if (0 == line.find("Keys in range:")) {
+ // ignore this line
+ } else if (0 == line.find("Created bg thread 0x")) {
+ // ignore this line
+ } else {
+ bad_lines++;
+ }
+ }
+
+ status = sst_file_writer.Finish();
+ if (!status.ok()) {
+ exec_state_ = LDBCommandExecuteResult::Failed(
+ "Failed to finish writing to file: " + status.ToString());
+ return;
+ }
+
+ if (bad_lines > 0) {
+ fprintf(stderr, "Warning: %d bad lines ignored.\n", bad_lines);
+ }
+ exec_state_ = LDBCommandExecuteResult::Succeed(
+ "external SST file written to " + output_sst_path_);
+}
+
+Options WriteExternalSstFilesCommand::PrepareOptionsForOpenDB() {
+ Options opt = LDBCommand::PrepareOptionsForOpenDB();
+ opt.create_if_missing = create_if_missing_;
+ return opt;
+}
+
+const std::string IngestExternalSstFilesCommand::ARG_MOVE_FILES = "move_files";
+const std::string IngestExternalSstFilesCommand::ARG_SNAPSHOT_CONSISTENCY =
+ "snapshot_consistency";
+const std::string IngestExternalSstFilesCommand::ARG_ALLOW_GLOBAL_SEQNO =
+ "allow_global_seqno";
+const std::string IngestExternalSstFilesCommand::ARG_ALLOW_BLOCKING_FLUSH =
+ "allow_blocking_flush";
+const std::string IngestExternalSstFilesCommand::ARG_INGEST_BEHIND =
+ "ingest_behind";
+
+void IngestExternalSstFilesCommand::Help(std::string& ret) {
+ ret.append(" ");
+ ret.append(IngestExternalSstFilesCommand::Name());
+ ret.append(" <input_sst_path>");
+ ret.append(" [--" + ARG_MOVE_FILES + "] ");
+ ret.append(" [--" + ARG_SNAPSHOT_CONSISTENCY + "] ");
+ ret.append(" [--" + ARG_ALLOW_GLOBAL_SEQNO + "] ");
+ ret.append(" [--" + ARG_ALLOW_BLOCKING_FLUSH + "] ");
+ ret.append(" [--" + ARG_INGEST_BEHIND + "] ");
+ ret.append("\n");
+}
+
+IngestExternalSstFilesCommand::IngestExternalSstFilesCommand(
+ const std::vector<std::string>& params,
+ const std::map<std::string, std::string>& options,
+ const std::vector<std::string>& flags)
+ : LDBCommand(
+ options, flags, false /* is_read_only */,
+ BuildCmdLineOptions({ARG_MOVE_FILES, ARG_SNAPSHOT_CONSISTENCY,
+ ARG_ALLOW_GLOBAL_SEQNO, ARG_CREATE_IF_MISSING,
+ ARG_ALLOW_BLOCKING_FLUSH, ARG_INGEST_BEHIND})),
+ move_files_(false),
+ snapshot_consistency_(true),
+ allow_global_seqno_(true),
+ allow_blocking_flush_(true),
+ ingest_behind_(false) {
+ create_if_missing_ =
+ IsFlagPresent(flags, ARG_CREATE_IF_MISSING) ||
+ ParseBooleanOption(options, ARG_CREATE_IF_MISSING, false);
+ move_files_ = IsFlagPresent(flags, ARG_MOVE_FILES) ||
+ ParseBooleanOption(options, ARG_MOVE_FILES, false);
+ snapshot_consistency_ =
+ IsFlagPresent(flags, ARG_SNAPSHOT_CONSISTENCY) ||
+ ParseBooleanOption(options, ARG_SNAPSHOT_CONSISTENCY, true);
+ allow_global_seqno_ =
+ IsFlagPresent(flags, ARG_ALLOW_GLOBAL_SEQNO) ||
+ ParseBooleanOption(options, ARG_ALLOW_GLOBAL_SEQNO, true);
+ allow_blocking_flush_ =
+ IsFlagPresent(flags, ARG_ALLOW_BLOCKING_FLUSH) ||
+ ParseBooleanOption(options, ARG_ALLOW_BLOCKING_FLUSH, true);
+ ingest_behind_ = IsFlagPresent(flags, ARG_INGEST_BEHIND) ||
+ ParseBooleanOption(options, ARG_INGEST_BEHIND, false);
+
+ if (params.size() != 1) {
+ exec_state_ =
+ LDBCommandExecuteResult::Failed("input SST path must be specified");
+ } else {
+ input_sst_path_ = params.at(0);
+ }
+}
+
+void IngestExternalSstFilesCommand::DoCommand() {
+ if (!db_) {
+ assert(GetExecuteState().IsFailed());
+ return;
+ }
+ if (GetExecuteState().IsFailed()) {
+ return;
+ }
+ ColumnFamilyHandle* cfh = GetCfHandle();
+ IngestExternalFileOptions ifo;
+ ifo.move_files = move_files_;
+ ifo.snapshot_consistency = snapshot_consistency_;
+ ifo.allow_global_seqno = allow_global_seqno_;
+ ifo.allow_blocking_flush = allow_blocking_flush_;
+ ifo.ingest_behind = ingest_behind_;
+ Status status = db_->IngestExternalFile(cfh, {input_sst_path_}, ifo);
+ if (!status.ok()) {
+ exec_state_ = LDBCommandExecuteResult::Failed(
+ "failed to ingest external SST: " + status.ToString());
+ } else {
+ exec_state_ =
+ LDBCommandExecuteResult::Succeed("external SST files ingested");
+ }
+}
+
+Options IngestExternalSstFilesCommand::PrepareOptionsForOpenDB() {
+ Options opt = LDBCommand::PrepareOptionsForOpenDB();
+ opt.create_if_missing = create_if_missing_;
+ return opt;
+}
+
} // namespace rocksdb
#endif // ROCKSDB_LITE
static void Help(std::string& ret);
};
+class WriteExternalSstFilesCommand : public LDBCommand {
+ public:
+ static std::string Name() { return "write_extern_sst"; }
+ WriteExternalSstFilesCommand(
+ const std::vector<std::string>& params,
+ const std::map<std::string, std::string>& options,
+ const std::vector<std::string>& flags);
+
+ virtual void DoCommand() override;
+
+ virtual bool NoDBOpen() override { return false; }
+
+ virtual Options PrepareOptionsForOpenDB() override;
+
+ static void Help(std::string& ret);
+
+ private:
+ std::string output_sst_path_;
+};
+
+class IngestExternalSstFilesCommand : public LDBCommand {
+ public:
+ static std::string Name() { return "ingest_extern_sst"; }
+ IngestExternalSstFilesCommand(
+ const std::vector<std::string>& params,
+ const std::map<std::string, std::string>& options,
+ const std::vector<std::string>& flags);
+
+ virtual void DoCommand() override;
+
+ virtual bool NoDBOpen() override { return false; }
+
+ virtual Options PrepareOptionsForOpenDB() override;
+
+ static void Help(std::string& ret);
+
+ private:
+ std::string input_sst_path_;
+ bool move_files_;
+ bool snapshot_consistency_;
+ bool allow_global_seqno_;
+ bool allow_blocking_flush_;
+ bool ingest_behind_;
+
+ static const std::string ARG_MOVE_FILES;
+ static const std::string ARG_SNAPSHOT_CONSISTENCY;
+ static const std::string ARG_ALLOW_GLOBAL_SEQNO;
+ static const std::string ARG_ALLOW_BLOCKING_FLUSH;
+ static const std::string ARG_INGEST_BEHIND;
+};
+
} // namespace rocksdb
my_check_output("./ldb %s >/dev/null 2>&1 |grep -v \"Created bg \
thread\"" % params, shell=True)
- except Exception, e:
+ except Exception:
return
self.fail(
"Exception should have been raised for command with params: %s" %
def loadDb(self, params, dumpFile):
return 0 == run_err_null("cat %s | ./ldb load %s" % (dumpFile, params))
+ def writeExternSst(self, params, inputDumpFile, outputSst):
+ return 0 == run_err_null("cat %s | ./ldb write_extern_sst %s %s"
+ % (inputDumpFile, outputSst, params))
+
+ def ingestExternSst(self, params, inputSst):
+ return 0 == run_err_null("./ldb ingest_extern_sst %s %s"
+ % (inputSst, params))
+
def testStringBatchPut(self):
print "Running testStringBatchPut..."
self.assertRunOK("batchput x1 y1 --create_if_missing", "OK")
# non-existing column family.
self.assertRunFAIL("get cf3_1 --column_family=four")
+ def testIngestExternalSst(self):
+ print "Running testIngestExternalSst..."
+
+ # Dump, load, write external sst and ingest it in another db
+ dbPath = os.path.join(self.TMP_DIR, "db1")
+ self.assertRunOK(
+ "batchput --db=%s --create_if_missing x1 y1 x2 y2 x3 y3 x4 y4"
+ % dbPath,
+ "OK")
+ self.assertRunOK("scan --db=%s" % dbPath,
+ "x1 : y1\nx2 : y2\nx3 : y3\nx4 : y4")
+ dumpFilePath = os.path.join(self.TMP_DIR, "dump1")
+ with open(dumpFilePath, 'w') as f:
+ f.write("x1 ==> y10\nx2 ==> y20\nx3 ==> y30\nx4 ==> y40")
+ externSstPath = os.path.join(self.TMP_DIR, "extern_data1.sst")
+ self.assertTrue(self.writeExternSst("--create_if_missing --db=%s"
+ % dbPath,
+ dumpFilePath,
+ externSstPath))
+ # cannot ingest if allow_global_seqno is false
+ self.assertFalse(
+ self.ingestExternSst(
+ "--create_if_missing --allow_global_seqno=false --db=%s"
+ % dbPath,
+ externSstPath))
+ self.assertTrue(
+ self.ingestExternSst(
+ "--create_if_missing --allow_global_seqno --db=%s"
+ % dbPath,
+ externSstPath))
+ self.assertRunOKFull("scan --db=%s" % dbPath,
+ "x1 : y10\nx2 : y20\nx3 : y30\nx4 : y40")
+
if __name__ == "__main__":
unittest.main()