]> git-server-git.apps.pok.os.sepia.ceph.com Git - rocksdb.git/commitdiff
Add SST ingestion to ldb (#4205)
authorYanqin Jin <yanqin@fb.com>
Thu, 9 Aug 2018 21:18:59 +0000 (14:18 -0700)
committerYanqin Jin <yanqin@fb.com>
Tue, 21 Aug 2018 23:16:17 +0000 (16:16 -0700)
Summary:
We add two subcommands `write_extern_sst` and `ingest_extern_sst` to ldb. This PR avoids changing existing code because we hope to cherry-pick to earlier releases to support compatibility check for external SST file ingestion.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/4205

Differential Revision: D9112711

Pulled By: riversand963

fbshipit-source-id: 7cae88380d4de86da8440230e87eca66755648e4

include/rocksdb/utilities/ldb_cmd.h
tools/ldb_cmd.cc
tools/ldb_cmd_impl.h
tools/ldb_test.py
tools/ldb_tool.cc

index 4d91f0a66d07776c28ed793b57c2c47ca503be57..907c9daf20c459a04b73a5ffe67e9cb861c57231 100644 (file)
@@ -210,12 +210,20 @@ class LDBCommand {
   bool ParseStringOption(const std::map<std::string, std::string>& options,
                          const std::string& option, std::string* value);
 
+  /**
+   * Returns the value of the specified option as a boolean.
+   * default_val is used if the option is not found in options.
+   * Throws an exception if the value of the option is not
+   * "true" or "false" (case insensitive).
+   */
+  bool ParseBooleanOption(const std::map<std::string, std::string>& options,
+                          const std::string& option, bool default_val);
+
   Options options_;
   std::vector<ColumnFamilyDescriptor> column_families_;
   LDBOptions ldb_options_;
 
  private:
-  friend class WALDumperCommand;
   /**
    * Interpret command line options and flags to determine if the key
    * should be input/output in hex.
@@ -230,15 +238,6 @@ class LDBCommand {
   bool IsValueHex(const std::map<std::string, std::string>& options,
                   const std::vector<std::string>& flags);
 
-  /**
-   * Returns the value of the specified option as a boolean.
-   * default_val is used if the option is not found in options.
-   * Throws an exception if the value of the option is not
-   * "true" or "false" (case insensitive).
-   */
-  bool ParseBooleanOption(const std::map<std::string, std::string>& options,
-                          const std::string& option, bool default_val);
-
   /**
    * Converts val to a boolean.
    * val must be either true or false (case insensitive).
index 075731259d7e6629d3e94b94e3e42a4d441e9d1b..41ed52cc19b856ad35dd93ac3aff5447ec786bba 100644 (file)
@@ -242,6 +242,14 @@ LDBCommand* LDBCommand::SelectCommand(const ParsedParams& parsed_params) {
   } else if (parsed_params.cmd == RestoreCommand::Name()) {
     return new RestoreCommand(parsed_params.cmd_params,
                               parsed_params.option_map, parsed_params.flags);
+  } else if (parsed_params.cmd == WriteExternalSstFilesCommand::Name()) {
+    return new WriteExternalSstFilesCommand(parsed_params.cmd_params,
+                                            parsed_params.option_map,
+                                            parsed_params.flags);
+  } else if (parsed_params.cmd == IngestExternalSstFilesCommand::Name()) {
+    return new IngestExternalSstFilesCommand(parsed_params.cmd_params,
+                                             parsed_params.option_map,
+                                             parsed_params.flags);
   }
   return nullptr;
 }
@@ -2936,5 +2944,180 @@ void DBFileDumperCommand::DoCommand() {
   }
 }
 
+void WriteExternalSstFilesCommand::Help(std::string& ret) {
+  ret.append("  ");
+  ret.append(WriteExternalSstFilesCommand::Name());
+  ret.append(" <output_sst_path>");
+  ret.append("\n");
+}
+
+WriteExternalSstFilesCommand::WriteExternalSstFilesCommand(
+    const std::vector<std::string>& params,
+    const std::map<std::string, std::string>& options,
+    const std::vector<std::string>& flags)
+    : LDBCommand(
+          options, flags, false /* is_read_only */,
+          BuildCmdLineOptions({ARG_HEX, ARG_KEY_HEX, ARG_VALUE_HEX, ARG_FROM,
+                               ARG_TO, ARG_CREATE_IF_MISSING})) {
+  create_if_missing_ =
+      IsFlagPresent(flags, ARG_CREATE_IF_MISSING) ||
+      ParseBooleanOption(options, ARG_CREATE_IF_MISSING, false);
+  if (params.size() != 1) {
+    exec_state_ = LDBCommandExecuteResult::Failed(
+        "output SST file path must be specified");
+  } else {
+    output_sst_path_ = params.at(0);
+  }
+}
+
+void WriteExternalSstFilesCommand::DoCommand() {
+  if (!db_) {
+    assert(GetExecuteState().IsFailed());
+    return;
+  }
+  ColumnFamilyHandle* cfh = GetCfHandle();
+  SstFileWriter sst_file_writer(EnvOptions(), db_->GetOptions(), cfh);
+  Status status = sst_file_writer.Open(output_sst_path_);
+  if (!status.ok()) {
+    exec_state_ = LDBCommandExecuteResult::Failed("failed to open SST file: " +
+                                                  status.ToString());
+    return;
+  }
+
+  int bad_lines = 0;
+  std::string line;
+  std::ifstream ifs_stdin("/dev/stdin");
+  std::istream* istream_p = ifs_stdin.is_open() ? &ifs_stdin : &std::cin;
+  while (getline(*istream_p, line, '\n')) {
+    std::string key;
+    std::string value;
+    if (ParseKeyValue(line, &key, &value, is_key_hex_, is_value_hex_)) {
+      status = sst_file_writer.Put(key, value);
+      if (!status.ok()) {
+        exec_state_ = LDBCommandExecuteResult::Failed(
+            "failed to write record to file: " + status.ToString());
+        return;
+      }
+    } else if (0 == line.find("Keys in range:")) {
+      // ignore this line
+    } else if (0 == line.find("Created bg thread 0x")) {
+      // ignore this line
+    } else {
+      bad_lines++;
+    }
+  }
+
+  status = sst_file_writer.Finish();
+  if (!status.ok()) {
+    exec_state_ = LDBCommandExecuteResult::Failed(
+        "Failed to finish writing to file: " + status.ToString());
+    return;
+  }
+
+  if (bad_lines > 0) {
+    fprintf(stderr, "Warning: %d bad lines ignored.\n", bad_lines);
+  }
+  exec_state_ = LDBCommandExecuteResult::Succeed(
+      "external SST file written to " + output_sst_path_);
+}
+
+Options WriteExternalSstFilesCommand::PrepareOptionsForOpenDB() {
+  Options opt = LDBCommand::PrepareOptionsForOpenDB();
+  opt.create_if_missing = create_if_missing_;
+  return opt;
+}
+
+const std::string IngestExternalSstFilesCommand::ARG_MOVE_FILES = "move_files";
+const std::string IngestExternalSstFilesCommand::ARG_SNAPSHOT_CONSISTENCY =
+    "snapshot_consistency";
+const std::string IngestExternalSstFilesCommand::ARG_ALLOW_GLOBAL_SEQNO =
+    "allow_global_seqno";
+const std::string IngestExternalSstFilesCommand::ARG_ALLOW_BLOCKING_FLUSH =
+    "allow_blocking_flush";
+const std::string IngestExternalSstFilesCommand::ARG_INGEST_BEHIND =
+    "ingest_behind";
+
+void IngestExternalSstFilesCommand::Help(std::string& ret) {
+  ret.append("  ");
+  ret.append(IngestExternalSstFilesCommand::Name());
+  ret.append(" <input_sst_path>");
+  ret.append(" [--" + ARG_MOVE_FILES + "] ");
+  ret.append(" [--" + ARG_SNAPSHOT_CONSISTENCY + "] ");
+  ret.append(" [--" + ARG_ALLOW_GLOBAL_SEQNO + "] ");
+  ret.append(" [--" + ARG_ALLOW_BLOCKING_FLUSH + "] ");
+  ret.append(" [--" + ARG_INGEST_BEHIND + "] ");
+  ret.append("\n");
+}
+
+IngestExternalSstFilesCommand::IngestExternalSstFilesCommand(
+    const std::vector<std::string>& params,
+    const std::map<std::string, std::string>& options,
+    const std::vector<std::string>& flags)
+    : LDBCommand(
+          options, flags, false /* is_read_only */,
+          BuildCmdLineOptions({ARG_MOVE_FILES, ARG_SNAPSHOT_CONSISTENCY,
+                               ARG_ALLOW_GLOBAL_SEQNO, ARG_CREATE_IF_MISSING,
+                               ARG_ALLOW_BLOCKING_FLUSH, ARG_INGEST_BEHIND})),
+      move_files_(false),
+      snapshot_consistency_(true),
+      allow_global_seqno_(true),
+      allow_blocking_flush_(true),
+      ingest_behind_(false) {
+  create_if_missing_ =
+      IsFlagPresent(flags, ARG_CREATE_IF_MISSING) ||
+      ParseBooleanOption(options, ARG_CREATE_IF_MISSING, false);
+  move_files_ = IsFlagPresent(flags, ARG_MOVE_FILES) ||
+                ParseBooleanOption(options, ARG_MOVE_FILES, false);
+  snapshot_consistency_ =
+      IsFlagPresent(flags, ARG_SNAPSHOT_CONSISTENCY) ||
+      ParseBooleanOption(options, ARG_SNAPSHOT_CONSISTENCY, true);
+  allow_global_seqno_ =
+      IsFlagPresent(flags, ARG_ALLOW_GLOBAL_SEQNO) ||
+      ParseBooleanOption(options, ARG_ALLOW_GLOBAL_SEQNO, true);
+  allow_blocking_flush_ =
+      IsFlagPresent(flags, ARG_ALLOW_BLOCKING_FLUSH) ||
+      ParseBooleanOption(options, ARG_ALLOW_BLOCKING_FLUSH, true);
+  ingest_behind_ = IsFlagPresent(flags, ARG_INGEST_BEHIND) ||
+                   ParseBooleanOption(options, ARG_INGEST_BEHIND, false);
+
+  if (params.size() != 1) {
+    exec_state_ =
+        LDBCommandExecuteResult::Failed("input SST path must be specified");
+  } else {
+    input_sst_path_ = params.at(0);
+  }
+}
+
+void IngestExternalSstFilesCommand::DoCommand() {
+  if (!db_) {
+    assert(GetExecuteState().IsFailed());
+    return;
+  }
+  if (GetExecuteState().IsFailed()) {
+    return;
+  }
+  ColumnFamilyHandle* cfh = GetCfHandle();
+  IngestExternalFileOptions ifo;
+  ifo.move_files = move_files_;
+  ifo.snapshot_consistency = snapshot_consistency_;
+  ifo.allow_global_seqno = allow_global_seqno_;
+  ifo.allow_blocking_flush = allow_blocking_flush_;
+  ifo.ingest_behind = ingest_behind_;
+  Status status = db_->IngestExternalFile(cfh, {input_sst_path_}, ifo);
+  if (!status.ok()) {
+    exec_state_ = LDBCommandExecuteResult::Failed(
+        "failed to ingest external SST: " + status.ToString());
+  } else {
+    exec_state_ =
+        LDBCommandExecuteResult::Succeed("external SST files ingested");
+  }
+}
+
+Options IngestExternalSstFilesCommand::PrepareOptionsForOpenDB() {
+  Options opt = LDBCommand::PrepareOptionsForOpenDB();
+  opt.create_if_missing = create_if_missing_;
+  return opt;
+}
+
 }   // namespace rocksdb
 #endif  // ROCKSDB_LITE
index e49e851b3998ef6f6054032ee88ef143076dfd74..6443e3ceca3a723a053345b69986da5f6eadc1c7 100644 (file)
@@ -522,4 +522,55 @@ class RestoreCommand : public BackupableCommand {
   static void Help(std::string& ret);
 };
 
+class WriteExternalSstFilesCommand : public LDBCommand {
+ public:
+  static std::string Name() { return "write_extern_sst"; }
+  WriteExternalSstFilesCommand(
+      const std::vector<std::string>& params,
+      const std::map<std::string, std::string>& options,
+      const std::vector<std::string>& flags);
+
+  virtual void DoCommand() override;
+
+  virtual bool NoDBOpen() override { return false; }
+
+  virtual Options PrepareOptionsForOpenDB() override;
+
+  static void Help(std::string& ret);
+
+ private:
+  std::string output_sst_path_;
+};
+
+class IngestExternalSstFilesCommand : public LDBCommand {
+ public:
+  static std::string Name() { return "ingest_extern_sst"; }
+  IngestExternalSstFilesCommand(
+      const std::vector<std::string>& params,
+      const std::map<std::string, std::string>& options,
+      const std::vector<std::string>& flags);
+
+  virtual void DoCommand() override;
+
+  virtual bool NoDBOpen() override { return false; }
+
+  virtual Options PrepareOptionsForOpenDB() override;
+
+  static void Help(std::string& ret);
+
+ private:
+  std::string input_sst_path_;
+  bool move_files_;
+  bool snapshot_consistency_;
+  bool allow_global_seqno_;
+  bool allow_blocking_flush_;
+  bool ingest_behind_;
+
+  static const std::string ARG_MOVE_FILES;
+  static const std::string ARG_SNAPSHOT_CONSISTENCY;
+  static const std::string ARG_ALLOW_GLOBAL_SEQNO;
+  static const std::string ARG_ALLOW_BLOCKING_FLUSH;
+  static const std::string ARG_INGEST_BEHIND;
+};
+
 }  // namespace rocksdb
index fa0ded4382d4af61be807b64f67f8fdfab8452e6..2200fb464b75154b71964e2f39bc82bf39ce4b18 100644 (file)
@@ -76,7 +76,7 @@ class LDBTestCase(unittest.TestCase):
 
             my_check_output("./ldb %s >/dev/null 2>&1 |grep -v \"Created bg \
                 thread\"" % params, shell=True)
-        except Exception, e:
+        except Exception:
             return
         self.fail(
             "Exception should have been raised for command with params: %s" %
@@ -146,6 +146,14 @@ class LDBTestCase(unittest.TestCase):
     def loadDb(self, params, dumpFile):
         return 0 == run_err_null("cat %s | ./ldb load %s" % (dumpFile, params))
 
+    def writeExternSst(self, params, inputDumpFile, outputSst):
+        return 0 == run_err_null("cat %s | ./ldb write_extern_sst %s %s"
+                % (inputDumpFile, outputSst, params))
+
+    def ingestExternSst(self, params, inputSst):
+        return 0 == run_err_null("./ldb ingest_extern_sst %s %s"
+                                     % (inputSst, params))
+
     def testStringBatchPut(self):
         print "Running testStringBatchPut..."
         self.assertRunOK("batchput x1 y1 --create_if_missing", "OK")
@@ -547,5 +555,38 @@ class LDBTestCase(unittest.TestCase):
         # non-existing column family.
         self.assertRunFAIL("get cf3_1 --column_family=four")
 
+    def testIngestExternalSst(self):
+        print "Running testIngestExternalSst..."
+
+        # Dump, load, write external sst and ingest it in another db
+        dbPath = os.path.join(self.TMP_DIR, "db1")
+        self.assertRunOK(
+            "batchput --db=%s --create_if_missing x1 y1 x2 y2 x3 y3 x4 y4"
+            % dbPath,
+            "OK")
+        self.assertRunOK("scan --db=%s" % dbPath,
+                         "x1 : y1\nx2 : y2\nx3 : y3\nx4 : y4")
+        dumpFilePath = os.path.join(self.TMP_DIR, "dump1")
+        with open(dumpFilePath, 'w') as f:
+            f.write("x1 ==> y10\nx2 ==> y20\nx3 ==> y30\nx4 ==> y40")
+        externSstPath = os.path.join(self.TMP_DIR, "extern_data1.sst")
+        self.assertTrue(self.writeExternSst("--create_if_missing --db=%s"
+                            % dbPath,
+                        dumpFilePath,
+                        externSstPath))
+        # cannot ingest if allow_global_seqno is false
+        self.assertFalse(
+            self.ingestExternSst(
+                "--create_if_missing --allow_global_seqno=false --db=%s"
+                % dbPath,
+                externSstPath))
+        self.assertTrue(
+            self.ingestExternSst(
+                "--create_if_missing --allow_global_seqno --db=%s"
+                % dbPath,
+                externSstPath))
+        self.assertRunOKFull("scan --db=%s" % dbPath,
+                             "x1 : y10\nx2 : y20\nx3 : y30\nx4 : y40")
+
 if __name__ == "__main__":
     unittest.main()
index b09076ecc615b240ce74e79f082e720776a9c267..fe307eab7dce42f54fd15de99a8aac0a6e26a2dd 100644 (file)
@@ -88,6 +88,8 @@ void LDBCommandRunner::PrintHelp(const LDBOptions& ldb_options,
   BackupCommand::Help(ret);
   RestoreCommand::Help(ret);
   CheckPointCommand::Help(ret);
+  WriteExternalSstFilesCommand::Help(ret);
+  IngestExternalSstFilesCommand::Help(ret);
 
   fprintf(stderr, "%s\n", ret.c_str());
 }