git-server-git.apps.pok.os.sepia.ceph.com Git - rocksdb.git/commitdiff
Allow Replayer to report the results of TraceRecords. (#8657)
author Merlin Mao <qzmao@fb.com>
Thu, 19 Aug 2021 00:04:36 +0000 (17:04 -0700)
committer Facebook GitHub Bot <facebook-github-bot@users.noreply.github.com>
Thu, 19 Aug 2021 00:06:14 +0000 (17:06 -0700)
Summary:
`Replayer::Execute()` can directly return the result (e.g., request latency, DB::Get() return code, returned value, etc.)
`Replayer::Replay()` reports the results via a callback function.

New interface:
`TraceRecordResult` in "rocksdb/trace_record_result.h".

`DBTest2.TraceAndReplay` and `DBTest2.TraceAndManualReplay` are updated accordingly.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/8657

Reviewed By: ajkr

Differential Revision: D30290216

Pulled By: autopear

fbshipit-source-id: 3c8d4e6b180ec743de1a9d9dcaee86064c74f0d6

16 files changed:
CMakeLists.txt
HISTORY.md
TARGETS
db/db_test2.cc
include/rocksdb/trace_record.h
include/rocksdb/trace_record_result.h [new file with mode: 0644]
include/rocksdb/utilities/replayer.h
src.mk
tools/db_bench_tool.cc
trace_replay/trace_record.cc
trace_replay/trace_record_handler.cc
trace_replay/trace_record_handler.h
trace_replay/trace_record_result.cc [new file with mode: 0644]
trace_replay/trace_replay.cc
utilities/trace/replayer_impl.cc
utilities/trace/replayer_impl.h

index a14d2dde7f663b5b771b729b2306e2a33db4e480..1d78743d956661789f73260aa8da9a0cdc41af24 100644 (file)
@@ -819,6 +819,7 @@ set(SOURCES
         trace_replay/block_cache_tracer.cc
         trace_replay/io_tracer.cc
         trace_replay/trace_record_handler.cc
+        trace_replay/trace_record_result.cc
         trace_replay/trace_record.cc
         trace_replay/trace_replay.cc
         util/coding.cc
index 4de77d6f7aa9be78f9fd0c1ec879079dde837610..48c9f1e0154ca5c61f1df7e910efafa5cca19d48 100644 (file)
@@ -22,7 +22,7 @@
 * The integrated BlobDB implementation now supports the tickers `BLOB_DB_BLOB_FILE_BYTES_READ`, `BLOB_DB_GC_NUM_KEYS_RELOCATED`, and `BLOB_DB_GC_BYTES_RELOCATED`, as well as the histograms `BLOB_DB_COMPRESSION_MICROS` and `BLOB_DB_DECOMPRESSION_MICROS`.
 
 ## Public API change
-* Added APIs to decode and replay trace file via Replayer class. Added `DB::NewDefaultReplayer()` to create a default Replayer instance. Added `TraceReader::Reset()` to restart reading a trace file. Created trace_record.h and utilities/replayer.h files to access decoded Trace records and replay them.
+* Added APIs to decode and replay trace file via Replayer class. Added `DB::NewDefaultReplayer()` to create a default Replayer instance. Added `TraceReader::Reset()` to restart reading a trace file. Created trace_record.h, trace_record_result.h and utilities/replayer.h files to access the decoded Trace records, replay them, and query the actual operation results.
 
 ### Performance Improvements
 * Try to avoid updating DBOptions if `SetDBOptions()` does not change any option value.
diff --git a/TARGETS b/TARGETS
index 0f50b4fa85b5b858bc705a3750a14faf6e45e62e..b5b992b6b6d1ba3f1409c8f1b8cc7d95ec9b3be8 100644 (file)
--- a/TARGETS
+++ b/TARGETS
@@ -337,6 +337,7 @@ cpp_library(
         "trace_replay/io_tracer.cc",
         "trace_replay/trace_record.cc",
         "trace_replay/trace_record_handler.cc",
+        "trace_replay/trace_record_result.cc",
         "trace_replay/trace_replay.cc",
         "util/build_version.cc",
         "util/coding.cc",
@@ -655,6 +656,7 @@ cpp_library(
         "trace_replay/io_tracer.cc",
         "trace_replay/trace_record.cc",
         "trace_replay/trace_record_handler.cc",
+        "trace_replay/trace_record_result.cc",
         "trace_replay/trace_replay.cc",
         "util/build_version.cc",
         "util/coding.cc",
index c6e807c1eaf0b29750dacf8e196c2e98dadcd88c..c09ea4c71ce41695a180d06626755a829ec9e33e 100644 (file)
@@ -6,6 +6,7 @@
 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
 // Use of this source code is governed by a BSD-style license that can be
 // found in the LICENSE file. See the AUTHORS file for names of contributors.
+
 #include <atomic>
 #include <cstdlib>
 #include <functional>
@@ -17,6 +18,8 @@
 #include "port/port.h"
 #include "port/stack_trace.h"
 #include "rocksdb/persistent_cache.h"
+#include "rocksdb/trace_record.h"
+#include "rocksdb/trace_record_result.h"
 #include "rocksdb/utilities/replayer.h"
 #include "rocksdb/wal_filter.h"
 #include "util/random.h"
@@ -4236,6 +4239,106 @@ TEST_F(DBTest2, TestNumPread) {
   ASSERT_EQ(0, env_->random_file_open_counter_.load());
 }
 
+class TraceExecutionResultHandler : public TraceRecordResult::Handler {
+ public:
+  TraceExecutionResultHandler() {}
+  ~TraceExecutionResultHandler() override {}
+
+  virtual Status Handle(const StatusOnlyTraceExecutionResult& result) override {
+    if (result.GetStartTimestamp() > result.GetEndTimestamp()) {
+      return Status::InvalidArgument("Invalid timestamps.");
+    }
+    result.GetStatus().PermitUncheckedError();
+    switch (result.GetTraceType()) {
+      case kTraceWrite: {
+        total_latency_ += result.GetLatency();
+        cnt_++;
+        writes_++;
+        break;
+      }
+      case kTraceIteratorSeek:
+      case kTraceIteratorSeekForPrev: {
+        total_latency_ += result.GetLatency();
+        cnt_++;
+        seeks_++;
+        break;
+      }
+      default:
+        return Status::Corruption("Type mismatch.");
+    }
+    return Status::OK();
+  }
+
+  virtual Status Handle(
+      const SingleValueTraceExecutionResult& result) override {
+    if (result.GetStartTimestamp() > result.GetEndTimestamp()) {
+      return Status::InvalidArgument("Invalid timestamps.");
+    }
+    result.GetStatus().PermitUncheckedError();
+    switch (result.GetTraceType()) {
+      case kTraceGet: {
+        total_latency_ += result.GetLatency();
+        cnt_++;
+        gets_++;
+        break;
+      }
+      default:
+        return Status::Corruption("Type mismatch.");
+    }
+    return Status::OK();
+  }
+
+  virtual Status Handle(
+      const MultiValuesTraceExecutionResult& result) override {
+    if (result.GetStartTimestamp() > result.GetEndTimestamp()) {
+      return Status::InvalidArgument("Invalid timestamps.");
+    }
+    for (const Status& s : result.GetMultiStatus()) {
+      s.PermitUncheckedError();
+    }
+    switch (result.GetTraceType()) {
+      case kTraceMultiGet: {
+        total_latency_ += result.GetLatency();
+        cnt_++;
+        multigets_++;
+        break;
+      }
+      default:
+        return Status::Corruption("Type mismatch.");
+    }
+    return Status::OK();
+  }
+
+  void Reset() {
+    total_latency_ = 0;
+    cnt_ = 0;
+    writes_ = 0;
+    gets_ = 0;
+    seeks_ = 0;
+    multigets_ = 0;
+  }
+
+  double GetAvgLatency() const {
+    return cnt_ == 0 ? 0.0 : 1.0 * total_latency_ / cnt_;
+  }
+
+  int GetNumWrites() const { return writes_; }
+
+  int GetNumGets() const { return gets_; }
+
+  int GetNumIterSeeks() const { return seeks_; }
+
+  int GetNumMultiGets() const { return multigets_; }
+
+ private:
+  std::atomic<uint64_t> total_latency_{0};
+  std::atomic<uint32_t> cnt_{0};
+  std::atomic<int> writes_{0};
+  std::atomic<int> gets_{0};
+  std::atomic<int> seeks_{0};
+  std::atomic<int> multigets_{0};
+};
+
 TEST_F(DBTest2, TraceAndReplay) {
   Options options = CurrentOptions();
   options.merge_operator = MergeOperators::CreatePutOperator();
@@ -4254,12 +4357,14 @@ TEST_F(DBTest2, TraceAndReplay) {
   ASSERT_OK(NewFileTraceWriter(env_, env_opts, trace_filename, &trace_writer));
   ASSERT_OK(db_->StartTrace(trace_opts, std::move(trace_writer)));
 
+  // 5 Writes
   ASSERT_OK(Put(0, "a", "1"));
   ASSERT_OK(Merge(0, "b", "2"));
   ASSERT_OK(Delete(0, "c"));
   ASSERT_OK(SingleDelete(0, "d"));
   ASSERT_OK(db_->DeleteRange(wo, dbfull()->DefaultColumnFamily(), "e", "f"));
 
+  // 6th Write
   WriteBatch batch;
   ASSERT_OK(batch.Put("f", "11"));
   ASSERT_OK(batch.Merge("g", "12"));
@@ -4268,19 +4373,23 @@ TEST_F(DBTest2, TraceAndReplay) {
   ASSERT_OK(batch.DeleteRange("j", "k"));
   ASSERT_OK(db_->Write(wo, &batch));
 
+  // 2 Seek(ForPrev)s
   single_iter = db_->NewIterator(ro);
-  single_iter->Seek("f");
+  single_iter->Seek("f");  // Seek 1
   single_iter->SeekForPrev("g");
   ASSERT_OK(single_iter->status());
   delete single_iter;
 
+  // 2 Gets
   ASSERT_EQ("1", Get(0, "a"));
   ASSERT_EQ("12", Get(0, "g"));
 
+  // 7th and 8th Write, 3rd Get
   ASSERT_OK(Put(1, "foo", "bar"));
   ASSERT_OK(Put(1, "rocksdb", "rocks"));
   ASSERT_EQ("NOT_FOUND", Get(1, "leveldb"));
 
+  // Total Write x 8, Get x 3, Seek x 2.
   ASSERT_OK(db_->EndTrace());
   // These should not get into the trace file as it is after EndTrace.
   ASSERT_OK(Put("hello", "world"));
@@ -4324,13 +4433,30 @@ TEST_F(DBTest2, TraceAndReplay) {
   std::unique_ptr<Replayer> replayer;
   ASSERT_OK(
       db2->NewDefaultReplayer(handles, std::move(trace_reader), &replayer));
+
+  TraceExecutionResultHandler res_handler;
+  std::function<void(Status, std::unique_ptr<TraceRecordResult> &&)> res_cb =
+      [&res_handler](Status exec_s, std::unique_ptr<TraceRecordResult>&& res) {
+        ASSERT_TRUE(exec_s.ok() || exec_s.IsNotSupported());
+        if (res != nullptr) {
+          ASSERT_OK(res->Accept(&res_handler));
+          res.reset();
+        }
+      };
+
   // Unprepared replay should fail with Status::Incomplete()
-  ASSERT_TRUE(replayer->Replay().IsIncomplete());
+  ASSERT_TRUE(replayer->Replay(ReplayOptions(), nullptr).IsIncomplete());
   ASSERT_OK(replayer->Prepare());
   // Ok to repeatedly Prepare().
   ASSERT_OK(replayer->Prepare());
   // Replay using 1 thread, 1x speed.
-  ASSERT_OK(replayer->Replay());
+  ASSERT_OK(replayer->Replay(ReplayOptions(1, 1.0), res_cb));
+  ASSERT_GT(res_handler.GetAvgLatency(), 0.0);
+  ASSERT_EQ(res_handler.GetNumWrites(), 8);
+  ASSERT_EQ(res_handler.GetNumGets(), 3);
+  ASSERT_EQ(res_handler.GetNumIterSeeks(), 2);
+  ASSERT_EQ(res_handler.GetNumMultiGets(), 0);
+  res_handler.Reset();
 
   ASSERT_OK(db2->Get(ro, handles[0], "a", &value));
   ASSERT_EQ("1", value);
@@ -4346,15 +4472,28 @@ TEST_F(DBTest2, TraceAndReplay) {
 
   // Re-replay should fail with Status::Incomplete() if Prepare() was not
   // called. Currently we don't distinguish between unprepared and trace end.
-  ASSERT_TRUE(replayer->Replay().IsIncomplete());
+  ASSERT_TRUE(replayer->Replay(ReplayOptions(), nullptr).IsIncomplete());
 
   // Re-replay using 2 threads, 2x speed.
   ASSERT_OK(replayer->Prepare());
-  ASSERT_OK(replayer->Replay(ReplayOptions(2, 2.0)));
+  ASSERT_OK(replayer->Replay(ReplayOptions(2, 2.0), res_cb));
+  ASSERT_GT(res_handler.GetAvgLatency(), 0.0);
+  ASSERT_EQ(res_handler.GetNumWrites(), 8);
+  ASSERT_EQ(res_handler.GetNumGets(), 3);
+  ASSERT_EQ(res_handler.GetNumIterSeeks(), 2);
+  ASSERT_EQ(res_handler.GetNumMultiGets(), 0);
+  res_handler.Reset();
 
   // Re-replay using 2 threads, 1/2 speed.
   ASSERT_OK(replayer->Prepare());
-  ASSERT_OK(replayer->Replay(ReplayOptions(2, 0.5)));
+  ASSERT_OK(replayer->Replay(ReplayOptions(2, 0.5), res_cb));
+  ASSERT_GT(res_handler.GetAvgLatency(), 0.0);
+  ASSERT_EQ(res_handler.GetNumWrites(), 8);
+  ASSERT_EQ(res_handler.GetNumGets(), 3);
+  ASSERT_EQ(res_handler.GetNumIterSeeks(), 2);
+  ASSERT_EQ(res_handler.GetNumMultiGets(), 0);
+  res_handler.Reset();
+
   replayer.reset();
 
   for (auto handle : handles) {
@@ -4408,6 +4547,7 @@ TEST_F(DBTest2, TraceAndManualReplay) {
   ASSERT_OK(Put(1, "rocksdb", "rocks"));
   ASSERT_EQ("NOT_FOUND", Get(1, "leveldb"));
 
+  // Same as TraceAndReplay, Write x 8, Get x 3, Seek x 2.
   ASSERT_OK(db_->EndTrace());
   // These should not get into the trace file as it is after EndTrace.
   ASSERT_OK(Put("hello", "world"));
@@ -4452,8 +4592,11 @@ TEST_F(DBTest2, TraceAndManualReplay) {
   ASSERT_OK(
       db2->NewDefaultReplayer(handles, std::move(trace_reader), &replayer));
 
+  TraceExecutionResultHandler res_handler;
+
   // Manual replay for 2 times. The 2nd checks if the replay can restart.
   std::unique_ptr<TraceRecord> record;
+  std::unique_ptr<TraceRecordResult> result;
   for (int i = 0; i < 2; i++) {
     // Next should fail if unprepared.
     ASSERT_TRUE(replayer->Next(nullptr).IsIncomplete());
@@ -4467,13 +4610,23 @@ TEST_F(DBTest2, TraceAndManualReplay) {
         continue;
       }
       if (s.ok()) {
-        ASSERT_OK(replayer->Execute(std::move(record)));
+        ASSERT_OK(replayer->Execute(record, &result));
+        if (result != nullptr) {
+          ASSERT_OK(result->Accept(&res_handler));
+          result.reset();
+        }
       }
     }
     // Status::Incomplete() will be returned when manually reading the trace
     // end, or Prepare() was not called.
     ASSERT_TRUE(s.IsIncomplete());
     ASSERT_TRUE(replayer->Next(nullptr).IsIncomplete());
+    ASSERT_GT(res_handler.GetAvgLatency(), 0.0);
+    ASSERT_EQ(res_handler.GetNumWrites(), 8);
+    ASSERT_EQ(res_handler.GetNumGets(), 3);
+    ASSERT_EQ(res_handler.GetNumIterSeeks(), 2);
+    ASSERT_EQ(res_handler.GetNumMultiGets(), 0);
+    res_handler.Reset();
   }
 
   ASSERT_OK(db2->Get(ro, handles[0], "a", &value));
@@ -4495,25 +4648,44 @@ TEST_F(DBTest2, TraceAndManualReplay) {
   ASSERT_OK(batch.Put("trace-record-write1", "write1"));
   ASSERT_OK(batch.Put("trace-record-write2", "write2"));
   record.reset(new WriteQueryTraceRecord(batch.Data(), fake_ts++));
-  ASSERT_OK(replayer->Execute(std::move(record)));
+  ASSERT_OK(replayer->Execute(record, &result));
+  ASSERT_TRUE(result != nullptr);
+  ASSERT_OK(result->Accept(&res_handler));  // Write x 1
   ASSERT_OK(db2->Get(ro, handles[0], "trace-record-write1", &value));
   ASSERT_EQ("write1", value);
   ASSERT_OK(db2->Get(ro, handles[0], "trace-record-write2", &value));
   ASSERT_EQ("write2", value);
+  ASSERT_GT(res_handler.GetAvgLatency(), 0.0);
+  ASSERT_EQ(res_handler.GetNumWrites(), 1);
+  ASSERT_EQ(res_handler.GetNumGets(), 0);
+  ASSERT_EQ(res_handler.GetNumIterSeeks(), 0);
+  ASSERT_EQ(res_handler.GetNumMultiGets(), 0);
+  res_handler.Reset();
 
   // Get related
   // Get an existing key.
   record.reset(new GetQueryTraceRecord(handles[0]->GetID(),
                                        "trace-record-write1", fake_ts++));
-  ASSERT_OK(replayer->Execute(std::move(record)));
+  ASSERT_OK(replayer->Execute(record, &result));
+  ASSERT_TRUE(result != nullptr);
+  ASSERT_OK(result->Accept(&res_handler));  // Get x 1
   // Get an non-existing key, should still return Status::OK().
   record.reset(new GetQueryTraceRecord(handles[0]->GetID(), "trace-record-get",
                                        fake_ts++));
-  ASSERT_OK(replayer->Execute(std::move(record)));
+  ASSERT_OK(replayer->Execute(record, &result));
+  ASSERT_TRUE(result != nullptr);
+  ASSERT_OK(result->Accept(&res_handler));  // Get x 2
   // Get from an invalid (non-existing) cf_id.
   uint32_t invalid_cf_id = handles[1]->GetID() + 1;
   record.reset(new GetQueryTraceRecord(invalid_cf_id, "whatever", fake_ts++));
-  ASSERT_TRUE(replayer->Execute(std::move(record)).IsCorruption());
+  ASSERT_TRUE(replayer->Execute(record, &result).IsCorruption());
+  ASSERT_TRUE(result == nullptr);
+  ASSERT_GT(res_handler.GetAvgLatency(), 0.0);
+  ASSERT_EQ(res_handler.GetNumWrites(), 0);
+  ASSERT_EQ(res_handler.GetNumGets(), 2);
+  ASSERT_EQ(res_handler.GetNumIterSeeks(), 0);
+  ASSERT_EQ(res_handler.GetNumMultiGets(), 0);
+  res_handler.Reset();
 
   // Iteration related
   for (IteratorSeekQueryTraceRecord::SeekType seekType :
@@ -4522,48 +4694,82 @@ TEST_F(DBTest2, TraceAndManualReplay) {
     // Seek to an existing key.
     record.reset(new IteratorSeekQueryTraceRecord(
         seekType, handles[0]->GetID(), "trace-record-write1", fake_ts++));
-    ASSERT_OK(replayer->Execute(std::move(record)));
+    ASSERT_OK(replayer->Execute(record, &result));
+    ASSERT_TRUE(result != nullptr);
+    ASSERT_OK(result->Accept(&res_handler));  // Seek x 1 in one iteration
     // Seek to an non-existing key, should still return Status::OK().
     record.reset(new IteratorSeekQueryTraceRecord(
         seekType, handles[0]->GetID(), "trace-record-get", fake_ts++));
-    ASSERT_OK(replayer->Execute(std::move(record)));
+    ASSERT_OK(replayer->Execute(record, &result));
+    ASSERT_TRUE(result != nullptr);
+    ASSERT_OK(result->Accept(&res_handler));  // Seek x 2 in one iteration
     // Seek from an invalid cf_id.
     record.reset(new IteratorSeekQueryTraceRecord(seekType, invalid_cf_id,
                                                   "whatever", fake_ts++));
-    ASSERT_TRUE(replayer->Execute(std::move(record)).IsCorruption());
+    ASSERT_TRUE(replayer->Execute(record, &result).IsCorruption());
+    ASSERT_TRUE(result == nullptr);
   }
+  ASSERT_GT(res_handler.GetAvgLatency(), 0.0);
+  ASSERT_EQ(res_handler.GetNumWrites(), 0);
+  ASSERT_EQ(res_handler.GetNumGets(), 0);
+  ASSERT_EQ(res_handler.GetNumIterSeeks(), 4);  // Seek x 2 in two iterations
+  ASSERT_EQ(res_handler.GetNumMultiGets(), 0);
+  res_handler.Reset();
 
   // MultiGet related
   // Get existing keys.
   record.reset(new MultiGetQueryTraceRecord(
       std::vector<uint32_t>({handles[0]->GetID(), handles[1]->GetID()}),
       std::vector<std::string>({"a", "foo"}), fake_ts++));
-  ASSERT_OK(replayer->Execute(std::move(record)));
+  ASSERT_OK(replayer->Execute(record, &result));
+  ASSERT_TRUE(result != nullptr);
+  ASSERT_OK(result->Accept(&res_handler));  // MultiGet x 1
   // Get all non-existing keys, should still return Status::OK().
   record.reset(new MultiGetQueryTraceRecord(
       std::vector<uint32_t>({handles[0]->GetID(), handles[1]->GetID()}),
       std::vector<std::string>({"no1", "no2"}), fake_ts++));
+  ASSERT_OK(replayer->Execute(record, &result));
+  ASSERT_TRUE(result != nullptr);
+  ASSERT_OK(result->Accept(&res_handler));  // MultiGet x 2
   // Get mixed of existing and non-existing keys, should still return
   // Status::OK().
   record.reset(new MultiGetQueryTraceRecord(
       std::vector<uint32_t>({handles[0]->GetID(), handles[1]->GetID()}),
       std::vector<std::string>({"a", "no2"}), fake_ts++));
-  ASSERT_OK(replayer->Execute(std::move(record)));
+  ASSERT_OK(replayer->Execute(record, &result));
+  ASSERT_TRUE(result != nullptr);
+  MultiValuesTraceExecutionResult* mvr =
+      dynamic_cast<MultiValuesTraceExecutionResult*>(result.get());
+  ASSERT_TRUE(mvr != nullptr);
+  ASSERT_OK(mvr->GetMultiStatus()[0]);
+  ASSERT_TRUE(mvr->GetMultiStatus()[1].IsNotFound());
+  ASSERT_EQ(mvr->GetValues()[0], "1");
+  ASSERT_EQ(mvr->GetValues()[1], "");
+  ASSERT_OK(result->Accept(&res_handler));  // MultiGet x 3
   // Get from an invalid (non-existing) cf_id.
   record.reset(new MultiGetQueryTraceRecord(
       std::vector<uint32_t>(
           {handles[0]->GetID(), handles[1]->GetID(), invalid_cf_id}),
       std::vector<std::string>({"a", "foo", "whatever"}), fake_ts++));
-  ASSERT_TRUE(replayer->Execute(std::move(record)).IsCorruption());
+  ASSERT_TRUE(replayer->Execute(record, &result).IsCorruption());
+  ASSERT_TRUE(result == nullptr);
   // Empty MultiGet
   record.reset(new MultiGetQueryTraceRecord(
       std::vector<uint32_t>(), std::vector<std::string>(), fake_ts++));
-  ASSERT_TRUE(replayer->Execute(std::move(record)).IsInvalidArgument());
+  ASSERT_TRUE(replayer->Execute(record, &result).IsInvalidArgument());
+  ASSERT_TRUE(result == nullptr);
   // MultiGet size mismatch
   record.reset(new MultiGetQueryTraceRecord(
       std::vector<uint32_t>({handles[0]->GetID(), handles[1]->GetID()}),
       std::vector<std::string>({"a"}), fake_ts++));
-  ASSERT_TRUE(replayer->Execute(std::move(record)).IsInvalidArgument());
+  ASSERT_TRUE(replayer->Execute(record, &result).IsInvalidArgument());
+  ASSERT_TRUE(result == nullptr);
+  ASSERT_GT(res_handler.GetAvgLatency(), 0.0);
+  ASSERT_EQ(res_handler.GetNumWrites(), 0);
+  ASSERT_EQ(res_handler.GetNumGets(), 0);
+  ASSERT_EQ(res_handler.GetNumIterSeeks(), 0);
+  ASSERT_EQ(res_handler.GetNumMultiGets(), 3);
+  res_handler.Reset();
 
   replayer.reset();
 
@@ -4634,7 +4840,7 @@ TEST_F(DBTest2, TraceWithLimit) {
   ASSERT_OK(
       db2->NewDefaultReplayer(handles, std::move(trace_reader), &replayer));
   ASSERT_OK(replayer->Prepare());
-  ASSERT_OK(replayer->Replay());
+  ASSERT_OK(replayer->Replay(ReplayOptions(), nullptr));
   replayer.reset();
 
   ASSERT_TRUE(db2->Get(ro, handles[0], "a", &value).IsNotFound());
@@ -4709,7 +4915,7 @@ TEST_F(DBTest2, TraceWithSampling) {
   ASSERT_OK(
       db2->NewDefaultReplayer(handles, std::move(trace_reader), &replayer));
   ASSERT_OK(replayer->Prepare());
-  ASSERT_OK(replayer->Replay());
+  ASSERT_OK(replayer->Replay(ReplayOptions(), nullptr));
   replayer.reset();
 
   ASSERT_TRUE(db2->Get(ro, handles[0], "a", &value).IsNotFound());
@@ -4813,7 +5019,7 @@ TEST_F(DBTest2, TraceWithFilter) {
   ASSERT_OK(
       db2->NewDefaultReplayer(handles, std::move(trace_reader), &replayer));
   ASSERT_OK(replayer->Prepare());
-  ASSERT_OK(replayer->Replay());
+  ASSERT_OK(replayer->Replay(ReplayOptions(), nullptr));
   replayer.reset();
 
   // All the key-values should not present since we filter out the WRITE ops.
index f715a4396d007de5097d5295fa0396aa326a49c8..3f591d3d153db7acdc04e123ee6979e5ec4a94f1 100644 (file)
@@ -5,17 +5,18 @@
 
 #pragma once
 
+#include <memory>
 #include <string>
 #include <vector>
 
 #include "rocksdb/rocksdb_namespace.h"
 #include "rocksdb/slice.h"
+#include "rocksdb/status.h"
 
 namespace ROCKSDB_NAMESPACE {
 
 class ColumnFamilyHandle;
 class DB;
-class Status;
 
 // Supported trace record types.
 enum TraceType : char {
@@ -41,40 +42,55 @@ enum TraceType : char {
   kTraceMax,
 };
 
-class WriteQueryTraceRecord;
 class GetQueryTraceRecord;
 class IteratorSeekQueryTraceRecord;
 class MultiGetQueryTraceRecord;
+class TraceRecordResult;
+class WriteQueryTraceRecord;
 
 // Base class for all types of trace records.
 class TraceRecord {
  public:
-  TraceRecord();
   explicit TraceRecord(uint64_t timestamp);
-  virtual ~TraceRecord();
 
+  virtual ~TraceRecord() = default;
+
+  // Type of the trace record.
   virtual TraceType GetTraceType() const = 0;
 
+  // Timestamp (in microseconds) of this trace.
   virtual uint64_t GetTimestamp() const;
 
   class Handler {
    public:
-    virtual ~Handler() {}
+    virtual ~Handler() = default;
+
+    // Handle WriteQueryTraceRecord
+    virtual Status Handle(const WriteQueryTraceRecord& record,
+                          std::unique_ptr<TraceRecordResult>* result) = 0;
+
+    // Handle GetQueryTraceRecord
+    virtual Status Handle(const GetQueryTraceRecord& record,
+                          std::unique_ptr<TraceRecordResult>* result) = 0;
 
-    virtual Status Handle(const WriteQueryTraceRecord& record) = 0;
-    virtual Status Handle(const GetQueryTraceRecord& record) = 0;
-    virtual Status Handle(const IteratorSeekQueryTraceRecord& record) = 0;
-    virtual Status Handle(const MultiGetQueryTraceRecord& record) = 0;
+    // Handle IteratorSeekQueryTraceRecord
+    virtual Status Handle(const IteratorSeekQueryTraceRecord& record,
+                          std::unique_ptr<TraceRecordResult>* result) = 0;
+
+    // Handle MultiGetQueryTraceRecord
+    virtual Status Handle(const MultiGetQueryTraceRecord& record,
+                          std::unique_ptr<TraceRecordResult>* result) = 0;
   };
 
-  virtual Status Accept(Handler* handler) = 0;
+  // Accept the handler and report the corresponding result in `result`.
+  virtual Status Accept(Handler* handler,
+                        std::unique_ptr<TraceRecordResult>* result) = 0;
 
   // Create a handler for the exeution of TraceRecord.
   static Handler* NewExecutionHandler(
       DB* db, const std::vector<ColumnFamilyHandle*>& handles);
 
  private:
-  // Timestamp (in microseconds) of this trace.
   uint64_t timestamp_;
 };
 
@@ -82,8 +98,6 @@ class TraceRecord {
 class QueryTraceRecord : public TraceRecord {
  public:
   explicit QueryTraceRecord(uint64_t timestamp);
-
-  virtual ~QueryTraceRecord() override;
 };
 
 // Trace record for DB::Write() operation.
@@ -97,9 +111,11 @@ class WriteQueryTraceRecord : public QueryTraceRecord {
 
   TraceType GetTraceType() const override { return kTraceWrite; }
 
+  // rep string for the WriteBatch.
   virtual Slice GetWriteBatchRep() const;
 
-  virtual Status Accept(Handler* handler) override;
+  Status Accept(Handler* handler,
+                std::unique_ptr<TraceRecordResult>* result) override;
 
  private:
   PinnableSlice rep_;
@@ -118,16 +134,17 @@ class GetQueryTraceRecord : public QueryTraceRecord {
 
   TraceType GetTraceType() const override { return kTraceGet; }
 
+  // Column family ID.
   virtual uint32_t GetColumnFamilyID() const;
 
+  // Key to get.
   virtual Slice GetKey() const;
 
-  virtual Status Accept(Handler* handler) override;
+  Status Accept(Handler* handler,
+                std::unique_ptr<TraceRecordResult>* result) override;
 
  private:
-  // Column family ID.
   uint32_t cf_id_;
-  // Key to get.
   PinnableSlice key_;
 };
 
@@ -135,8 +152,6 @@ class GetQueryTraceRecord : public QueryTraceRecord {
 class IteratorQueryTraceRecord : public QueryTraceRecord {
  public:
   explicit IteratorQueryTraceRecord(uint64_t timestamp);
-
-  virtual ~IteratorQueryTraceRecord() override;
 };
 
 // Trace record for Iterator::Seek() and Iterator::SeekForPrev() operation.
@@ -156,21 +171,24 @@ class IteratorSeekQueryTraceRecord : public IteratorQueryTraceRecord {
 
   virtual ~IteratorSeekQueryTraceRecord() override;
 
+  // Trace type matches the seek type.
   TraceType GetTraceType() const override;
 
+  // Type of seek, Seek or SeekForPrev.
   virtual SeekType GetSeekType() const;
 
+  // Column family ID.
   virtual uint32_t GetColumnFamilyID() const;
 
+  // Key to seek to.
   virtual Slice GetKey() const;
 
-  virtual Status Accept(Handler* handler) override;
+  Status Accept(Handler* handler,
+                std::unique_ptr<TraceRecordResult>* result) override;
 
  private:
   SeekType type_;
-  // Column family ID.
   uint32_t cf_id_;
-  // Key to seek to.
   PinnableSlice key_;
 };
 
@@ -189,16 +207,17 @@ class MultiGetQueryTraceRecord : public QueryTraceRecord {
 
   TraceType GetTraceType() const override { return kTraceMultiGet; }
 
+  // Column family IDs.
   virtual std::vector<uint32_t> GetColumnFamilyIDs() const;
 
+  // Keys to get.
   virtual std::vector<Slice> GetKeys() const;
 
-  virtual Status Accept(Handler* handler) override;
+  Status Accept(Handler* handler,
+                std::unique_ptr<TraceRecordResult>* result) override;
 
  private:
-  // Column familiy IDs.
   std::vector<uint32_t> cf_ids_;
-  // Keys to get.
   std::vector<PinnableSlice> keys_;
 };
 
diff --git a/include/rocksdb/trace_record_result.h b/include/rocksdb/trace_record_result.h
new file mode 100644 (file)
index 0000000..7924c6d
--- /dev/null
@@ -0,0 +1,178 @@
+//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
+//  This source code is licensed under both the GPLv2 (found in the
+//  COPYING file in the root directory) and Apache 2.0 License
+//  (found in the LICENSE.Apache file in the root directory).
+
+#pragma once
+
+#include <string>
+#include <vector>
+
+#include "rocksdb/rocksdb_namespace.h"
+#include "rocksdb/status.h"
+#include "rocksdb/trace_record.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+class MultiValuesTraceExecutionResult;
+class SingleValueTraceExecutionResult;
+class StatusOnlyTraceExecutionResult;
+
+// Base class for the results of all types of trace records.
+// These classes can be used to report the execution result of
+// TraceRecord::Handler::Handle() or TraceRecord::Accept().
+class TraceRecordResult {
+ public:
+  explicit TraceRecordResult(TraceType trace_type);
+
+  virtual ~TraceRecordResult() = default;
+
+  // Trace type of the corresponding TraceRecord.
+  virtual TraceType GetTraceType() const;
+
+  class Handler {
+   public:
+    virtual ~Handler() = default;
+
+    // Handle StatusOnlyTraceExecutionResult
+    virtual Status Handle(const StatusOnlyTraceExecutionResult& result) = 0;
+
+    // Handle SingleValueTraceExecutionResult
+    virtual Status Handle(const SingleValueTraceExecutionResult& result) = 0;
+
+    // Handle MultiValuesTraceExecutionResult
+    virtual Status Handle(const MultiValuesTraceExecutionResult& result) = 0;
+  };
+
+  /*
+   * Example handler to just print the trace record execution results.
+   *
+   * class ResultPrintHandler : public TraceRecordResult::Handler {
+   *  public:
+   *   ResultPrintHandler();
+   *   ~ResultPrintHandler() override {}
+   *
+   *   Status Handle(const StatusOnlyTraceExecutionResult& result) override {
+   *     std::cout << "Status: " << result.GetStatus().ToString() << std::endl;
+   *   }
+   *
+   *   Status Handle(const SingleValueTraceExecutionResult& result) override {
+   *     std::cout << "Status: " << result.GetStatus().ToString()
+   *               << ", value: " << result.GetValue() << std::endl;
+   *   }
+   *
+   *   Status Handle(const MultiValuesTraceExecutionResult& result) override {
+   *     size_t size = result.GetMultiStatus().size();
+   *     for (size_t i = 0; i < size; i++) {
+   *       std::cout << "Status: " << result.GetMultiStatus()[i].ToString()
+   *                 << ", value: " << result.GetValues()[i] << std::endl;
+   *     }
+   *   }
+   * };
+   * */
+
+  // Accept the handler.
+  virtual Status Accept(Handler* handler) = 0;
+
+ private:
+  TraceType trace_type_;
+};
+
+// Base class for the results from the trace record execution handler (created
+// by TraceRecord::NewExecutionHandler()).
+//
+// The actual execution status or returned values may be hidden from
+// TraceRecord::Handler::Handle and TraceRecord::Accept. For example, a
+// GetQueryTraceRecord's execution calls DB::Get() internally. DB::Get() may
+// return Status::NotFound() but TraceRecord::Handler::Handle() or
+// TraceRecord::Accept() will still return Status::OK(). The actual status from
+// DB::Get() and the returned value string may be saved in a
+// SingleValueTraceExecutionResult.
+class TraceExecutionResult : public TraceRecordResult {
+ public:
+  TraceExecutionResult(uint64_t start_timestamp, uint64_t end_timestamp,
+                       TraceType trace_type);
+
+  // Execution start/end timestamps and request latency in microseconds.
+  virtual uint64_t GetStartTimestamp() const;
+  virtual uint64_t GetEndTimestamp() const;
+  inline uint64_t GetLatency() const {
+    return GetEndTimestamp() - GetStartTimestamp();
+  }
+
+ private:
+  uint64_t ts_start_;
+  uint64_t ts_end_;
+};
+
+// Result for operations that only return a single Status.
+// Example operations: DB::Write(), Iterator::Seek() and
+// Iterator::SeekForPrev().
+class StatusOnlyTraceExecutionResult : public TraceExecutionResult {
+ public:
+  StatusOnlyTraceExecutionResult(Status status, uint64_t start_timestamp,
+                                 uint64_t end_timestamp, TraceType trace_type);
+
+  virtual ~StatusOnlyTraceExecutionResult() override = default;
+
+  // Return value of DB::Write(), etc.
+  virtual const Status& GetStatus() const;
+
+  virtual Status Accept(Handler* handler) override;
+
+ private:
+  Status status_;
+};
+
+// Result for operations that return a Status and a value.
+// Example operation: DB::Get()
+class SingleValueTraceExecutionResult : public TraceExecutionResult {
+ public:
+  SingleValueTraceExecutionResult(Status status, const std::string& value,
+                                  uint64_t start_timestamp,
+                                  uint64_t end_timestamp, TraceType trace_type);
+
+  SingleValueTraceExecutionResult(Status status, std::string&& value,
+                                  uint64_t start_timestamp,
+                                  uint64_t end_timestamp, TraceType trace_type);
+
+  virtual ~SingleValueTraceExecutionResult() override;
+
+  // Return status of DB::Get(), etc.
+  virtual const Status& GetStatus() const;
+
+  // Value for the searched key.
+  virtual const std::string& GetValue() const;
+
+  virtual Status Accept(Handler* handler) override;
+
+ private:
+  Status status_;
+  std::string value_;
+};
+
+// Result for operations that return multiple Status(es) and values.
+// Example operation: DB::MultiGet()
+class MultiValuesTraceExecutionResult : public TraceExecutionResult {
+ public:
+  MultiValuesTraceExecutionResult(std::vector<Status> multi_status,
+                                  std::vector<std::string> values,
+                                  uint64_t start_timestamp,
+                                  uint64_t end_timestamp, TraceType trace_type);
+
+  virtual ~MultiValuesTraceExecutionResult() override;
+
+  // Returned Status(es) of DB::MultiGet(), etc.
+  virtual const std::vector<Status>& GetMultiStatus() const;
+
+  // Returned values for the searched keys.
+  virtual const std::vector<std::string>& GetValues() const;
+
+  virtual Status Accept(Handler* handler) override;
+
+ private:
+  std::vector<Status> multi_status_;
+  std::vector<std::string> values_;
+};
+
+}  // namespace ROCKSDB_NAMESPACE
index 976fadb689d260971159c7bde445477d7f467938..4fdd8d73a7aae9f1eadfa3235657bacb3ab8257e 100644 (file)
@@ -6,14 +6,17 @@
 #pragma once
 #ifndef ROCKSDB_LITE
 
+#include <functional>
 #include <memory>
 
 #include "rocksdb/rocksdb_namespace.h"
 #include "rocksdb/status.h"
-#include "rocksdb/trace_record.h"
 
 namespace ROCKSDB_NAMESPACE {
 
+class TraceRecord;
+class TraceRecordResult;
+
 struct ReplayOptions {
   // Number of threads used for replaying. If 0 or 1, replay using
   // single thread.
@@ -27,6 +30,7 @@ struct ReplayOptions {
   double fast_forward;
 
   ReplayOptions() : num_threads(1), fast_forward(1.0) {}
+
   ReplayOptions(uint32_t num_of_threads, double fast_forward_ratio)
       : num_threads(num_of_threads), fast_forward(fast_forward_ratio) {}
 };
@@ -36,7 +40,7 @@ struct ReplayOptions {
 // instantiated via db_bench today, on using "replay" benchmark.
 class Replayer {
  public:
-  virtual ~Replayer() {}
+  virtual ~Replayer() = default;
 
   // Make some preparation before replaying the trace. This will also reset the
   // replayer in order to restart replaying.
@@ -61,13 +65,22 @@ class Replayer {
   // trace;
   // Status::NotSupported() if the operation is not supported;
   // Otherwise, return the corresponding error status.
-  virtual Status Execute(const std::unique_ptr<TraceRecord>& record) = 0;
-  virtual Status Execute(std::unique_ptr<TraceRecord>&& record) = 0;
+  //
+  // The actual operation execution status and result(s) will be saved in
+  // result. For example, a GetQueryTraceRecord will have its DB::Get() status
+  // and the returned value saved in a SingleValueTraceExecutionResult.
+  virtual Status Execute(const std::unique_ptr<TraceRecord>& record,
+                         std::unique_ptr<TraceRecordResult>* result) = 0;
 
   // Replay all the traces from the provided trace stream, taking the delay
   // between the traces into consideration.
-  virtual Status Replay(const ReplayOptions& options) = 0;
-  virtual Status Replay() { return Replay(ReplayOptions()); }
+  //
+  // result_callback reports the status of executing a trace record, and the
+  // actual operation execution result (See the description for Execute()).
+  virtual Status Replay(
+      const ReplayOptions& options,
+      const std::function<void(Status, std::unique_ptr<TraceRecordResult>&&)>&
+          result_callback) = 0;
 };
 
 }  // namespace ROCKSDB_NAMESPACE
diff --git a/src.mk b/src.mk
index b2c83f048e3ca3f59c14c6de5535e8b2324c756b..28d8380ad5bebd8d9936e23e7e64fee039f8ea17 100644 (file)
--- a/src.mk
+++ b/src.mk
@@ -199,6 +199,7 @@ LIB_SOURCES =                                                   \
   test_util/transaction_test_util.cc                            \
   tools/dump/db_dump_tool.cc                                    \
   trace_replay/trace_record_handler.cc                          \
+  trace_replay/trace_record_result.cc                           \
   trace_replay/trace_record.cc                                  \
   trace_replay/trace_replay.cc                                  \
   trace_replay/block_cache_tracer.cc                            \
index 89d75053624606349091ac73cb5f59e89e358f3b..bf6e0c1c0668ee8c3f65f7302ddc2a30ae74ae04 100644 (file)
@@ -8027,7 +8027,8 @@ class Benchmark {
     }
     s = replayer->Replay(
         ReplayOptions(static_cast<uint32_t>(FLAGS_trace_replay_threads),
-                      FLAGS_trace_replay_fast_forward));
+                      FLAGS_trace_replay_fast_forward),
+        nullptr);
     replayer.reset();
     if (s.ok()) {
       fprintf(stdout, "Replay completed from trace_file: %s\n",
index 75afcf37eddca4cc78f43bd3744bf933497b2312..e0ce0209040c1ebeb752f6c21bcbb35d117aeed7 100644 (file)
@@ -11,6 +11,7 @@
 #include "rocksdb/iterator.h"
 #include "rocksdb/options.h"
 #include "rocksdb/status.h"
+#include "rocksdb/trace_record_result.h"
 #include "trace_replay/trace_record_handler.h"
 
 namespace ROCKSDB_NAMESPACE {
@@ -18,8 +19,6 @@ namespace ROCKSDB_NAMESPACE {
 // TraceRecord
 TraceRecord::TraceRecord(uint64_t timestamp) : timestamp_(timestamp) {}
 
-TraceRecord::~TraceRecord() {}
-
 uint64_t TraceRecord::GetTimestamp() const { return timestamp_; }
 
 TraceRecord::Handler* TraceRecord::NewExecutionHandler(
@@ -31,8 +30,6 @@ TraceRecord::Handler* TraceRecord::NewExecutionHandler(
 QueryTraceRecord::QueryTraceRecord(uint64_t timestamp)
     : TraceRecord(timestamp) {}
 
-QueryTraceRecord::~QueryTraceRecord() {}
-
 // WriteQueryTraceRecord
 WriteQueryTraceRecord::WriteQueryTraceRecord(PinnableSlice&& write_batch_rep,
                                              uint64_t timestamp)
@@ -44,13 +41,14 @@ WriteQueryTraceRecord::WriteQueryTraceRecord(const std::string& write_batch_rep,
   rep_.PinSelf(write_batch_rep);
 }
 
-WriteQueryTraceRecord::~WriteQueryTraceRecord() {}
+WriteQueryTraceRecord::~WriteQueryTraceRecord() { rep_.clear(); }
 
 Slice WriteQueryTraceRecord::GetWriteBatchRep() const { return Slice(rep_); }
 
-Status WriteQueryTraceRecord::Accept(Handler* handler) {
+Status WriteQueryTraceRecord::Accept(
+    Handler* handler, std::unique_ptr<TraceRecordResult>* result) {
   assert(handler != nullptr);
-  return handler->Handle(*this);
+  return handler->Handle(*this, result);
 }
 
 // GetQueryTraceRecord
@@ -68,23 +66,22 @@ GetQueryTraceRecord::GetQueryTraceRecord(uint32_t column_family_id,
   key_.PinSelf(key);
 }
 
-GetQueryTraceRecord::~GetQueryTraceRecord() {}
+GetQueryTraceRecord::~GetQueryTraceRecord() { key_.clear(); }
 
 uint32_t GetQueryTraceRecord::GetColumnFamilyID() const { return cf_id_; }
 
 Slice GetQueryTraceRecord::GetKey() const { return Slice(key_); }
 
-Status GetQueryTraceRecord::Accept(Handler* handler) {
+Status GetQueryTraceRecord::Accept(Handler* handler,
+                                   std::unique_ptr<TraceRecordResult>* result) {
   assert(handler != nullptr);
-  return handler->Handle(*this);
+  return handler->Handle(*this, result);
 }
 
 // IteratorQueryTraceRecord
 IteratorQueryTraceRecord::IteratorQueryTraceRecord(uint64_t timestamp)
     : QueryTraceRecord(timestamp) {}
 
-IteratorQueryTraceRecord::~IteratorQueryTraceRecord() {}
-
 // IteratorSeekQueryTraceRecord
 IteratorSeekQueryTraceRecord::IteratorSeekQueryTraceRecord(
     SeekType seek_type, uint32_t column_family_id, PinnableSlice&& key,
@@ -103,7 +100,7 @@ IteratorSeekQueryTraceRecord::IteratorSeekQueryTraceRecord(
   key_.PinSelf(key);
 }
 
-IteratorSeekQueryTraceRecord::~IteratorSeekQueryTraceRecord() {}
+IteratorSeekQueryTraceRecord::~IteratorSeekQueryTraceRecord() { key_.clear(); }
 
 TraceType IteratorSeekQueryTraceRecord::GetTraceType() const {
   return static_cast<TraceType>(type_);
@@ -120,9 +117,10 @@ uint32_t IteratorSeekQueryTraceRecord::GetColumnFamilyID() const {
 
 Slice IteratorSeekQueryTraceRecord::GetKey() const { return Slice(key_); }
 
-Status IteratorSeekQueryTraceRecord::Accept(Handler* handler) {
+Status IteratorSeekQueryTraceRecord::Accept(
+    Handler* handler, std::unique_ptr<TraceRecordResult>* result) {
   assert(handler != nullptr);
-  return handler->Handle(*this);
+  return handler->Handle(*this, result);
 }
 
 // MultiGetQueryTraceRecord
@@ -145,7 +143,10 @@ MultiGetQueryTraceRecord::MultiGetQueryTraceRecord(
   }
 }
 
-MultiGetQueryTraceRecord::~MultiGetQueryTraceRecord() {}
+MultiGetQueryTraceRecord::~MultiGetQueryTraceRecord() {
+  cf_ids_.clear();
+  keys_.clear();
+}
 
 std::vector<uint32_t> MultiGetQueryTraceRecord::GetColumnFamilyIDs() const {
   return cf_ids_;
@@ -155,9 +156,10 @@ std::vector<Slice> MultiGetQueryTraceRecord::GetKeys() const {
   return std::vector<Slice>(keys_.begin(), keys_.end());
 }
 
-Status MultiGetQueryTraceRecord::Accept(Handler* handler) {
+Status MultiGetQueryTraceRecord::Accept(
+    Handler* handler, std::unique_ptr<TraceRecordResult>* result) {
   assert(handler != nullptr);
-  return handler->Handle(*this);
+  return handler->Handle(*this, result);
 }
 
 }  // namespace ROCKSDB_NAMESPACE
index 3651d0fe2003ef4e1c4b369fe0372c7ea419acda..6343d2ed351d97dfc25f8d9692a8cb4a47a18d57 100644 (file)
@@ -6,6 +6,7 @@
 #include "trace_replay/trace_record_handler.h"
 
 #include "rocksdb/iterator.h"
+#include "rocksdb/trace_record_result.h"
 #include "rocksdb/write_batch.h"
 
 namespace ROCKSDB_NAMESPACE {
@@ -16,7 +17,8 @@ TraceExecutionHandler::TraceExecutionHandler(
     : TraceRecord::Handler(),
       db_(db),
       write_opts_(WriteOptions()),
-      read_opts_(ReadOptions()) {
+      read_opts_(ReadOptions()),
+      clock_(SystemClock::Default()) {
   assert(db != nullptr);
   assert(!handles.empty());
   cf_map_.reserve(handles.size());
@@ -28,26 +30,64 @@ TraceExecutionHandler::TraceExecutionHandler(
 
 TraceExecutionHandler::~TraceExecutionHandler() { cf_map_.clear(); }
 
-Status TraceExecutionHandler::Handle(const WriteQueryTraceRecord& record) {
+Status TraceExecutionHandler::Handle(
+    const WriteQueryTraceRecord& record,
+    std::unique_ptr<TraceRecordResult>* result) {
+  if (result != nullptr) {
+    result->reset(nullptr);  // Start from an empty result.
+  }
+  uint64_t start = clock_->NowMicros();  // Timing includes building the batch.
+
   WriteBatch batch(record.GetWriteBatchRep().ToString());
-  return db_->Write(write_opts_, &batch);
+  Status s = db_->Write(write_opts_, &batch);
+
+  uint64_t end = clock_->NowMicros();
+
+  if (s.ok() && result != nullptr) {  // No result object is made on failure.
+    result->reset(new StatusOnlyTraceExecutionResult(s, start, end,
+                                                     record.GetTraceType()));
+  }
+
+  return s;  // The Write() status is also the returned status.
 }
 
-Status TraceExecutionHandler::Handle(const GetQueryTraceRecord& record) {
+Status TraceExecutionHandler::Handle(
+    const GetQueryTraceRecord& record,
+    std::unique_ptr<TraceRecordResult>* result) {
+  if (result != nullptr) {
+    result->reset(nullptr);  // Start from an empty result.
+  }
   auto it = cf_map_.find(record.GetColumnFamilyID());
   if (it == cf_map_.end()) {
     return Status::Corruption("Invalid Column Family ID.");
   }
 
+  uint64_t start = clock_->NowMicros();
+
   std::string value;
   Status s = db_->Get(read_opts_, it->second, record.GetKey(), &value);
 
-  // Treat not found as ok and return other errors.
-  return s.IsNotFound() ? Status::OK() : s;
+  uint64_t end = clock_->NowMicros();
+
+  // Treat not found as ok; any other error is returned without a result.
+  if (!s.ok() && !s.IsNotFound()) {
+    return s;
+  }
+
+  if (result != nullptr) {
+    // Report the actual operation status (OK or NotFound) in the result.
+    result->reset(new SingleValueTraceExecutionResult(
+        std::move(s), std::move(value), start, end, record.GetTraceType()));
+  }
+  return Status::OK();  // NotFound is visible only through *result.
 }
 
 Status TraceExecutionHandler::Handle(
-    const IteratorSeekQueryTraceRecord& record) {
+    const IteratorSeekQueryTraceRecord& record,
+    std::unique_ptr<TraceRecordResult>* result) {
+  if (result != nullptr) {
+    result->reset(nullptr);
+  }
   auto it = cf_map_.find(record.GetColumnFamilyID());
   if (it == cf_map_.end()) {
     return Status::Corruption("Invalid Column Family ID.");
@@ -55,6 +95,8 @@ Status TraceExecutionHandler::Handle(
 
   Iterator* single_iter = db_->NewIterator(read_opts_, it->second);
 
+  uint64_t start = clock_->NowMicros();
+
   switch (record.GetSeekType()) {
     case IteratorSeekQueryTraceRecord::kSeekForPrev: {
       single_iter->SeekForPrev(record.GetKey());
@@ -65,12 +107,26 @@ Status TraceExecutionHandler::Handle(
       break;
     }
   }
+
+  uint64_t end = clock_->NowMicros();
+
   Status s = single_iter->status();
   delete single_iter;
+
+  if (s.ok() && result != nullptr) {
+    result->reset(new StatusOnlyTraceExecutionResult(s, start, end,
+                                                     record.GetTraceType()));
+  }
+
   return s;
 }
 
-Status TraceExecutionHandler::Handle(const MultiGetQueryTraceRecord& record) {
+Status TraceExecutionHandler::Handle(
+    const MultiGetQueryTraceRecord& record,
+    std::unique_ptr<TraceRecordResult>* result) {
+  if (result != nullptr) {
+    result->reset(nullptr);
+  }
   std::vector<ColumnFamilyHandle*> handles;
   handles.reserve(record.GetColumnFamilyIDs().size());
   for (uint32_t cf_id : record.GetColumnFamilyIDs()) {
@@ -90,15 +146,26 @@ Status TraceExecutionHandler::Handle(const MultiGetQueryTraceRecord& record) {
     return Status::InvalidArgument("MultiGet cf_ids and keys size mismatch.");
   }
 
+  uint64_t start = clock_->NowMicros();
+
   std::vector<std::string> values;
   std::vector<Status> ss = db_->MultiGet(read_opts_, handles, keys, &values);
 
+  uint64_t end = clock_->NowMicros();
+
   // Treat not found as ok, return other errors.
-  for (Status s : ss) {
+  for (const Status& s : ss) {
     if (!s.ok() && !s.IsNotFound()) {
       return s;
     }
   }
+
+  if (result != nullptr) {
+    // Report the actual operation status in TraceExecutionResult
+    result->reset(new MultiValuesTraceExecutionResult(
+        std::move(ss), std::move(values), start, end, record.GetTraceType()));
+  }
+
   return Status::OK();
 }
 
index fbc5a839f3788275fafccea6f55106bf04003b11..0d9f1d629f656b84ac38ceb999457c70d216562c 100644 (file)
@@ -5,12 +5,14 @@
 
 #pragma once
 
+#include <memory>
 #include <unordered_map>
 #include <vector>
 
 #include "rocksdb/db.h"
 #include "rocksdb/options.h"
 #include "rocksdb/status.h"
+#include "rocksdb/system_clock.h"
 #include "rocksdb/trace_record.h"
 
 namespace ROCKSDB_NAMESPACE {
@@ -22,16 +24,21 @@ class TraceExecutionHandler : public TraceRecord::Handler {
                         const std::vector<ColumnFamilyHandle*>& handles);
   virtual ~TraceExecutionHandler() override;
 
-  virtual Status Handle(const WriteQueryTraceRecord& record) override;
-  virtual Status Handle(const GetQueryTraceRecord& record) override;
-  virtual Status Handle(const IteratorSeekQueryTraceRecord& record) override;
-  virtual Status Handle(const MultiGetQueryTraceRecord& record) override;
+  virtual Status Handle(const WriteQueryTraceRecord& record,
+                        std::unique_ptr<TraceRecordResult>* result) override;
+  virtual Status Handle(const GetQueryTraceRecord& record,
+                        std::unique_ptr<TraceRecordResult>* result) override;
+  virtual Status Handle(const IteratorSeekQueryTraceRecord& record,
+                        std::unique_ptr<TraceRecordResult>* result) override;
+  virtual Status Handle(const MultiGetQueryTraceRecord& record,
+                        std::unique_ptr<TraceRecordResult>* result) override;
 
  private:
   DB* db_;
   std::unordered_map<uint32_t, ColumnFamilyHandle*> cf_map_;
   WriteOptions write_opts_;
   ReadOptions read_opts_;
+  std::shared_ptr<SystemClock> clock_;
 };
 
 // To do: Handler for trace_analyzer.
diff --git a/trace_replay/trace_record_result.cc b/trace_replay/trace_record_result.cc
new file mode 100644 (file)
index 0000000..b22b57e
--- /dev/null
@@ -0,0 +1,106 @@
+//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
+//  This source code is licensed under both the GPLv2 (found in the
+//  COPYING file in the root directory) and Apache 2.0 License
+//  (found in the LICENSE.Apache file in the root directory).
+
+#include "rocksdb/trace_record_result.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+// TraceRecordResult: keeps the TraceType passed in at construction.
+TraceRecordResult::TraceRecordResult(TraceType trace_type)
+    : trace_type_(trace_type) {}
+
+TraceType TraceRecordResult::GetTraceType() const { return trace_type_; }
+
+// TraceExecutionResult: adds start/end timestamps to TraceRecordResult.
+TraceExecutionResult::TraceExecutionResult(uint64_t start_timestamp,
+                                           uint64_t end_timestamp,
+                                           TraceType trace_type)
+    : TraceRecordResult(trace_type),
+      ts_start_(start_timestamp),
+      ts_end_(end_timestamp) {
+  assert(ts_start_ <= ts_end_);  // An execution cannot end before it starts.
+}
+
+uint64_t TraceExecutionResult::GetStartTimestamp() const { return ts_start_; }
+
+uint64_t TraceExecutionResult::GetEndTimestamp() const { return ts_end_; }
+
+// StatusOnlyTraceExecutionResult: an execution result holding just a Status.
+StatusOnlyTraceExecutionResult::StatusOnlyTraceExecutionResult(
+    Status status, uint64_t start_timestamp, uint64_t end_timestamp,
+    TraceType trace_type)
+    : TraceExecutionResult(start_timestamp, end_timestamp, trace_type),
+      status_(std::move(status)) {}  // status was taken by value; move it in.
+
+const Status& StatusOnlyTraceExecutionResult::GetStatus() const {
+  return status_;
+}
+
+Status StatusOnlyTraceExecutionResult::Accept(Handler* handler) {
+  assert(handler != nullptr);  // Dereferenced on the next line.
+  return handler->Handle(*this);
+}
+
+// SingleValueTraceExecutionResult: one Status plus one value (e.g. DB::Get()).
+SingleValueTraceExecutionResult::SingleValueTraceExecutionResult(
+    Status status, const std::string& value, uint64_t start_timestamp,
+    uint64_t end_timestamp, TraceType trace_type)
+    : TraceExecutionResult(start_timestamp, end_timestamp, trace_type),
+      status_(std::move(status)),
+      value_(value) {}  // Copies the caller's string.
+
+SingleValueTraceExecutionResult::SingleValueTraceExecutionResult(
+    Status status, std::string&& value, uint64_t start_timestamp,
+    uint64_t end_timestamp, TraceType trace_type)
+    : TraceExecutionResult(start_timestamp, end_timestamp, trace_type),
+      status_(std::move(status)),
+      value_(std::move(value)) {}  // Moves from the caller's string.
+
+SingleValueTraceExecutionResult::~SingleValueTraceExecutionResult() {
+  value_.clear();  // NOTE(review): redundant; the member is destroyed anyway.
+}
+
+const Status& SingleValueTraceExecutionResult::GetStatus() const {
+  return status_;
+}
+
+const std::string& SingleValueTraceExecutionResult::GetValue() const {
+  return value_;
+}
+
+Status SingleValueTraceExecutionResult::Accept(Handler* handler) {
+  assert(handler != nullptr);  // Dereferenced on the next line.
+  return handler->Handle(*this);
+}
+
+// MultiValuesTraceExecutionResult: several Status(es) and values (MultiGet()).
+MultiValuesTraceExecutionResult::MultiValuesTraceExecutionResult(
+    std::vector<Status> multi_status, std::vector<std::string> values,
+    uint64_t start_timestamp, uint64_t end_timestamp, TraceType trace_type)
+    : TraceExecutionResult(start_timestamp, end_timestamp, trace_type),
+      multi_status_(std::move(multi_status)),  // Taken by value; moved in.
+      values_(std::move(values)) {}            // Taken by value; moved in.
+
+MultiValuesTraceExecutionResult::~MultiValuesTraceExecutionResult() {
+  multi_status_.clear();  // NOTE(review): both clear() calls are redundant;
+  values_.clear();        // the members are destroyed right after this body.
+}
+
+const std::vector<Status>& MultiValuesTraceExecutionResult::GetMultiStatus()
+    const {
+  return multi_status_;
+}
+
+const std::vector<std::string>& MultiValuesTraceExecutionResult::GetValues()
+    const {
+  return values_;
+}
+
+Status MultiValuesTraceExecutionResult::Accept(Handler* handler) {
+  assert(handler != nullptr);  // Dereferenced on the next line.
+  return handler->Handle(*this);
+}
+
+}  // namespace ROCKSDB_NAMESPACE
index b3063c1431074a5e741b5af924c829f9fa708849..af2e76500decfc368108e049dab2c3e73564f4f5 100644 (file)
@@ -131,6 +131,10 @@ Status TracerHelper::DecodeWriteRecord(Trace* trace, int trace_file_version,
   assert(trace != nullptr);
   assert(trace->type == kTraceWrite);
 
+  if (record != nullptr) {
+    record->reset(nullptr);
+  }
+
   PinnableSlice rep;
   if (trace_file_version < 2) {
     rep.PinSelf(trace->payload);
@@ -168,6 +172,10 @@ Status TracerHelper::DecodeGetRecord(Trace* trace, int trace_file_version,
   assert(trace != nullptr);
   assert(trace->type == kTraceGet);
 
+  if (record != nullptr) {
+    record->reset(nullptr);
+  }
+
   uint32_t cf_id = 0;
   Slice get_key;
 
@@ -211,6 +219,10 @@ Status TracerHelper::DecodeIterRecord(Trace* trace, int trace_file_version,
   assert(trace->type == kTraceIteratorSeek ||
          trace->type == kTraceIteratorSeekForPrev);
 
+  if (record != nullptr) {
+    record->reset(nullptr);
+  }
+
   uint32_t cf_id = 0;
   Slice iter_key;
 
@@ -265,6 +277,11 @@ Status TracerHelper::DecodeMultiGetRecord(
     std::unique_ptr<TraceRecord>* record) {
   assert(trace != nullptr);
   assert(trace->type == kTraceMultiGet);
+
+  if (record != nullptr) {
+    record->reset(nullptr);
+  }
+
   if (trace_file_version < 2) {
     return Status::Corruption("MultiGet is not supported.");
   }
index 2462db468f8a792c659cee0c59a7bc0c5fa0fbc6..c98155d5345f719f43805fff2530db7ad4115a11 100644 (file)
 #include <cmath>
 #include <thread>
 
-#include "rocksdb/db.h"
-#include "rocksdb/env.h"
 #include "rocksdb/options.h"
 #include "rocksdb/slice.h"
-#include "rocksdb/status.h"
 #include "rocksdb/system_clock.h"
-#include "rocksdb/trace_reader_writer.h"
 #include "util/threadpool_imp.h"
 
 namespace ROCKSDB_NAMESPACE {
@@ -77,17 +73,15 @@ Status ReplayerImpl::Next(std::unique_ptr<TraceRecord>* record) {
   return DecodeTraceRecord(&trace, trace_file_version_, record);
 }
 
-Status ReplayerImpl::Execute(const std::unique_ptr<TraceRecord>& record) {
-  return record->Accept(exec_handler_.get());
+Status ReplayerImpl::Execute(const std::unique_ptr<TraceRecord>& record,
+                             std::unique_ptr<TraceRecordResult>* result) {
+  // NOTE(review): record is dereferenced without a null check.
+  return record->Accept(exec_handler_.get(), result);
 }
 
-Status ReplayerImpl::Execute(std::unique_ptr<TraceRecord>&& record) {
-  Status s = record->Accept(exec_handler_.get());
-  record.reset();
-  return s;
-}
-
-Status ReplayerImpl::Replay(const ReplayOptions& options) {
+Status ReplayerImpl::Replay(
+    const ReplayOptions& options,
+    const std::function<void(Status, std::unique_ptr<TraceRecordResult>&&)>&
+        result_callback) {
   if (options.fast_forward <= 0.0) {
     return Status::InvalidArgument("Wrong fast forward speed!");
   }
@@ -124,19 +118,34 @@ Status ReplayerImpl::Replay(const ReplayOptions& options) {
       // In single-threaded replay, decode first then sleep.
       std::unique_ptr<TraceRecord> record;
       s = DecodeTraceRecord(&trace, trace_file_version_, &record);
-      // Skip unsupported traces, stop for other errors.
-      if (s.IsNotSupported()) {
-        continue;
-      } else if (!s.ok()) {
+      if (!s.ok() && !s.IsNotSupported()) {
         break;
       }
 
-      std::this_thread::sleep_until(
+      std::chrono::system_clock::time_point sleep_to =
           replay_epoch +
           std::chrono::microseconds(static_cast<uint64_t>(std::llround(
-              1.0 * (trace.ts - header_ts_) / options.fast_forward))));
+              1.0 * (trace.ts - header_ts_) / options.fast_forward)));
+      if (sleep_to > std::chrono::system_clock::now()) {
+        std::this_thread::sleep_until(sleep_to);
+      }
 
-      s = Execute(std::move(record));
+      // Skip unsupported traces, stop for other errors.
+      if (s.IsNotSupported()) {
+        if (result_callback != nullptr) {
+          result_callback(s, nullptr);
+        }
+        s = Status::OK();
+        continue;
+      }
+
+      if (result_callback == nullptr) {
+        s = Execute(record, nullptr);
+      } else {
+        std::unique_ptr<TraceRecordResult> res;
+        s = Execute(record, &res);
+        result_callback(s, std::move(res));
+      }
     }
   } else {
     // Multi-threaded replay.
@@ -181,10 +190,13 @@ Status ReplayerImpl::Replay(const ReplayOptions& options) {
 
       // In multi-threaded replay, sleep first thatn start decoding and
       // execution in a thread.
-      std::this_thread::sleep_until(
+      std::chrono::system_clock::time_point sleep_to =
           replay_epoch +
           std::chrono::microseconds(static_cast<uint64_t>(std::llround(
-              1.0 * (trace.ts - header_ts_) / options.fast_forward))));
+              1.0 * (trace.ts - header_ts_) / options.fast_forward)));
+      if (sleep_to > std::chrono::system_clock::now()) {
+        std::this_thread::sleep_until(sleep_to);
+      }
 
       if (trace_type == kTraceWrite || trace_type == kTraceGet ||
           trace_type == kTraceIteratorSeek ||
@@ -195,10 +207,16 @@ Status ReplayerImpl::Replay(const ReplayOptions& options) {
         ra->handler = exec_handler_.get();
         ra->trace_file_version = trace_file_version_;
         ra->error_cb = error_cb;
+        ra->result_cb = result_callback;
         thread_pool.Schedule(&ReplayerImpl::BackgroundWork, ra.release(),
                              nullptr, nullptr);
+      } else {
+        // Skip unsupported traces.
+        if (result_callback != nullptr) {
+          result_callback(Status::NotSupported("Unsupported trace type."),
+                          nullptr);
+        }
       }
-      // Skip unsupported traces.
     }
 
     thread_pool.WaitForJobsAndJoinAllThreads();
@@ -293,13 +311,26 @@ void ReplayerImpl::BackgroundWork(void* arg) {
   std::unique_ptr<TraceRecord> record;
   Status s =
       DecodeTraceRecord(&(ra->trace_entry), ra->trace_file_version, &record);
-  if (s.ok()) {
-    s = record->Accept(ra->handler);
-    record.reset();
+  if (!s.ok()) {
+    // Stop the replay
+    if (ra->error_cb != nullptr) {
+      ra->error_cb(s, ra->trace_entry.ts);
+    }
+    // Report the result
+    if (ra->result_cb != nullptr) {
+      ra->result_cb(s, nullptr);
+    }
+    return;
   }
-  if (!s.ok() && ra->error_cb) {
-    ra->error_cb(s, ra->trace_entry.ts);
+
+  if (ra->result_cb == nullptr) {
+    s = record->Accept(ra->handler, nullptr);
+  } else {
+    std::unique_ptr<TraceRecordResult> res;
+    s = record->Accept(ra->handler, &res);
+    ra->result_cb(s, std::move(res));
   }
+  record.reset();
 }
 
 }  // namespace ROCKSDB_NAMESPACE
index 6cf4455e958c85711476a30cdbdeb099af477e1b..9cf18296069ccf5313461fb5d4334a9ec58b4970 100644 (file)
 #include <mutex>
 #include <unordered_map>
 
-#include "rocksdb/rocksdb_namespace.h"
+#include "rocksdb/db.h"
+#include "rocksdb/env.h"
+#include "rocksdb/status.h"
+#include "rocksdb/trace_reader_writer.h"
 #include "rocksdb/trace_record.h"
+#include "rocksdb/trace_record_result.h"
 #include "rocksdb/utilities/replayer.h"
 #include "trace_replay/trace_replay.h"
 
 namespace ROCKSDB_NAMESPACE {
 
-class ColumnFamilyHandle;
-class DB;
-class Env;
-class TraceReader;
-class TraceRecord;
-class Status;
-
-struct ReplayOptions;
-
 class ReplayerImpl : public Replayer {
  public:
   ReplayerImpl(DB* db, const std::vector<ColumnFamilyHandle*>& handles,
@@ -41,11 +36,14 @@ class ReplayerImpl : public Replayer {
   Status Next(std::unique_ptr<TraceRecord>* record) override;
 
   using Replayer::Execute;
-  Status Execute(const std::unique_ptr<TraceRecord>& record) override;
-  Status Execute(std::unique_ptr<TraceRecord>&& record) override;
+  Status Execute(const std::unique_ptr<TraceRecord>& record,
+                 std::unique_ptr<TraceRecordResult>* result) override;
 
   using Replayer::Replay;
-  Status Replay(const ReplayOptions& options) override;
+  Status Replay(
+      const ReplayOptions& options,
+      const std::function<void(Status, std::unique_ptr<TraceRecordResult>&&)>&
+          result_callback) override;
 
   using Replayer::GetHeaderTimestamp;
   uint64_t GetHeaderTimestamp() const override;
@@ -84,6 +82,9 @@ struct ReplayerWorkerArg {
   // Callback function to report the error status and the timestamp of the
   // TraceRecord.
   std::function<void(Status, uint64_t)> error_cb;
+  // Callback function to report the trace execution status and operation
+  // execution status/result(s).
+  std::function<void(Status, std::unique_ptr<TraceRecordResult>&&)> result_cb;
 };
 
 }  // namespace ROCKSDB_NAMESPACE