]> git-server-git.apps.pok.os.sepia.ceph.com Git - rocksdb.git/commitdiff
Add MultiGet to replay (#8577)
authorZhichao Cao <zhichao@fb.com>
Tue, 27 Jul 2021 20:55:15 +0000 (13:55 -0700)
committerFacebook GitHub Bot <facebook-github-bot@users.noreply.github.com>
Tue, 27 Jul 2021 20:56:15 +0000 (13:56 -0700)
Summary:
When the trace contains the MultiGet record, with this PR, it can replay the MultiGet.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/8577

Test Plan: make check and replay the real trace.

Reviewed By: anand1976

Differential Revision: D29864060

Pulled By: zhichao-cao

fbshipit-source-id: 5288d4fc9b6a3cb331de1e0c635d4e044dcb534a

trace_replay/trace_replay.cc
trace_replay/trace_replay.h

index 5fd529568d0e6cb1056f6000690c888983540bc0..6171d91ec1d7b364514c8d55da58512263082b29 100644 (file)
@@ -607,6 +607,25 @@ Status Replayer::Replay() {
       single_iter->SeekForPrev(iter_payload.iter_key);
       ops++;
       delete single_iter;
+    } else if (trace.type == kTraceMultiGet) {
+      MultiGetPayload multiget_payload;
+      assert(trace_file_version_ >= 2);
+      TracerHelper::DecodeMultiGetPayload(&trace, &multiget_payload);
+      std::vector<ColumnFamilyHandle*> v_cfd;
+      std::vector<Slice> keys;
+      assert(multiget_payload.cf_ids.size() ==
+             multiget_payload.multiget_keys.size());
+      for (size_t i = 0; i < multiget_payload.cf_ids.size(); i++) {
+        assert(i < multiget_payload.cf_ids.size() &&
+               i < multiget_payload.multiget_keys.size());
+        if (cf_map_.find(multiget_payload.cf_ids[i]) == cf_map_.end()) {
+          return Status::Corruption("Invalid Column Family ID.");
+        }
+        v_cfd.push_back(cf_map_[multiget_payload.cf_ids[i]]);
+        keys.push_back(Slice(multiget_payload.multiget_keys[i]));
+      }
+      std::vector<std::string> values;
+      std::vector<Status> ss = db_->MultiGet(roptions, v_cfd, keys, &values);
     } else if (trace.type == kTraceEnd) {
       // Do nothing for now.
       // TODO: Add some validations later.
@@ -685,6 +704,10 @@ Status Replayer::MultiThreadReplay(uint32_t threads_num) {
       thread_pool.Schedule(&Replayer::BGWorkIterSeekForPrev, ra.release(),
                            nullptr, nullptr);
       ops++;
+    } else if (ra->trace_entry.type == kTraceMultiGet) {
+      thread_pool.Schedule(&Replayer::BGWorkMultiGet, ra.release(), nullptr,
+                           nullptr);
+      ops++;
     } else if (ra->trace_entry.type == kTraceEnd) {
       // Do nothing for now.
       // TODO: Add some validations later.
@@ -861,4 +884,32 @@ void Replayer::BGWorkIterSeekForPrev(void* arg) {
   return;
 }
 
+void Replayer::BGWorkMultiGet(void* arg) {
+  std::unique_ptr<ReplayerWorkerArg> ra(
+      reinterpret_cast<ReplayerWorkerArg*>(arg));
+  assert(ra != nullptr);
+  auto cf_map = static_cast<std::unordered_map<uint32_t, ColumnFamilyHandle*>*>(
+      ra->cf_map);
+  MultiGetPayload multiget_payload;
+  if (ra->trace_file_version < 2) {
+    return;
+  }
+  TracerHelper::DecodeMultiGetPayload(&(ra->trace_entry), &multiget_payload);
+  std::vector<ColumnFamilyHandle*> v_cfd;
+  std::vector<Slice> keys;
+  if (multiget_payload.cf_ids.size() != multiget_payload.multiget_keys.size()) {
+    return;
+  }
+  for (size_t i = 0; i < multiget_payload.cf_ids.size(); i++) {
+    if (cf_map->find(multiget_payload.cf_ids[i]) == cf_map->end()) {
+      return;
+    }
+    v_cfd.push_back((*cf_map)[multiget_payload.cf_ids[i]]);
+    keys.push_back(Slice(multiget_payload.multiget_keys[i]));
+  }
+  std::vector<std::string> values;
+  std::vector<Status> ss = ra->db->MultiGet(ra->roptions, v_cfd, keys, &values);
+  return;
+}
+
 }  // namespace ROCKSDB_NAMESPACE
index d3ad2d799072aa6b23bb3de7fd5ea6dc6752e18e..d10bc1b4639527988ed176c8e8c4d38252910012 100644 (file)
@@ -268,6 +268,10 @@ class Replayer {
   // (SeekForPrev) based on the trace records.
   static void BGWorkIterSeekForPrev(void* arg);
 
+  // The background function for MultiThreadReplay to execute MultiGet based on
+  // the trace records
+  static void BGWorkMultiGet(void* arg);
+
   DBImpl* db_;
   Env* env_;
   std::unique_ptr<TraceReader> trace_reader_;