]> git-server-git.apps.pok.os.sepia.ceph.com Git - rocksdb.git/commitdiff
Refactor TraceAnalyzer to use `TraceRecord::Handler` to avoid casting. (#8678)
authorMerlin Mao <qzmao@fb.com>
Tue, 24 Aug 2021 00:17:13 +0000 (17:17 -0700)
committerFacebook GitHub Bot <facebook-github-bot@users.noreply.github.com>
Tue, 24 Aug 2021 00:18:27 +0000 (17:18 -0700)
Summary:
`TraceAnalyzer` privately inherits `TraceRecord::Handler` and `WriteBatch::Handler`.

`trace_analyzer_test` can pass with this change.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/8678

Reviewed By: zhichao-cao

Differential Revision: D30459814

Pulled By: autopear

fbshipit-source-id: a27f59ac4600f7c3682830c9b1d9dc79e53425be

tools/trace_analyzer_tool.cc
tools/trace_analyzer_tool.h
trace_replay/trace_replay.cc
trace_replay/trace_replay.h
utilities/trace/replayer_impl.cc
utilities/trace/replayer_impl.h

index 973b3d6bdbc1061d93b4331cf67fe62ef2236a13..732c8889e15d0e89d3b68682a5bbebeacee18b66 100644 (file)
@@ -132,7 +132,7 @@ DEFINE_bool(analyze_single_delete, false, "Analyze the SingleDelete query.");
 DEFINE_bool(analyze_range_delete, false, "Analyze the DeleteRange query.");
 DEFINE_bool(analyze_merge, false, "Analyze the Merge query.");
 DEFINE_bool(analyze_iterator, false,
-            " Analyze the iterate query like seek() and seekForPrev().");
+            " Analyze the iterate query like Seek() and SeekForPrev().");
 DEFINE_bool(analyze_multiget, false,
             " Analyze the MultiGet query. NOTE: for"
             " MultiGet, we analyze each KV-pair read in one MultiGet query. "
@@ -280,13 +280,14 @@ TraceAnalyzer::TraceAnalyzer(std::string& trace_path, std::string& output_path,
   total_access_keys_ = 0;
   total_gets_ = 0;
   total_writes_ = 0;
+  total_seeks_ = 0;
+  total_seek_prevs_ = 0;
+  total_multigets_ = 0;
   trace_create_time_ = 0;
   begin_time_ = 0;
   end_time_ = 0;
   time_series_start_ = 0;
   cur_time_sec_ = 0;
-  // Set the default trace file version as version 0.2
-  trace_file_version_ = 2;
   if (FLAGS_sample_ratio > 1.0 || FLAGS_sample_ratio <= 0) {
     sample_max_ = 1;
   } else {
@@ -360,7 +361,11 @@ TraceAnalyzer::~TraceAnalyzer() {}
 Status TraceAnalyzer::PrepareProcessing() {
   Status s;
   // Prepare the trace reader
-  s = NewFileTraceReader(env_, env_options_, trace_name_, &trace_reader_);
+  if (trace_reader_ == nullptr) {
+    s = NewFileTraceReader(env_, env_options_, trace_name_, &trace_reader_);
+  } else {
+    s = trace_reader_->Reset();
+  }
   if (!s.ok()) {
     return s;
   }
@@ -451,8 +456,9 @@ Status TraceAnalyzer::StartProcessing() {
     fprintf(stderr, "Cannot read the header\n");
     return s;
   }
-  s = TracerHelper::ParseTraceHeader(header, &trace_file_version_,
-                                     &db_version_);
+  // Set the default trace file version as version 0.2
+  int trace_file_version = 2;
+  s = TracerHelper::ParseTraceHeader(header, &trace_file_version, &db_version_);
   if (!s.ok()) {
     return s;
   }
@@ -469,96 +475,29 @@ Status TraceAnalyzer::StartProcessing() {
       break;
     }
 
-    total_requests_++;
     end_time_ = trace.ts;
     if (trace.type == kTraceEnd) {
       break;
     }
+    // Do not count TraceEnd (if there is one)
+    total_requests_++;
 
     std::unique_ptr<TraceRecord> record;
-    switch (trace.type) {
-      case kTraceWrite: {
-        s = TracerHelper::DecodeWriteRecord(&trace, trace_file_version_,
-                                            &record);
-        if (!s.ok()) {
-          return s;
-        }
-        total_writes_++;
-        c_time_ = trace.ts;
-        std::unique_ptr<WriteQueryTraceRecord> r(
-            reinterpret_cast<WriteQueryTraceRecord*>(record.release()));
-        // Note that, if the write happens in a transaction,
-        // 'Write' will be called twice, one for Prepare, one for
-        // Commit. Thus, in the trace, for the same WriteBatch, there
-        // will be two reords if it is in a transaction. Here, we only
-        // process the reord that is committed. If write is non-transaction,
-        // HasBeginPrepare()==false, so we process it normally.
-        WriteBatch batch(r->GetWriteBatchRep().ToString());
-        if (batch.HasBeginPrepare() && !batch.HasCommit()) {
-          continue;
-        }
-        TraceWriteHandler write_handler(this);
-        s = batch.Iterate(&write_handler);
-        if (!s.ok()) {
-          fprintf(stderr, "Cannot process the write batch in the trace\n");
-          return s;
-        }
-        break;
-      }
-      case kTraceGet: {
-        s = TracerHelper::DecodeGetRecord(&trace, trace_file_version_, &record);
-        if (!s.ok()) {
-          return s;
-        }
-        total_gets_++;
-        std::unique_ptr<GetQueryTraceRecord> r(
-            reinterpret_cast<GetQueryTraceRecord*>(record.release()));
-        s = HandleGet(r->GetColumnFamilyID(), r->GetKey(), r->GetTimestamp(),
-                      1);
-        if (!s.ok()) {
-          fprintf(stderr, "Cannot process the get in the trace\n");
-          return s;
-        }
-        break;
-      }
-      case kTraceIteratorSeek:
-      case kTraceIteratorSeekForPrev: {
-        s = TracerHelper::DecodeIterRecord(&trace, trace_file_version_,
-                                           &record);
-        if (!s.ok()) {
-          return s;
-        }
-        std::unique_ptr<IteratorSeekQueryTraceRecord> r(
-            reinterpret_cast<IteratorSeekQueryTraceRecord*>(record.release()));
-        s = HandleIter(r->GetColumnFamilyID(), r->GetKey(), r->GetTimestamp(),
-                       r->GetTraceType());
-        if (!s.ok()) {
-          fprintf(stderr, "Cannot process the iterator in the trace\n");
-          return s;
-        }
-        break;
-      }
-      case kTraceMultiGet: {
-        s = TracerHelper::DecodeMultiGetRecord(&trace, trace_file_version_,
-                                               &record);
-        if (!s.ok()) {
-          return s;
-        }
-        std::unique_ptr<MultiGetQueryTraceRecord> r(
-            reinterpret_cast<MultiGetQueryTraceRecord*>(record.release()));
-        s = HandleMultiGet(r->GetColumnFamilyIDs(), r->GetKeys(),
-                           r->GetTimestamp());
-        break;
-      }
-      default: {
-        // Skip unsupported types
-        break;
-      }
+    s = TracerHelper::DecodeTraceRecord(&trace, trace_file_version, &record);
+    if (s.IsNotSupported()) {
+      continue;
+    }
+    if (!s.ok()) {
+      return s;
+    }
+    s = record->Accept(this, nullptr);
+    if (!s.ok()) {
+      fprintf(stderr, "Cannot process the TraceRecord\n");
+      return s;
     }
   }
   if (s.IsIncomplete()) {
     // Fix it: Reaching eof returns Incomplete status at the moment.
-    //
     return Status::OK();
   }
   return s;
@@ -1555,14 +1494,41 @@ Status TraceAnalyzer::CloseOutputFiles() {
   return s;
 }
 
-// Handle the Get request in the trace
-Status TraceAnalyzer::HandleGet(uint32_t column_family_id, const Slice& key,
-                                const uint64_t& ts, const uint32_t& get_ret) {
+Status TraceAnalyzer::Handle(const WriteQueryTraceRecord& record,
+                             std::unique_ptr<TraceRecordResult>* /*result*/) {
+  total_writes_++;
+  // Note that, if the write happens in a transaction,
+  // 'Write' will be called twice, one for Prepare, one for
+  // Commit. Thus, in the trace, for the same WriteBatch, there
+  // will be two records if it is in a transaction. Here, we only
+  // process the reord that is committed. If write is non-transaction,
+  // HasBeginPrepare()==false, so we process it normally.
+  WriteBatch batch(record.GetWriteBatchRep().ToString());
+  if (batch.HasBeginPrepare() && !batch.HasCommit()) {
+    return Status::OK();
+  }
+  c_time_ = record.GetTimestamp();
+  Status s = batch.Iterate(this);
+  if (!s.ok()) {
+    fprintf(stderr, "Cannot process the write batch in the trace\n");
+    return s;
+  }
+  return Status::OK();
+}
+
+Status TraceAnalyzer::Handle(const GetQueryTraceRecord& record,
+                             std::unique_ptr<TraceRecordResult>* /*result*/) {
+  total_gets_++;
+
+  uint32_t cf_id = record.GetColumnFamilyID();
+  Slice key = record.GetKey();
+  uint64_t ts = record.GetTimestamp();
+
   Status s;
   size_t value_size = 0;
   if (FLAGS_convert_to_human_readable_trace && trace_sequence_f_) {
-    s = WriteTraceSequence(TraceOperationType::kGet, column_family_id, key,
-                           value_size, ts);
+    s = WriteTraceSequence(TraceOperationType::kGet, cf_id, key, value_size,
+                           ts);
     if (!s.ok()) {
       return Status::Corruption("Failed to write the trace sequence to file");
     }
@@ -1580,11 +1546,109 @@ Status TraceAnalyzer::HandleGet(uint32_t column_family_id, const Slice& key,
   if (!ta_[TraceOperationType::kGet].enabled) {
     return Status::OK();
   }
-  if (get_ret == 1) {
-    value_size = 10;
+  value_size = 10;
+  s = KeyStatsInsertion(TraceOperationType::kGet, cf_id, key.ToString(),
+                        value_size, ts);
+  if (!s.ok()) {
+    return Status::Corruption("Failed to insert key statistics");
+  }
+  return s;
+}
+
+Status TraceAnalyzer::Handle(const IteratorSeekQueryTraceRecord& record,
+                             std::unique_ptr<TraceRecordResult>* /*result*/) {
+  uint32_t cf_id = record.GetColumnFamilyID();
+  Slice key = record.GetKey();
+  uint64_t ts = record.GetTimestamp();
+
+  // To do: add lower/upper bounds
+
+  Status s;
+  size_t value_size = 0;
+  int type = -1;
+  if (record.GetTraceType() == kTraceIteratorSeek) {
+    type = TraceOperationType::kIteratorSeek;
+    total_seeks_++;
+  } else if (record.GetTraceType() == kTraceIteratorSeekForPrev) {
+    type = TraceOperationType::kIteratorSeekForPrev;
+    total_seek_prevs_++;
+  } else {
+    return s;
+  }
+  if (type == -1) {
+    return s;
+  }
+
+  if (FLAGS_convert_to_human_readable_trace && trace_sequence_f_) {
+    s = WriteTraceSequence(type, cf_id, key, value_size, ts);
+    if (!s.ok()) {
+      return Status::Corruption("Failed to write the trace sequence to file");
+    }
+  }
+
+  if (ta_[type].sample_count >= sample_max_) {
+    ta_[type].sample_count = 0;
+  }
+  if (ta_[type].sample_count > 0) {
+    ta_[type].sample_count++;
+    return Status::OK();
+  }
+  ta_[type].sample_count++;
+
+  if (!ta_[type].enabled) {
+    return Status::OK();
+  }
+  s = KeyStatsInsertion(type, cf_id, key.ToString(), value_size, ts);
+  if (!s.ok()) {
+    return Status::Corruption("Failed to insert key statistics");
+  }
+  return s;
+}
+
+Status TraceAnalyzer::Handle(const MultiGetQueryTraceRecord& record,
+                             std::unique_ptr<TraceRecordResult>* /*result*/) {
+  total_multigets_++;
+
+  std::vector<uint32_t> cf_ids = record.GetColumnFamilyIDs();
+  std::vector<Slice> keys = record.GetKeys();
+  uint64_t ts = record.GetTimestamp();
+
+  Status s;
+  size_t value_size = 0;
+  if (cf_ids.size() != keys.size()) {
+    // The size does not match is not the error of tracing and anayzing, we just
+    // report it to the user. The analyzing continues.
+    printf("The CF ID vector size does not match the keys vector size!\n");
+  }
+  size_t vector_size = std::min(cf_ids.size(), keys.size());
+  if (FLAGS_convert_to_human_readable_trace && trace_sequence_f_) {
+    for (size_t i = 0; i < vector_size; i++) {
+      assert(i < cf_ids.size() && i < keys.size());
+      s = WriteTraceSequence(TraceOperationType::kMultiGet, cf_ids[i], keys[i],
+                             value_size, ts);
+    }
+    if (!s.ok()) {
+      return Status::Corruption("Failed to write the trace sequence to file");
+    }
+  }
+
+  if (ta_[TraceOperationType::kMultiGet].sample_count >= sample_max_) {
+    ta_[TraceOperationType::kMultiGet].sample_count = 0;
+  }
+  if (ta_[TraceOperationType::kMultiGet].sample_count > 0) {
+    ta_[TraceOperationType::kMultiGet].sample_count++;
+    return Status::OK();
+  }
+  ta_[TraceOperationType::kMultiGet].sample_count++;
+
+  if (!ta_[TraceOperationType::kMultiGet].enabled) {
+    return Status::OK();
+  }
+  for (size_t i = 0; i < vector_size; i++) {
+    assert(i < cf_ids.size() && i < keys.size());
+    s = KeyStatsInsertion(TraceOperationType::kMultiGet, cf_ids[i],
+                          keys[i].ToString(), value_size, ts);
   }
-  s = KeyStatsInsertion(TraceOperationType::kGet, column_family_id,
-                        key.ToString(), value_size, ts);
   if (!s.ok()) {
     return Status::Corruption("Failed to insert key statistics");
   }
@@ -1592,8 +1656,8 @@ Status TraceAnalyzer::HandleGet(uint32_t column_family_id, const Slice& key,
 }
 
 // Handle the Put request in the write batch of the trace
-Status TraceAnalyzer::HandlePut(uint32_t column_family_id, const Slice& key,
-                                const Slice& value) {
+Status TraceAnalyzer::PutCF(uint32_t column_family_id, const Slice& key,
+                            const Slice& value) {
   Status s;
   size_t value_size = value.ToString().size();
   if (FLAGS_convert_to_human_readable_trace && trace_sequence_f_) {
@@ -1625,8 +1689,7 @@ Status TraceAnalyzer::HandlePut(uint32_t column_family_id, const Slice& key,
 }
 
 // Handle the Delete request in the write batch of the trace
-Status TraceAnalyzer::HandleDelete(uint32_t column_family_id,
-                                   const Slice& key) {
+Status TraceAnalyzer::DeleteCF(uint32_t column_family_id, const Slice& key) {
   Status s;
   size_t value_size = 0;
   if (FLAGS_convert_to_human_readable_trace && trace_sequence_f_) {
@@ -1658,8 +1721,8 @@ Status TraceAnalyzer::HandleDelete(uint32_t column_family_id,
 }
 
 // Handle the SingleDelete request in the write batch of the trace
-Status TraceAnalyzer::HandleSingleDelete(uint32_t column_family_id,
-                                         const Slice& key) {
+Status TraceAnalyzer::SingleDeleteCF(uint32_t column_family_id,
+                                     const Slice& key) {
   Status s;
   size_t value_size = 0;
   if (FLAGS_convert_to_human_readable_trace && trace_sequence_f_) {
@@ -1691,9 +1754,9 @@ Status TraceAnalyzer::HandleSingleDelete(uint32_t column_family_id,
 }
 
 // Handle the DeleteRange request in the write batch of the trace
-Status TraceAnalyzer::HandleDeleteRange(uint32_t column_family_id,
-                                        const Slice& begin_key,
-                                        const Slice& end_key) {
+Status TraceAnalyzer::DeleteRangeCF(uint32_t column_family_id,
+                                    const Slice& begin_key,
+                                    const Slice& end_key) {
   Status s;
   size_t value_size = 0;
   if (FLAGS_convert_to_human_readable_trace && trace_sequence_f_) {
@@ -1727,8 +1790,8 @@ Status TraceAnalyzer::HandleDeleteRange(uint32_t column_family_id,
 }
 
 // Handle the Merge request in the write batch of the trace
-Status TraceAnalyzer::HandleMerge(uint32_t column_family_id, const Slice& key,
-                                  const Slice& value) {
+Status TraceAnalyzer::MergeCF(uint32_t column_family_id, const Slice& key,
+                              const Slice& value) {
   Status s;
   size_t value_size = value.ToString().size();
   if (FLAGS_convert_to_human_readable_trace && trace_sequence_f_) {
@@ -1759,95 +1822,6 @@ Status TraceAnalyzer::HandleMerge(uint32_t column_family_id, const Slice& key,
   return s;
 }
 
-// Handle the Iterator request in the trace
-Status TraceAnalyzer::HandleIter(uint32_t column_family_id, const Slice& key,
-                                 const uint64_t& ts, TraceType trace_type) {
-  Status s;
-  size_t value_size = 0;
-  int type = -1;
-  if (trace_type == kTraceIteratorSeek) {
-    type = TraceOperationType::kIteratorSeek;
-  } else if (trace_type == kTraceIteratorSeekForPrev) {
-    type = TraceOperationType::kIteratorSeekForPrev;
-  } else {
-    return s;
-  }
-  if (type == -1) {
-    return s;
-  }
-
-  if (FLAGS_convert_to_human_readable_trace && trace_sequence_f_) {
-    s = WriteTraceSequence(type, column_family_id, key, value_size, ts);
-    if (!s.ok()) {
-      return Status::Corruption("Failed to write the trace sequence to file");
-    }
-  }
-
-  if (ta_[type].sample_count >= sample_max_) {
-    ta_[type].sample_count = 0;
-  }
-  if (ta_[type].sample_count > 0) {
-    ta_[type].sample_count++;
-    return Status::OK();
-  }
-  ta_[type].sample_count++;
-
-  if (!ta_[type].enabled) {
-    return Status::OK();
-  }
-  s = KeyStatsInsertion(type, column_family_id, key.ToString(), value_size, ts);
-  if (!s.ok()) {
-    return Status::Corruption("Failed to insert key statistics");
-  }
-  return s;
-}
-
-// Handle MultiGet queries in the trace
-Status TraceAnalyzer::HandleMultiGet(
-    const std::vector<uint32_t>& column_family_ids,
-    const std::vector<Slice>& keys, const uint64_t& ts) {
-  Status s;
-  size_t value_size = 0;
-  if (column_family_ids.size() != keys.size()) {
-    // The size does not match is not the error of tracing and anayzing, we just
-    // report it to the user. The analyzing continues.
-    printf("The CF ID vector size does not match the keys vector size!\n");
-  }
-  size_t vector_size = std::min(column_family_ids.size(), keys.size());
-  if (FLAGS_convert_to_human_readable_trace && trace_sequence_f_) {
-    for (size_t i = 0; i < vector_size; i++) {
-      assert(i < column_family_ids.size() && i < keys.size());
-      s = WriteTraceSequence(TraceOperationType::kMultiGet,
-                             column_family_ids[i], keys[i], value_size, ts);
-    }
-    if (!s.ok()) {
-      return Status::Corruption("Failed to write the trace sequence to file");
-    }
-  }
-
-  if (ta_[TraceOperationType::kMultiGet].sample_count >= sample_max_) {
-    ta_[TraceOperationType::kMultiGet].sample_count = 0;
-  }
-  if (ta_[TraceOperationType::kMultiGet].sample_count > 0) {
-    ta_[TraceOperationType::kMultiGet].sample_count++;
-    return Status::OK();
-  }
-  ta_[TraceOperationType::kMultiGet].sample_count++;
-
-  if (!ta_[TraceOperationType::kMultiGet].enabled) {
-    return Status::OK();
-  }
-  for (size_t i = 0; i < vector_size; i++) {
-    assert(i < column_family_ids.size() && i < keys.size());
-    s = KeyStatsInsertion(TraceOperationType::kMultiGet, column_family_ids[i],
-                          keys[i].ToString(), value_size, ts);
-  }
-  if (!s.ok()) {
-    return Status::Corruption("Failed to insert key statistics");
-  }
-  return s;
-}
-
 // Before the analyzer is closed, the requested general statistic results are
 // printed out here. In current stage, these information are not output to
 // the files.
@@ -1999,8 +1973,11 @@ void TraceAnalyzer::PrintStatistics() {
     printf("The statistics related to query number need to times: %u\n",
            sample_max_);
     printf("Total_requests: %" PRIu64 " Total_accessed_keys: %" PRIu64
-           " Total_gets: %" PRIu64 " Total_write_batch: %" PRIu64 "\n",
-           total_requests_, total_access_keys_, total_gets_, total_writes_);
+           " Total_gets: %" PRIu64 " Total_write_batches: %" PRIu64
+           " Total_seeks: %" PRIu64 " Total_seek_for_prevs: %" PRIu64
+           " Total_multigets: %" PRIu64 "\n",
+           total_requests_, total_access_keys_, total_gets_, total_writes_,
+           total_seeks_, total_seek_prevs_, total_multigets_);
     for (int type = 0; type < kTaTypeNum; type++) {
       if (!ta_[type].enabled) {
         continue;
index 7eafd2a3c8dddfc1d4910d381e9f6c90fd9e48a7..14f44ff9cc9778369a16932c41b2a779b090258f 100644 (file)
@@ -164,7 +164,8 @@ struct CfUnit {
   std::map<uint32_t, uint32_t> cf_qps;
 };
 
-class TraceAnalyzer {
+class TraceAnalyzer : private TraceRecord::Handler,
+                      private WriteBatch::Handler {
  public:
   TraceAnalyzer(std::string& trace_path, std::string& output_path,
                 AnalyzerOptions _analyzer_opts);
@@ -182,24 +183,64 @@ class TraceAnalyzer {
 
   Status WriteTraceUnit(TraceUnit& unit);
 
-  // The trace  processing functions for different type
-  Status HandleGet(uint32_t column_family_id, const Slice& key,
-                   const uint64_t& ts, const uint32_t& get_ret);
-  Status HandlePut(uint32_t column_family_id, const Slice& key,
-                   const Slice& value);
-  Status HandleDelete(uint32_t column_family_id, const Slice& key);
-  Status HandleSingleDelete(uint32_t column_family_id, const Slice& key);
-  Status HandleDeleteRange(uint32_t column_family_id, const Slice& begin_key,
-                           const Slice& end_key);
-  Status HandleMerge(uint32_t column_family_id, const Slice& key,
-                     const Slice& value);
-  Status HandleIter(uint32_t column_family_id, const Slice& key,
-                    const uint64_t& ts, TraceType trace_type);
-  Status HandleMultiGet(const std::vector<uint32_t>& column_family_ids,
-                        const std::vector<Slice>& keys, const uint64_t& ts);
   std::vector<TypeUnit>& GetTaVector() { return ta_; }
 
  private:
+  using TraceRecord::Handler::Handle;
+  Status Handle(const WriteQueryTraceRecord& record,
+                std::unique_ptr<TraceRecordResult>* result) override;
+  Status Handle(const GetQueryTraceRecord& record,
+                std::unique_ptr<TraceRecordResult>* result) override;
+  Status Handle(const IteratorSeekQueryTraceRecord& record,
+                std::unique_ptr<TraceRecordResult>* result) override;
+  Status Handle(const MultiGetQueryTraceRecord& record,
+                std::unique_ptr<TraceRecordResult>* result) override;
+
+  using WriteBatch::Handler::PutCF;
+  Status PutCF(uint32_t column_family_id, const Slice& key,
+               const Slice& value) override;
+
+  using WriteBatch::Handler::DeleteCF;
+  Status DeleteCF(uint32_t column_family_id, const Slice& key) override;
+
+  using WriteBatch::Handler::SingleDeleteCF;
+  Status SingleDeleteCF(uint32_t column_family_id, const Slice& key) override;
+
+  using WriteBatch::Handler::DeleteRangeCF;
+  Status DeleteRangeCF(uint32_t column_family_id, const Slice& begin_key,
+                       const Slice& end_key) override;
+
+  using WriteBatch::Handler::MergeCF;
+  Status MergeCF(uint32_t column_family_id, const Slice& key,
+                 const Slice& value) override;
+
+  // The following hanlders are not implemented, return Status::OK() to avoid
+  // the running time assertion and other irrelevant falures.
+  using WriteBatch::Handler::PutBlobIndexCF;
+  Status PutBlobIndexCF(uint32_t /*column_family_id*/, const Slice& /*key*/,
+                        const Slice& /*value*/) override {
+    return Status::OK();
+  }
+
+  // The default implementation of LogData does nothing.
+  using WriteBatch::Handler::LogData;
+  void LogData(const Slice& /*blob*/) override {}
+
+  using WriteBatch::Handler::MarkBeginPrepare;
+  Status MarkBeginPrepare(bool = false) override { return Status::OK(); }
+
+  using WriteBatch::Handler::MarkEndPrepare;
+  Status MarkEndPrepare(const Slice& /*xid*/) override { return Status::OK(); }
+
+  using WriteBatch::Handler::MarkNoop;
+  Status MarkNoop(bool /*empty_batch*/) override { return Status::OK(); }
+
+  using WriteBatch::Handler::MarkRollback;
+  Status MarkRollback(const Slice& /*xid*/) override { return Status::OK(); }
+
+  using WriteBatch::Handler::MarkCommit;
+  Status MarkCommit(const Slice& /*xid*/) override { return Status::OK(); }
+
   ROCKSDB_NAMESPACE::Env* env_;
   EnvOptions env_options_;
   std::unique_ptr<TraceReader> trace_reader_;
@@ -213,6 +254,9 @@ class TraceAnalyzer {
   uint64_t total_access_keys_;
   uint64_t total_gets_;
   uint64_t total_writes_;
+  uint64_t total_seeks_;
+  uint64_t total_seek_prevs_;
+  uint64_t total_multigets_;
   uint64_t trace_create_time_;
   uint64_t begin_time_;
   uint64_t end_time_;
@@ -253,76 +297,9 @@ class TraceAnalyzer {
   Status MakeStatisticKeyStatsOrPrefix(TraceStats& stats);
   Status MakeStatisticCorrelation(TraceStats& stats, StatsUnit& unit);
   Status MakeStatisticQPS();
-  // Set the default trace file version as version 0.2
-  int trace_file_version_;
   int db_version_;
 };
 
-// write bach handler to be used for WriteBache iterator
-// when processing the write trace
-class TraceWriteHandler : public WriteBatch::Handler {
- public:
-  TraceWriteHandler() { ta_ptr = nullptr; }
-  explicit TraceWriteHandler(TraceAnalyzer* _ta_ptr) { ta_ptr = _ta_ptr; }
-  ~TraceWriteHandler() {}
-
-  virtual Status PutCF(uint32_t column_family_id, const Slice& key,
-                       const Slice& value) override {
-    return ta_ptr->HandlePut(column_family_id, key, value);
-  }
-  virtual Status DeleteCF(uint32_t column_family_id,
-                          const Slice& key) override {
-    return ta_ptr->HandleDelete(column_family_id, key);
-  }
-  virtual Status SingleDeleteCF(uint32_t column_family_id,
-                                const Slice& key) override {
-    return ta_ptr->HandleSingleDelete(column_family_id, key);
-  }
-  virtual Status DeleteRangeCF(uint32_t column_family_id,
-                               const Slice& begin_key,
-                               const Slice& end_key) override {
-    return ta_ptr->HandleDeleteRange(column_family_id, begin_key, end_key);
-  }
-  virtual Status MergeCF(uint32_t column_family_id, const Slice& key,
-                         const Slice& value) override {
-    return ta_ptr->HandleMerge(column_family_id, key, value);
-  }
-
-  // The following hanlders are not implemented, return Status::OK() to avoid
-  // the running time assertion and other irrelevant falures.
-  virtual Status PutBlobIndexCF(uint32_t /*column_family_id*/,
-                                const Slice& /*key*/,
-                                const Slice& /*value*/) override {
-    return Status::OK();
-  }
-
-  // The default implementation of LogData does nothing.
-  virtual void LogData(const Slice& /*blob*/) override {}
-
-  virtual Status MarkBeginPrepare(bool = false) override {
-    return Status::OK();
-  }
-
-  virtual Status MarkEndPrepare(const Slice& /*xid*/) override {
-    return Status::OK();
-  }
-
-  virtual Status MarkNoop(bool /*empty_batch*/) override {
-    return Status::OK();
-  }
-
-  virtual Status MarkRollback(const Slice& /*xid*/) override {
-    return Status::OK();
-  }
-
-  virtual Status MarkCommit(const Slice& /*xid*/) override {
-    return Status::OK();
-  }
-
- private:
-  TraceAnalyzer* ta_ptr;
-};
-
 int trace_analyzer_tool(int argc, char** argv);
 
 }  // namespace ROCKSDB_NAMESPACE
index 01867b9f4dfab8450446a5ee8647ce8c593f6595..89bea7870e073f7ae3dddc78d09b1d04c4154ca7 100644 (file)
@@ -126,222 +126,218 @@ bool TracerHelper::SetPayloadMap(uint64_t& payload_map,
   return old_state != payload_map;
 }
 
-Status TracerHelper::DecodeWriteRecord(Trace* trace, int trace_file_version,
+Status TracerHelper::DecodeTraceRecord(Trace* trace, int trace_file_version,
                                        std::unique_ptr<TraceRecord>* record) {
   assert(trace != nullptr);
-  assert(trace->type == kTraceWrite);
 
   if (record != nullptr) {
     record->reset(nullptr);
   }
 
-  PinnableSlice rep;
-  if (trace_file_version < 2) {
-    rep.PinSelf(trace->payload);
-  } else {
-    Slice buf(trace->payload);
-    GetFixed64(&buf, &trace->payload_map);
-    int64_t payload_map = static_cast<int64_t>(trace->payload_map);
-    Slice write_batch_data;
-    while (payload_map) {
-      // Find the rightmost set bit.
-      uint32_t set_pos =
-          static_cast<uint32_t>(log2(payload_map & -payload_map));
-      switch (set_pos) {
-        case TracePayloadType::kWriteBatchData:
-          GetLengthPrefixedSlice(&buf, &write_batch_data);
-          break;
-        default:
-          assert(false);
+  switch (trace->type) {
+    // Write
+    case kTraceWrite: {
+      PinnableSlice rep;
+      if (trace_file_version < 2) {
+        rep.PinSelf(trace->payload);
+      } else {
+        Slice buf(trace->payload);
+        GetFixed64(&buf, &trace->payload_map);
+        int64_t payload_map = static_cast<int64_t>(trace->payload_map);
+        Slice write_batch_data;
+        while (payload_map) {
+          // Find the rightmost set bit.
+          uint32_t set_pos =
+              static_cast<uint32_t>(log2(payload_map & -payload_map));
+          switch (set_pos) {
+            case TracePayloadType::kWriteBatchData: {
+              GetLengthPrefixedSlice(&buf, &write_batch_data);
+              break;
+            }
+            default: {
+              assert(false);
+            }
+          }
+          // unset the rightmost bit.
+          payload_map &= (payload_map - 1);
+        }
+        rep.PinSelf(write_batch_data);
       }
-      // unset the rightmost bit.
-      payload_map &= (payload_map - 1);
-    }
-    rep.PinSelf(write_batch_data);
-  }
-
-  if (record != nullptr) {
-    record->reset(new WriteQueryTraceRecord(std::move(rep), trace->ts));
-  }
-
-  return Status::OK();
-}
-
-Status TracerHelper::DecodeGetRecord(Trace* trace, int trace_file_version,
-                                     std::unique_ptr<TraceRecord>* record) {
-  assert(trace != nullptr);
-  assert(trace->type == kTraceGet);
-
-  if (record != nullptr) {
-    record->reset(nullptr);
-  }
 
-  uint32_t cf_id = 0;
-  Slice get_key;
-
-  if (trace_file_version < 2) {
-    DecodeCFAndKey(trace->payload, &cf_id, &get_key);
-  } else {
-    Slice buf(trace->payload);
-    GetFixed64(&buf, &trace->payload_map);
-    int64_t payload_map = static_cast<int64_t>(trace->payload_map);
-    while (payload_map) {
-      // Find the rightmost set bit.
-      uint32_t set_pos =
-          static_cast<uint32_t>(log2(payload_map & -payload_map));
-      switch (set_pos) {
-        case TracePayloadType::kGetCFID:
-          GetFixed32(&buf, &cf_id);
-          break;
-        case TracePayloadType::kGetKey:
-          GetLengthPrefixedSlice(&buf, &get_key);
-          break;
-        default:
-          assert(false);
+      if (record != nullptr) {
+        record->reset(new WriteQueryTraceRecord(std::move(rep), trace->ts));
       }
-      // unset the rightmost bit.
-      payload_map &= (payload_map - 1);
-    }
-  }
 
-  if (record != nullptr) {
-    PinnableSlice ps;
-    ps.PinSelf(get_key);
-    record->reset(new GetQueryTraceRecord(cf_id, std::move(ps), trace->ts));
-  }
-
-  return Status::OK();
-}
+      return Status::OK();
+    }
+    // Get
+    case kTraceGet: {
+      uint32_t cf_id = 0;
+      Slice get_key;
+
+      if (trace_file_version < 2) {
+        DecodeCFAndKey(trace->payload, &cf_id, &get_key);
+      } else {
+        Slice buf(trace->payload);
+        GetFixed64(&buf, &trace->payload_map);
+        int64_t payload_map = static_cast<int64_t>(trace->payload_map);
+        while (payload_map) {
+          // Find the rightmost set bit.
+          uint32_t set_pos =
+              static_cast<uint32_t>(log2(payload_map & -payload_map));
+          switch (set_pos) {
+            case TracePayloadType::kGetCFID: {
+              GetFixed32(&buf, &cf_id);
+              break;
+            }
+            case TracePayloadType::kGetKey: {
+              GetLengthPrefixedSlice(&buf, &get_key);
+              break;
+            }
+            default: {
+              assert(false);
+            }
+          }
+          // unset the rightmost bit.
+          payload_map &= (payload_map - 1);
+        }
+      }
 
-Status TracerHelper::DecodeIterRecord(Trace* trace, int trace_file_version,
-                                      std::unique_ptr<TraceRecord>* record) {
-  assert(trace != nullptr);
-  assert(trace->type == kTraceIteratorSeek ||
-         trace->type == kTraceIteratorSeekForPrev);
+      if (record != nullptr) {
+        PinnableSlice ps;
+        ps.PinSelf(get_key);
+        record->reset(new GetQueryTraceRecord(cf_id, std::move(ps), trace->ts));
+      }
 
-  if (record != nullptr) {
-    record->reset(nullptr);
-  }
+      return Status::OK();
+    }
+    // Iterator Seek and SeekForPrev
+    case kTraceIteratorSeek:
+    case kTraceIteratorSeekForPrev: {
+      uint32_t cf_id = 0;
+      Slice iter_key;
+      Slice lower_bound;
+      Slice upper_bound;
+
+      if (trace_file_version < 2) {
+        DecodeCFAndKey(trace->payload, &cf_id, &iter_key);
+      } else {
+        Slice buf(trace->payload);
+        GetFixed64(&buf, &trace->payload_map);
+        int64_t payload_map = static_cast<int64_t>(trace->payload_map);
+        while (payload_map) {
+          // Find the rightmost set bit.
+          uint32_t set_pos =
+              static_cast<uint32_t>(log2(payload_map & -payload_map));
+          switch (set_pos) {
+            case TracePayloadType::kIterCFID: {
+              GetFixed32(&buf, &cf_id);
+              break;
+            }
+            case TracePayloadType::kIterKey: {
+              GetLengthPrefixedSlice(&buf, &iter_key);
+              break;
+            }
+            case TracePayloadType::kIterLowerBound: {
+              GetLengthPrefixedSlice(&buf, &lower_bound);
+              break;
+            }
+            case TracePayloadType::kIterUpperBound: {
+              GetLengthPrefixedSlice(&buf, &upper_bound);
+              break;
+            }
+            default: {
+              assert(false);
+            }
+          }
+          // unset the rightmost bit.
+          payload_map &= (payload_map - 1);
+        }
+      }
 
-  uint32_t cf_id = 0;
-  Slice iter_key;
-  Slice lower_bound;
-  Slice upper_bound;
-
-  if (trace_file_version < 2) {
-    DecodeCFAndKey(trace->payload, &cf_id, &iter_key);
-  } else {
-    Slice buf(trace->payload);
-    GetFixed64(&buf, &trace->payload_map);
-    int64_t payload_map = static_cast<int64_t>(trace->payload_map);
-    while (payload_map) {
-      // Find the rightmost set bit.
-      uint32_t set_pos =
-          static_cast<uint32_t>(log2(payload_map & -payload_map));
-      switch (set_pos) {
-        case TracePayloadType::kIterCFID:
-          GetFixed32(&buf, &cf_id);
-          break;
-        case TracePayloadType::kIterKey:
-          GetLengthPrefixedSlice(&buf, &iter_key);
-          break;
-        case TracePayloadType::kIterLowerBound:
-          GetLengthPrefixedSlice(&buf, &lower_bound);
-          break;
-        case TracePayloadType::kIterUpperBound:
-          GetLengthPrefixedSlice(&buf, &upper_bound);
-          break;
-        default:
-          assert(false);
+      if (record != nullptr) {
+        PinnableSlice ps_key;
+        ps_key.PinSelf(iter_key);
+        PinnableSlice ps_lower;
+        ps_lower.PinSelf(lower_bound);
+        PinnableSlice ps_upper;
+        ps_upper.PinSelf(upper_bound);
+        record->reset(new IteratorSeekQueryTraceRecord(
+            static_cast<IteratorSeekQueryTraceRecord::SeekType>(trace->type),
+            cf_id, std::move(ps_key), std::move(ps_lower), std::move(ps_upper),
+            trace->ts));
       }
-      // unset the rightmost bit.
-      payload_map &= (payload_map - 1);
-    }
-  }
 
-  if (record != nullptr) {
-    PinnableSlice ps_key;
-    ps_key.PinSelf(iter_key);
-    PinnableSlice ps_lower;
-    ps_lower.PinSelf(lower_bound);
-    PinnableSlice ps_upper;
-    ps_upper.PinSelf(upper_bound);
-    record->reset(new IteratorSeekQueryTraceRecord(
-        static_cast<IteratorSeekQueryTraceRecord::SeekType>(trace->type), cf_id,
-        std::move(ps_key), std::move(ps_lower), std::move(ps_upper),
-        trace->ts));
-  }
+      return Status::OK();
+    }
+    // MultiGet
+    case kTraceMultiGet: {
+      if (trace_file_version < 2) {
+        return Status::Corruption("MultiGet is not supported.");
+      }
 
-  return Status::OK();
-}
+      uint32_t multiget_size = 0;
+      std::vector<uint32_t> cf_ids;
+      std::vector<PinnableSlice> multiget_keys;
+
+      Slice cfids_payload;
+      Slice keys_payload;
+      Slice buf(trace->payload);
+      GetFixed64(&buf, &trace->payload_map);
+      int64_t payload_map = static_cast<int64_t>(trace->payload_map);
+      while (payload_map) {
+        // Find the rightmost set bit.
+        uint32_t set_pos =
+            static_cast<uint32_t>(log2(payload_map & -payload_map));
+        switch (set_pos) {
+          case TracePayloadType::kMultiGetSize: {
+            GetFixed32(&buf, &multiget_size);
+            break;
+          }
+          case TracePayloadType::kMultiGetCFIDs: {
+            GetLengthPrefixedSlice(&buf, &cfids_payload);
+            break;
+          }
+          case TracePayloadType::kMultiGetKeys: {
+            GetLengthPrefixedSlice(&buf, &keys_payload);
+            break;
+          }
+          default: {
+            assert(false);
+          }
+        }
+        // unset the rightmost bit.
+        payload_map &= (payload_map - 1);
+      }
+      if (multiget_size == 0) {
+        return Status::InvalidArgument("Empty MultiGet cf_ids or keys.");
+      }
 
-Status TracerHelper::DecodeMultiGetRecord(
-    Trace* trace, int trace_file_version,
-    std::unique_ptr<TraceRecord>* record) {
-  assert(trace != nullptr);
-  assert(trace->type == kTraceMultiGet);
+      // Decode the cfids_payload and keys_payload
+      cf_ids.reserve(multiget_size);
+      multiget_keys.reserve(multiget_size);
+      for (uint32_t i = 0; i < multiget_size; i++) {
+        uint32_t tmp_cfid;
+        Slice tmp_key;
+        GetFixed32(&cfids_payload, &tmp_cfid);
+        GetLengthPrefixedSlice(&keys_payload, &tmp_key);
+        cf_ids.push_back(tmp_cfid);
+        Slice s(tmp_key);
+        PinnableSlice ps;
+        ps.PinSelf(s);
+        multiget_keys.push_back(std::move(ps));
+      }
 
-  if (record != nullptr) {
-    record->reset(nullptr);
-  }
+      if (record != nullptr) {
+        record->reset(new MultiGetQueryTraceRecord(
+            std::move(cf_ids), std::move(multiget_keys), trace->ts));
+      }
 
-  if (trace_file_version < 2) {
-    return Status::Corruption("MultiGet is not supported.");
-  }
-
-  uint32_t multiget_size = 0;
-  std::vector<uint32_t> cf_ids;
-  std::vector<PinnableSlice> multiget_keys;
-
-  Slice cfids_payload;
-  Slice keys_payload;
-  Slice buf(trace->payload);
-  GetFixed64(&buf, &trace->payload_map);
-  int64_t payload_map = static_cast<int64_t>(trace->payload_map);
-  while (payload_map) {
-    // Find the rightmost set bit.
-    uint32_t set_pos = static_cast<uint32_t>(log2(payload_map & -payload_map));
-    switch (set_pos) {
-      case TracePayloadType::kMultiGetSize:
-        GetFixed32(&buf, &multiget_size);
-        break;
-      case TracePayloadType::kMultiGetCFIDs:
-        GetLengthPrefixedSlice(&buf, &cfids_payload);
-        break;
-      case TracePayloadType::kMultiGetKeys:
-        GetLengthPrefixedSlice(&buf, &keys_payload);
-        break;
-      default:
-        assert(false);
+      return Status::OK();
     }
-    // unset the rightmost bit.
-    payload_map &= (payload_map - 1);
+    default:
+      return Status::NotSupported("Unsupported trace type.");
   }
-  if (multiget_size == 0) {
-    return Status::InvalidArgument("Empty MultiGet cf_ids or keys.");
-  }
-
-  // Decode the cfids_payload and keys_payload
-  cf_ids.reserve(multiget_size);
-  multiget_keys.reserve(multiget_size);
-  for (uint32_t i = 0; i < multiget_size; i++) {
-    uint32_t tmp_cfid;
-    Slice tmp_key;
-    GetFixed32(&cfids_payload, &tmp_cfid);
-    GetLengthPrefixedSlice(&keys_payload, &tmp_key);
-    cf_ids.push_back(tmp_cfid);
-    Slice s(tmp_key);
-    PinnableSlice ps;
-    ps.PinSelf(s);
-    multiget_keys.push_back(std::move(ps));
-  }
-
-  if (record != nullptr) {
-    record->reset(new MultiGetQueryTraceRecord(
-        std::move(cf_ids), std::move(multiget_keys), trace->ts));
-  }
-
-  return Status::OK();
 }
 
 Tracer::Tracer(SystemClock* clock, const TraceOptions& trace_options,
index 979eb349278b9f634f25f29bb7a899882c461c70..4990accef8b9b5691dd2bb45024797ab53de4a70 100644 (file)
@@ -106,21 +106,12 @@ class TracerHelper {
   static bool SetPayloadMap(uint64_t& payload_map,
                             const TracePayloadType payload_type);
 
-  // Decode the write payload and store in WrteiPayload
-  static Status DecodeWriteRecord(Trace* trace, int trace_file_version,
+  // Decode a Trace object into the corresponding TraceRecord.
+  // Return Status::OK() if nothing is wrong, record will be set accordingly.
+  // Return Status::NotSupported() if the trace type is not support, or the
+  // corresponding error status, record will be set to nullptr.
+  static Status DecodeTraceRecord(Trace* trace, int trace_file_version,
                                   std::unique_ptr<TraceRecord>* record);
-
-  // Decode the get payload and store in WrteiPayload
-  static Status DecodeGetRecord(Trace* trace, int trace_file_version,
-                                std::unique_ptr<TraceRecord>* record);
-
-  // Decode the iter payload and store in WrteiPayload
-  static Status DecodeIterRecord(Trace* trace, int trace_file_version,
-                                 std::unique_ptr<TraceRecord>* record);
-
-  // Decode the multiget payload and store in MultiGetPayload
-  static Status DecodeMultiGetRecord(Trace* trace, int trace_file_version,
-                                     std::unique_ptr<TraceRecord>* record);
 };
 
 // Tracer captures all RocksDB operations using a user-provided TraceWriter.
index c98155d5345f719f43805fff2530db7ad4115a11..09d94441eef7e7f369af66ebf91126aebadf4d68 100644 (file)
@@ -70,7 +70,7 @@ Status ReplayerImpl::Next(std::unique_ptr<TraceRecord>* record) {
     return s;
   }
 
-  return DecodeTraceRecord(&trace, trace_file_version_, record);
+  return TracerHelper::DecodeTraceRecord(&trace, trace_file_version_, record);
 }
 
 Status ReplayerImpl::Execute(const std::unique_ptr<TraceRecord>& record,
@@ -117,7 +117,7 @@ Status ReplayerImpl::Replay(
 
       // In single-threaded replay, decode first then sleep.
       std::unique_ptr<TraceRecord> record;
-      s = DecodeTraceRecord(&trace, trace_file_version_, &record);
+      s = TracerHelper::DecodeTraceRecord(&trace, trace_file_version_, &record);
       if (!s.ok() && !s.IsNotSupported()) {
         break;
       }
@@ -283,34 +283,14 @@ Status ReplayerImpl::ReadTrace(Trace* trace) {
   return TracerHelper::DecodeTrace(encoded_trace, trace);
 }
 
-Status ReplayerImpl::DecodeTraceRecord(Trace* trace, int trace_file_version,
-                                       std::unique_ptr<TraceRecord>* record) {
-  switch (trace->type) {
-    case kTraceWrite:
-      return TracerHelper::DecodeWriteRecord(trace, trace_file_version, record);
-    case kTraceGet:
-      return TracerHelper::DecodeGetRecord(trace, trace_file_version, record);
-    case kTraceIteratorSeek:
-    case kTraceIteratorSeekForPrev:
-      return TracerHelper::DecodeIterRecord(trace, trace_file_version, record);
-    case kTraceMultiGet:
-      return TracerHelper::DecodeMultiGetRecord(trace, trace_file_version,
-                                                record);
-    case kTraceEnd:
-      return Status::Incomplete("Trace end.");
-    default:
-      return Status::NotSupported("Unsupported trace type.");
-  }
-}
-
 void ReplayerImpl::BackgroundWork(void* arg) {
   std::unique_ptr<ReplayerWorkerArg> ra(
       reinterpret_cast<ReplayerWorkerArg*>(arg));
   assert(ra != nullptr);
 
   std::unique_ptr<TraceRecord> record;
-  Status s =
-      DecodeTraceRecord(&(ra->trace_entry), ra->trace_file_version, &record);
+  Status s = TracerHelper::DecodeTraceRecord(&(ra->trace_entry),
+                                             ra->trace_file_version, &record);
   if (!s.ok()) {
     // Stop the replay
     if (ra->error_cb != nullptr) {
index 9cf18296069ccf5313461fb5d4334a9ec58b4970..3a61b2ecb345ccb432d381f6c653bda4e7ae62f2 100644 (file)
@@ -53,10 +53,6 @@ class ReplayerImpl : public Replayer {
   Status ReadFooter(Trace* footer);
   Status ReadTrace(Trace* trace);
 
-  // Generic function to convert a Trace to TraceRecord.
-  static Status DecodeTraceRecord(Trace* trace, int trace_file_version,
-                                  std::unique_ptr<TraceRecord>* record);
-
   // Generic function to execute a Trace in a thread pool.
   static void BackgroundWork(void* arg);