]> git-server-git.apps.pok.os.sepia.ceph.com Git - rocksdb.git/commitdiff
Make TraceRecord and Replayer public (#8611)
authorMerlin Mao <qzmao@fb.com>
Thu, 12 Aug 2021 02:31:44 +0000 (19:31 -0700)
committerFacebook GitHub Bot <facebook-github-bot@users.noreply.github.com>
Thu, 12 Aug 2021 02:32:46 +0000 (19:32 -0700)
Summary:
New public interfaces:
`TraceRecord` and `TraceRecord::Handler`, available in "rocksdb/trace_record.h".
`Replayer`, available in `rocksdb/utilities/replayer.h`.

Users can call `DB::NewDefaultReplayer()` to create a Replayer that replays a trace file either automatically or manually.

Unit tests:
- `./db_test2 --gtest_filter="DBTest2.TraceAndReplay"`: Updated with the internal API changes.
- `./db_test2 --gtest_filter="DBTest2.TraceAndManualReplay"`: New for manual replay.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/8611

Reviewed By: ajkr

Differential Revision: D30266329

Pulled By: autopear

fbshipit-source-id: 1ecb3cbbedae0f6a67c18f0cc82e002b4d81b6f8

36 files changed:
CMakeLists.txt
HISTORY.md
TARGETS
db/db_impl/db_impl.cc
db/db_impl/db_impl.h
db/db_test2.cc
env/file_system_tracer.cc
include/rocksdb/db.h
include/rocksdb/trace_reader_writer.h
include/rocksdb/trace_record.h [new file with mode: 0644]
include/rocksdb/utilities/replayer.h [new file with mode: 0644]
include/rocksdb/utilities/stackable_db.h
src.mk
table/block_based/block_based_table_reader.cc
table/table_test.cc
tools/block_cache_analyzer/block_cache_trace_analyzer.cc
tools/block_cache_analyzer/block_cache_trace_analyzer.h
tools/block_cache_analyzer/block_cache_trace_analyzer_test.cc
tools/db_bench_tool.cc
tools/trace_analyzer_tool.cc
tools/trace_analyzer_tool.h
trace_replay/block_cache_tracer.cc
trace_replay/block_cache_tracer_test.cc
trace_replay/io_tracer.h
trace_replay/io_tracer_test.cc
trace_replay/trace_record.cc [new file with mode: 0644]
trace_replay/trace_record_handler.cc [new file with mode: 0644]
trace_replay/trace_record_handler.h [new file with mode: 0644]
trace_replay/trace_replay.cc
trace_replay/trace_replay.h
utilities/simulator_cache/cache_simulator.cc
utilities/simulator_cache/cache_simulator_test.cc
utilities/trace/file_trace_reader_writer.cc
utilities/trace/file_trace_reader_writer.h
utilities/trace/replayer_impl.cc [new file with mode: 0644]
utilities/trace/replayer_impl.h [new file with mode: 0644]

index 43b3f62752760cbbf929775cff4ba84bb06d1283..a14d2dde7f663b5b771b729b2306e2a33db4e480 100644 (file)
@@ -816,9 +816,11 @@ set(SOURCES
         tools/ldb_tool.cc
         tools/sst_dump_tool.cc
         tools/trace_analyzer_tool.cc
-        trace_replay/trace_replay.cc
         trace_replay/block_cache_tracer.cc
         trace_replay/io_tracer.cc
+        trace_replay/trace_record_handler.cc
+        trace_replay/trace_record.cc
+        trace_replay/trace_replay.cc
         util/coding.cc
         util/compaction_job_stats_impl.cc
         util/comparator.cc
@@ -878,6 +880,7 @@ set(SOURCES
         utilities/simulator_cache/sim_cache.cc
         utilities/table_properties_collectors/compact_on_deletion_collector.cc
         utilities/trace/file_trace_reader_writer.cc
+        utilities/trace/replayer_impl.cc
         utilities/transactions/lock/lock_manager.cc
         utilities/transactions/lock/point/point_lock_tracker.cc
         utilities/transactions/lock/point/point_lock_manager.cc
index d7de634eb8dae3468f9509283ab619d12b968c7e..ea98e1a9498b31bf2c5e080b5248cc6c9ad2c85d 100644 (file)
 * BlockBasedTableOptions.prepopulate_block_cache can be dynamically configured using DB::SetOptions.
 * Add CompactionOptionsFIFO.age_for_warm, which allows RocksDB to move old files to warm tier in FIFO compactions. Note that file temperature is still an experimental feature.
 * Add a comment to suggest btrfs user to disable file preallocation by setting `options.allow_fallocate=false`.
+* Fast forward option in Trace replay changed to double type to allow replaying at a lower speed, by setting the value between 0 and 1. This option can be set via `ReplayOptions` in `Replayer::Replay()`, or via `--trace_replay_fast_forward` in db_bench.
+
+## Public API change
+* Added APIs to decode and replay trace files via the Replayer class. Added `DB::NewDefaultReplayer()` to create a default Replayer instance. Created the trace_record.h and utilities/replayer.h files to access decoded trace records and replay them.
 
 ### Performance Improvements
 * Try to avoid updating DBOptions if `SetDBOptions()` does not change any option value.
diff --git a/TARGETS b/TARGETS
index d96845938a362a440c7e29759e2c90f19bbaaf73..0f50b4fa85b5b858bc705a3750a14faf6e45e62e 100644 (file)
--- a/TARGETS
+++ b/TARGETS
@@ -335,6 +335,8 @@ cpp_library(
         "tools/sst_dump_tool.cc",
         "trace_replay/block_cache_tracer.cc",
         "trace_replay/io_tracer.cc",
+        "trace_replay/trace_record.cc",
+        "trace_replay/trace_record_handler.cc",
         "trace_replay/trace_replay.cc",
         "util/build_version.cc",
         "util/coding.cc",
@@ -398,6 +400,7 @@ cpp_library(
         "utilities/simulator_cache/sim_cache.cc",
         "utilities/table_properties_collectors/compact_on_deletion_collector.cc",
         "utilities/trace/file_trace_reader_writer.cc",
+        "utilities/trace/replayer_impl.cc",
         "utilities/transactions/lock/lock_manager.cc",
         "utilities/transactions/lock/point/point_lock_manager.cc",
         "utilities/transactions/lock/point/point_lock_tracker.cc",
@@ -650,6 +653,8 @@ cpp_library(
         "tools/sst_dump_tool.cc",
         "trace_replay/block_cache_tracer.cc",
         "trace_replay/io_tracer.cc",
+        "trace_replay/trace_record.cc",
+        "trace_replay/trace_record_handler.cc",
         "trace_replay/trace_replay.cc",
         "util/build_version.cc",
         "util/coding.cc",
@@ -713,6 +718,7 @@ cpp_library(
         "utilities/simulator_cache/sim_cache.cc",
         "utilities/table_properties_collectors/compact_on_deletion_collector.cc",
         "utilities/trace/file_trace_reader_writer.cc",
+        "utilities/trace/replayer_impl.cc",
         "utilities/transactions/lock/lock_manager.cc",
         "utilities/transactions/lock/point/point_lock_manager.cc",
         "utilities/transactions/lock/point/point_lock_tracker.cc",
index e46092ba4d78aa3a9879587c0cb2ae8e9d6e548a..ef6a9cbda5761d0f78b4736358910843a5f6acd4 100644 (file)
@@ -94,6 +94,7 @@
 #include "table/table_builder.h"
 #include "table/two_level_iterator.h"
 #include "test_util/sync_point.h"
+#include "trace_replay/trace_replay.h"
 #include "util/autovector.h"
 #include "util/cast_util.h"
 #include "util/coding.h"
 #include "util/mutexlock.h"
 #include "util/stop_watch.h"
 #include "util/string_util.h"
+#include "utilities/trace/replayer_impl.h"
 
 namespace ROCKSDB_NAMESPACE {
 
@@ -4359,9 +4361,7 @@ SequenceNumber DBImpl::GetEarliestMemTableSequenceNumber(SuperVersion* sv,
 
   return earliest_seq;
 }
-#endif  // ROCKSDB_LITE
 
-#ifndef ROCKSDB_LITE
 Status DBImpl::GetLatestSequenceForKey(SuperVersion* sv, const Slice& key,
                                        bool cache_only,
                                        SequenceNumber lower_bound_seq,
@@ -5108,6 +5108,14 @@ Status DBImpl::EndTrace() {
   return s;
 }
 
+Status DBImpl::NewDefaultReplayer(
+    const std::vector<ColumnFamilyHandle*>& handles,
+    std::unique_ptr<TraceReader>&& reader,
+    std::unique_ptr<Replayer>* replayer) {
+  replayer->reset(new ReplayerImpl(this, handles, std::move(reader)));
+  return Status::OK();
+}
+
 Status DBImpl::StartBlockCacheTrace(
     const TraceOptions& trace_options,
     std::unique_ptr<TraceWriter>&& trace_writer) {
index 0b65bd41da4176bbe04f666140b3ac407aa71bed..c4bb5b68b60e41d1dfaffbc6ab223851c13b9086 100644 (file)
 #include "rocksdb/env.h"
 #include "rocksdb/memtablerep.h"
 #include "rocksdb/status.h"
+#ifndef ROCKSDB_LITE
 #include "rocksdb/trace_reader_writer.h"
+#endif  // ROCKSDB_LITE
 #include "rocksdb/transaction_log.h"
+#ifndef ROCKSDB_LITE
+#include "rocksdb/utilities/replayer.h"
+#endif  // ROCKSDB_LITE
 #include "rocksdb/write_buffer_manager.h"
 #include "table/merging_iterator.h"
 #include "table/scoped_arena_iterator.h"
@@ -464,6 +469,12 @@ class DBImpl : public DB {
   using DB::EndTrace;
   virtual Status EndTrace() override;
 
+  using DB::NewDefaultReplayer;
+  virtual Status NewDefaultReplayer(
+      const std::vector<ColumnFamilyHandle*>& handles,
+      std::unique_ptr<TraceReader>&& reader,
+      std::unique_ptr<Replayer>* replayer) override;
+
   using DB::StartBlockCacheTrace;
   Status StartBlockCacheTrace(
       const TraceOptions& options,
index 56682dea56c8336651f1d00a7711d59b2ca989c3..d55f4a449fcd1ab989a6d1cfea17e5affcb28f04 100644 (file)
@@ -17,6 +17,7 @@
 #include "port/port.h"
 #include "port/stack_trace.h"
 #include "rocksdb/persistent_cache.h"
+#include "rocksdb/utilities/replayer.h"
 #include "rocksdb/wal_filter.h"
 #include "util/random.h"
 #include "utilities/fault_injection_env.h"
@@ -4256,8 +4257,160 @@ TEST_F(DBTest2, TraceAndReplay) {
 
   std::unique_ptr<TraceReader> trace_reader;
   ASSERT_OK(NewFileTraceReader(env_, env_opts, trace_filename, &trace_reader));
-  Replayer replayer(db2, handles_, std::move(trace_reader));
-  ASSERT_OK(replayer.Replay());
+  std::unique_ptr<Replayer> replayer;
+  ASSERT_OK(
+      db2->NewDefaultReplayer(handles, std::move(trace_reader), &replayer));
+  // Unprepared replay should fail with Status::Incomplete()
+  ASSERT_TRUE(replayer->Replay().IsIncomplete());
+  ASSERT_OK(replayer->Prepare());
+  // Ok to repeatedly Prepare().
+  ASSERT_OK(replayer->Prepare());
+  // Replay using 1 thread, 1x speed.
+  ASSERT_OK(replayer->Replay());
+
+  ASSERT_OK(db2->Get(ro, handles[0], "a", &value));
+  ASSERT_EQ("1", value);
+  ASSERT_OK(db2->Get(ro, handles[0], "g", &value));
+  ASSERT_EQ("12", value);
+  ASSERT_TRUE(db2->Get(ro, handles[0], "hello", &value).IsNotFound());
+  ASSERT_TRUE(db2->Get(ro, handles[0], "world", &value).IsNotFound());
+
+  ASSERT_OK(db2->Get(ro, handles[1], "foo", &value));
+  ASSERT_EQ("bar", value);
+  ASSERT_OK(db2->Get(ro, handles[1], "rocksdb", &value));
+  ASSERT_EQ("rocks", value);
+
+  // Re-replay should fail with Status::Incomplete() if Prepare() was not
+  // called. Currently we don't distinguish between unprepared and trace end.
+  ASSERT_TRUE(replayer->Replay().IsIncomplete());
+
+  // Re-replay using 2 threads, 2x speed.
+  ASSERT_OK(replayer->Prepare());
+  ASSERT_OK(replayer->Replay(ReplayOptions(2, 2.0)));
+
+  // Re-replay using 2 threads, 1/2 speed.
+  ASSERT_OK(replayer->Prepare());
+  ASSERT_OK(replayer->Replay(ReplayOptions(2, 0.5)));
+  replayer.reset();
+
+  for (auto handle : handles) {
+    delete handle;
+  }
+  delete db2;
+  ASSERT_OK(DestroyDB(dbname2, options));
+}
+
+TEST_F(DBTest2, TraceAndManualReplay) {
+  Options options = CurrentOptions();
+  options.merge_operator = MergeOperators::CreatePutOperator();
+  ReadOptions ro;
+  WriteOptions wo;
+  TraceOptions trace_opts;
+  EnvOptions env_opts;
+  CreateAndReopenWithCF({"pikachu"}, options);
+  Random rnd(301);
+  Iterator* single_iter = nullptr;
+
+  ASSERT_TRUE(db_->EndTrace().IsIOError());
+
+  std::string trace_filename = dbname_ + "/rocksdb.trace";
+  std::unique_ptr<TraceWriter> trace_writer;
+  ASSERT_OK(NewFileTraceWriter(env_, env_opts, trace_filename, &trace_writer));
+  ASSERT_OK(db_->StartTrace(trace_opts, std::move(trace_writer)));
+
+  ASSERT_OK(Put(0, "a", "1"));
+  ASSERT_OK(Merge(0, "b", "2"));
+  ASSERT_OK(Delete(0, "c"));
+  ASSERT_OK(SingleDelete(0, "d"));
+  ASSERT_OK(db_->DeleteRange(wo, dbfull()->DefaultColumnFamily(), "e", "f"));
+
+  WriteBatch batch;
+  ASSERT_OK(batch.Put("f", "11"));
+  ASSERT_OK(batch.Merge("g", "12"));
+  ASSERT_OK(batch.Delete("h"));
+  ASSERT_OK(batch.SingleDelete("i"));
+  ASSERT_OK(batch.DeleteRange("j", "k"));
+  ASSERT_OK(db_->Write(wo, &batch));
+
+  single_iter = db_->NewIterator(ro);
+  single_iter->Seek("f");
+  single_iter->SeekForPrev("g");
+  delete single_iter;
+
+  ASSERT_EQ("1", Get(0, "a"));
+  ASSERT_EQ("12", Get(0, "g"));
+
+  ASSERT_OK(Put(1, "foo", "bar"));
+  ASSERT_OK(Put(1, "rocksdb", "rocks"));
+  ASSERT_EQ("NOT_FOUND", Get(1, "leveldb"));
+
+  ASSERT_OK(db_->EndTrace());
+  // These should not get into the trace file as it is after EndTrace.
+  Put("hello", "world");
+  Merge("foo", "bar");
+
+  // Open another db, replay, and verify the data
+  std::string value;
+  std::string dbname2 = test::PerThreadDBPath(env_, "/db_replay");
+  ASSERT_OK(DestroyDB(dbname2, options));
+
+  // Using a different name than db2, to pacify infer's use-after-lifetime
+  // warnings (http://fbinfer.com).
+  DB* db2_init = nullptr;
+  options.create_if_missing = true;
+  ASSERT_OK(DB::Open(options, dbname2, &db2_init));
+  ColumnFamilyHandle* cf;
+  ASSERT_OK(
+      db2_init->CreateColumnFamily(ColumnFamilyOptions(), "pikachu", &cf));
+  delete cf;
+  delete db2_init;
+
+  DB* db2 = nullptr;
+  std::vector<ColumnFamilyDescriptor> column_families;
+  ColumnFamilyOptions cf_options;
+  cf_options.merge_operator = MergeOperators::CreatePutOperator();
+  column_families.push_back(ColumnFamilyDescriptor("default", cf_options));
+  column_families.push_back(
+      ColumnFamilyDescriptor("pikachu", ColumnFamilyOptions()));
+  std::vector<ColumnFamilyHandle*> handles;
+  DBOptions db_opts;
+  db_opts.env = env_;
+  ASSERT_OK(DB::Open(db_opts, dbname2, column_families, &handles, &db2));
+
+  env_->SleepForMicroseconds(100);
+  // Verify that the keys don't already exist
+  ASSERT_TRUE(db2->Get(ro, handles[0], "a", &value).IsNotFound());
+  ASSERT_TRUE(db2->Get(ro, handles[0], "g", &value).IsNotFound());
+
+  std::unique_ptr<TraceReader> trace_reader;
+  ASSERT_OK(NewFileTraceReader(env_, env_opts, trace_filename, &trace_reader));
+  std::unique_ptr<Replayer> replayer;
+  ASSERT_OK(
+      db2->NewDefaultReplayer(handles, std::move(trace_reader), &replayer));
+
+  // Manually replay twice. The 2nd pass checks that the replay can restart.
+  std::unique_ptr<TraceRecord> record;
+  for (int i = 0; i < 2; i++) {
+    // Next should fail if unprepared.
+    ASSERT_TRUE(replayer->Next(nullptr).IsIncomplete());
+    ASSERT_OK(replayer->Prepare());
+    Status s = Status::OK();
+    // Looping until trace end.
+    while (s.ok()) {
+      s = replayer->Next(&record);
+      // Skip unsupported operations.
+      if (s.IsNotSupported()) {
+        continue;
+      }
+      if (s.ok()) {
+        ASSERT_OK(replayer->Execute(std::move(record)));
+      }
+    }
+    // Status::Incomplete() will be returned when manually reading the trace
+    // end, or Prepare() was not called.
+    ASSERT_TRUE(s.IsIncomplete());
+    ASSERT_TRUE(replayer->Next(nullptr).IsIncomplete());
+  }
 
   ASSERT_OK(db2->Get(ro, handles[0], "a", &value));
   ASSERT_EQ("1", value);
@@ -4271,6 +4424,85 @@ TEST_F(DBTest2, TraceAndReplay) {
   ASSERT_OK(db2->Get(ro, handles[1], "rocksdb", &value));
   ASSERT_EQ("rocks", value);
 
+  // Test execution of artificially created TraceRecords.
+  uint64_t fake_ts = 1U;
+  // Write
+  batch.Clear();
+  batch.Put("trace-record-write1", "write1");
+  batch.Put("trace-record-write2", "write2");
+  record.reset(new WriteQueryTraceRecord(batch.Data(), fake_ts++));
+  ASSERT_OK(replayer->Execute(std::move(record)));
+  ASSERT_OK(db2->Get(ro, handles[0], "trace-record-write1", &value));
+  ASSERT_EQ("write1", value);
+  ASSERT_OK(db2->Get(ro, handles[0], "trace-record-write2", &value));
+  ASSERT_EQ("write2", value);
+
+  // Get related
+  // Get an existing key.
+  record.reset(new GetQueryTraceRecord(handles[0]->GetID(),
+                                       "trace-record-write1", fake_ts++));
+  ASSERT_OK(replayer->Execute(std::move(record)));
+  // Get a non-existing key, should still return Status::OK().
+  record.reset(new GetQueryTraceRecord(handles[0]->GetID(), "trace-record-get",
+                                       fake_ts++));
+  ASSERT_OK(replayer->Execute(std::move(record)));
+  // Get from an invalid (non-existing) cf_id.
+  uint32_t invalid_cf_id = handles[1]->GetID() + 1;
+  record.reset(new GetQueryTraceRecord(invalid_cf_id, "whatever", fake_ts++));
+  ASSERT_TRUE(replayer->Execute(std::move(record)).IsCorruption());
+
+  // Iteration related
+  for (IteratorSeekQueryTraceRecord::SeekType seekType :
+       {IteratorSeekQueryTraceRecord::kSeek,
+        IteratorSeekQueryTraceRecord::kSeekForPrev}) {
+    // Seek to an existing key.
+    record.reset(new IteratorSeekQueryTraceRecord(
+        seekType, handles[0]->GetID(), "trace-record-write1", fake_ts++));
+    ASSERT_OK(replayer->Execute(std::move(record)));
+    // Seek to a non-existing key, should still return Status::OK().
+    record.reset(new IteratorSeekQueryTraceRecord(
+        seekType, handles[0]->GetID(), "trace-record-get", fake_ts++));
+    ASSERT_OK(replayer->Execute(std::move(record)));
+    // Seek from an invalid cf_id.
+    record.reset(new IteratorSeekQueryTraceRecord(seekType, invalid_cf_id,
+                                                  "whatever", fake_ts++));
+    ASSERT_TRUE(replayer->Execute(std::move(record)).IsCorruption());
+  }
+
+  // MultiGet related
+  // Get existing keys.
+  record.reset(new MultiGetQueryTraceRecord(
+      std::vector<uint32_t>({handles[0]->GetID(), handles[1]->GetID()}),
+      std::vector<std::string>({"a", "foo"}), fake_ts++));
+  ASSERT_OK(replayer->Execute(std::move(record)));
+  // Get all non-existing keys, should still return Status::OK().
+  record.reset(new MultiGetQueryTraceRecord(
+      std::vector<uint32_t>({handles[0]->GetID(), handles[1]->GetID()}),
+      std::vector<std::string>({"no1", "no2"}), fake_ts++));
+  // Get a mix of existing and non-existing keys, should still return
+  // Status::OK().
+  record.reset(new MultiGetQueryTraceRecord(
+      std::vector<uint32_t>({handles[0]->GetID(), handles[1]->GetID()}),
+      std::vector<std::string>({"a", "no2"}), fake_ts++));
+  ASSERT_OK(replayer->Execute(std::move(record)));
+  // Get from an invalid (non-existing) cf_id.
+  record.reset(new MultiGetQueryTraceRecord(
+      std::vector<uint32_t>(
+          {handles[0]->GetID(), handles[1]->GetID(), invalid_cf_id}),
+      std::vector<std::string>({"a", "foo", "whatever"}), fake_ts++));
+  ASSERT_TRUE(replayer->Execute(std::move(record)).IsCorruption());
+  // Empty MultiGet
+  record.reset(new MultiGetQueryTraceRecord(
+      std::vector<uint32_t>(), std::vector<std::string>(), fake_ts++));
+  ASSERT_TRUE(replayer->Execute(std::move(record)).IsInvalidArgument());
+  // MultiGet size mismatch
+  record.reset(new MultiGetQueryTraceRecord(
+      std::vector<uint32_t>({handles[0]->GetID(), handles[1]->GetID()}),
+      std::vector<std::string>({"a"}), fake_ts++));
+  ASSERT_TRUE(replayer->Execute(std::move(record)).IsInvalidArgument());
+
+  replayer.reset();
+
   for (auto handle : handles) {
     delete handle;
   }
@@ -4334,8 +4566,12 @@ TEST_F(DBTest2, TraceWithLimit) {
 
   std::unique_ptr<TraceReader> trace_reader;
   ASSERT_OK(NewFileTraceReader(env_, env_opts, trace_filename, &trace_reader));
-  Replayer replayer(db2, handles_, std::move(trace_reader));
-  ASSERT_OK(replayer.Replay());
+  std::unique_ptr<Replayer> replayer;
+  ASSERT_OK(
+      db2->NewDefaultReplayer(handles, std::move(trace_reader), &replayer));
+  ASSERT_OK(replayer->Prepare());
+  ASSERT_OK(replayer->Replay());
+  replayer.reset();
 
   ASSERT_TRUE(db2->Get(ro, handles[0], "a", &value).IsNotFound());
   ASSERT_TRUE(db2->Get(ro, handles[0], "b", &value).IsNotFound());
@@ -4405,8 +4641,12 @@ TEST_F(DBTest2, TraceWithSampling) {
 
   std::unique_ptr<TraceReader> trace_reader;
   ASSERT_OK(NewFileTraceReader(env_, env_opts, trace_filename, &trace_reader));
-  Replayer replayer(db2, handles_, std::move(trace_reader));
-  ASSERT_OK(replayer.Replay());
+  std::unique_ptr<Replayer> replayer;
+  ASSERT_OK(
+      db2->NewDefaultReplayer(handles, std::move(trace_reader), &replayer));
+  ASSERT_OK(replayer->Prepare());
+  ASSERT_OK(replayer->Replay());
+  replayer.reset();
 
   ASSERT_TRUE(db2->Get(ro, handles[0], "a", &value).IsNotFound());
   ASSERT_FALSE(db2->Get(ro, handles[0], "b", &value).IsNotFound());
@@ -4505,8 +4745,12 @@ TEST_F(DBTest2, TraceWithFilter) {
 
   std::unique_ptr<TraceReader> trace_reader;
   ASSERT_OK(NewFileTraceReader(env_, env_opts, trace_filename, &trace_reader));
-  Replayer replayer(db2, handles_, std::move(trace_reader));
-  ASSERT_OK(replayer.Replay());
+  std::unique_ptr<Replayer> replayer;
+  ASSERT_OK(
+      db2->NewDefaultReplayer(handles, std::move(trace_reader), &replayer));
+  ASSERT_OK(replayer->Prepare());
+  ASSERT_OK(replayer->Replay());
+  replayer.reset();
 
   // All the key-values should not present since we filter out the WRITE ops.
   ASSERT_TRUE(db2->Get(ro, handles[0], "a", &value).IsNotFound());
index 9a85dd5e080df11bffb59a3ddb35fe541456dc48..733f45571395485561021467336170080b9487ad 100644 (file)
@@ -7,6 +7,7 @@
 
 #include "rocksdb/file_system.h"
 #include "rocksdb/system_clock.h"
+#include "rocksdb/trace_record.h"
 
 namespace ROCKSDB_NAMESPACE {
 
index 645ecfb4519745f2754a1ea0f031381d9f532c93..1f715b3cfd9c4e121fd02b7de1636f964c5d63ff 100644 (file)
 
 namespace ROCKSDB_NAMESPACE {
 
-struct Options;
-struct DBOptions;
 struct ColumnFamilyOptions;
-struct ReadOptions;
-struct WriteOptions;
-struct FlushOptions;
 struct CompactionOptions;
 struct CompactRangeOptions;
-struct TableProperties;
+struct DBOptions;
 struct ExternalSstFileInfo;
-class WriteBatch;
+struct FlushOptions;
+struct Options;
+struct ReadOptions;
+struct TableProperties;
+struct WriteOptions;
+#ifdef ROCKSDB_LITE
+class CompactionJobInfo;
+#endif
 class Env;
 class EventListener;
+class FileSystem;
+#ifndef ROCKSDB_LITE
+class Replayer;
+#endif
 class StatsHistoryIterator;
+#ifndef ROCKSDB_LITE
+class TraceReader;
 class TraceWriter;
-#ifdef ROCKSDB_LITE
-class CompactionJobInfo;
 #endif
-class FileSystem;
+class WriteBatch;
 
 extern const std::string kDefaultColumnFamilyName;
 extern const std::string kPersistentStatsColumnFamilyName;
@@ -1628,6 +1634,7 @@ class DB {
   virtual ColumnFamilyHandle* DefaultColumnFamily() const = 0;
 
 #ifndef ROCKSDB_LITE
+
   virtual Status GetPropertiesOfAllTables(ColumnFamilyHandle* column_family,
                                           TablePropertiesCollection* props) = 0;
   virtual Status GetPropertiesOfAllTables(TablePropertiesCollection* props) {
@@ -1678,6 +1685,15 @@ class DB {
   virtual Status EndBlockCacheTrace() {
     return Status::NotSupported("EndBlockCacheTrace() is not implemented.");
   }
+
+  // Create a default trace replayer.
+  virtual Status NewDefaultReplayer(
+      const std::vector<ColumnFamilyHandle*>& /*handles*/,
+      std::unique_ptr<TraceReader>&& /*reader*/,
+      std::unique_ptr<Replayer>* /*replayer*/) {
+    return Status::NotSupported("NewDefaultReplayer() is not implemented.");
+  }
+
 #endif  // ROCKSDB_LITE
 
   // Needed for StackableDB
index 26ceab2c84794c60b016d4e755aa74719b6a6c46..50c72f7bbc313575b9d4705aa2725646bae69421 100644 (file)
@@ -36,6 +36,11 @@ class TraceReader {
 
   virtual Status Read(std::string* data) = 0;
   virtual Status Close() = 0;
+
+  // Seek back to the trace header. Replayer can call this method for
+  // repeatedly replaying. Note this method may fail if the reader is already
+  // closed.
+  virtual Status Reset() = 0;
 };
 
 // Factory methods to read/write traces from/to a file.
@@ -45,4 +50,5 @@ Status NewFileTraceWriter(Env* env, const EnvOptions& env_options,
 Status NewFileTraceReader(Env* env, const EnvOptions& env_options,
                           const std::string& trace_filename,
                           std::unique_ptr<TraceReader>* trace_reader);
+
 }  // namespace ROCKSDB_NAMESPACE
diff --git a/include/rocksdb/trace_record.h b/include/rocksdb/trace_record.h
new file mode 100644 (file)
index 0000000..add235e
--- /dev/null
@@ -0,0 +1,205 @@
+//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
+//  This source code is licensed under both the GPLv2 (found in the
+//  COPYING file in the root directory) and Apache 2.0 License
+//  (found in the LICENSE.Apache file in the root directory).
+
+#pragma once
+
+#include <string>
+#include <vector>
+
+#include "rocksdb/rocksdb_namespace.h"
+#include "rocksdb/slice.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+class ColumnFamilyHandle;
+class DB;
+class Status;
+
+// Supported trace record types.
+enum TraceType : char {
+  kTraceNone = 0,
+  kTraceBegin = 1,
+  kTraceEnd = 2,
+  // Query level tracing related trace types.
+  kTraceWrite = 3,
+  kTraceGet = 4,
+  kTraceIteratorSeek = 5,
+  kTraceIteratorSeekForPrev = 6,
+  // Block cache tracing related trace types.
+  kBlockTraceIndexBlock = 7,
+  kBlockTraceFilterBlock = 8,
+  kBlockTraceDataBlock = 9,
+  kBlockTraceUncompressionDictBlock = 10,
+  kBlockTraceRangeDeletionBlock = 11,
+  // IO tracing related trace type.
+  kIOTracer = 12,
+  // Query level tracing related trace type.
+  kTraceMultiGet = 13,
+  // All trace types should be added before kTraceMax
+  kTraceMax,
+};
+
+class WriteQueryTraceRecord;
+class GetQueryTraceRecord;
+class IteratorSeekQueryTraceRecord;
+class MultiGetQueryTraceRecord;
+
+// Base class for all types of trace records.
+class TraceRecord {
+ public:
+  TraceRecord();
+  explicit TraceRecord(uint64_t timestamp);
+  virtual ~TraceRecord();
+
+  virtual TraceType GetTraceType() const = 0;
+
+  virtual uint64_t GetTimestamp() const;
+
+  class Handler {
+   public:
+    virtual ~Handler() {}
+
+    virtual Status Handle(const WriteQueryTraceRecord& record) = 0;
+    virtual Status Handle(const GetQueryTraceRecord& record) = 0;
+    virtual Status Handle(const IteratorSeekQueryTraceRecord& record) = 0;
+    virtual Status Handle(const MultiGetQueryTraceRecord& record) = 0;
+  };
+
+  virtual Status Accept(Handler* handler) = 0;
+
+  // Create a handler for the execution of TraceRecord.
+  static Handler* NewExecutionHandler(
+      DB* db, const std::vector<ColumnFamilyHandle*>& handles);
+
+ private:
+  // Timestamp (in microseconds) of this trace.
+  uint64_t timestamp_;
+};
+
+// Base class for all query types of trace records.
+class QueryTraceRecord : public TraceRecord {
+ public:
+  explicit QueryTraceRecord(uint64_t timestamp);
+
+  virtual ~QueryTraceRecord() override;
+};
+
+// Trace record for DB::Write() operation.
+class WriteQueryTraceRecord : public QueryTraceRecord {
+ public:
+  WriteQueryTraceRecord(PinnableSlice&& write_batch_rep, uint64_t timestamp);
+
+  WriteQueryTraceRecord(const std::string& write_batch_rep, uint64_t timestamp);
+
+  virtual ~WriteQueryTraceRecord() override;
+
+  TraceType GetTraceType() const override { return kTraceWrite; };
+
+  virtual Slice GetWriteBatchRep() const;
+
+  virtual Status Accept(Handler* handler) override;
+
+ private:
+  PinnableSlice rep_;
+};
+
+// Trace record for DB::Get() operation
+class GetQueryTraceRecord : public QueryTraceRecord {
+ public:
+  GetQueryTraceRecord(uint32_t column_family_id, PinnableSlice&& key,
+                      uint64_t timestamp);
+
+  GetQueryTraceRecord(uint32_t column_family_id, const std::string& key,
+                      uint64_t timestamp);
+
+  virtual ~GetQueryTraceRecord() override;
+
+  TraceType GetTraceType() const override { return kTraceGet; };
+
+  virtual uint32_t GetColumnFamilyID() const;
+
+  virtual Slice GetKey() const;
+
+  virtual Status Accept(Handler* handler) override;
+
+ private:
+  // Column family ID.
+  uint32_t cf_id_;
+  // Key to get.
+  PinnableSlice key_;
+};
+
+// Base class for all Iterator related operations.
+class IteratorQueryTraceRecord : public QueryTraceRecord {
+ public:
+  explicit IteratorQueryTraceRecord(uint64_t timestamp);
+
+  virtual ~IteratorQueryTraceRecord() override;
+};
+
+// Trace record for Iterator::Seek() and Iterator::SeekForPrev() operation.
+class IteratorSeekQueryTraceRecord : public IteratorQueryTraceRecord {
+ public:
+  // Currently we only support Seek() and SeekForPrev().
+  enum SeekType {
+    kSeek = kTraceIteratorSeek,
+    kSeekForPrev = kTraceIteratorSeekForPrev
+  };
+
+  IteratorSeekQueryTraceRecord(SeekType seekType, uint32_t column_family_id,
+                               PinnableSlice&& key, uint64_t timestamp);
+
+  IteratorSeekQueryTraceRecord(SeekType seekType, uint32_t column_family_id,
+                               const std::string& key, uint64_t timestamp);
+
+  virtual ~IteratorSeekQueryTraceRecord() override;
+
+  TraceType GetTraceType() const override;
+
+  virtual SeekType GetSeekType() const;
+
+  virtual uint32_t GetColumnFamilyID() const;
+
+  virtual Slice GetKey() const;
+
+  virtual Status Accept(Handler* handler) override;
+
+ private:
+  SeekType type_;
+  // Column family ID.
+  uint32_t cf_id_;
+  // Key to seek to.
+  PinnableSlice key_;
+};
+
+// Trace record for DB::MultiGet() operation.
+class MultiGetQueryTraceRecord : public QueryTraceRecord {
+ public:
+  MultiGetQueryTraceRecord(std::vector<uint32_t> column_family_ids,
+                           std::vector<PinnableSlice>&& keys,
+                           uint64_t timestamp);
+
+  MultiGetQueryTraceRecord(std::vector<uint32_t> column_family_ids,
+                           const std::vector<std::string>& keys,
+                           uint64_t timestamp);
+
+  virtual ~MultiGetQueryTraceRecord() override;
+
+  TraceType GetTraceType() const override { return kTraceMultiGet; };
+
+  virtual std::vector<uint32_t> GetColumnFamilyIDs() const;
+
+  virtual std::vector<Slice> GetKeys() const;
+
+  virtual Status Accept(Handler* handler) override;
+
+ private:
+  // Column family IDs.
+  std::vector<uint32_t> cf_ids_;
+  // Keys to get.
+  std::vector<PinnableSlice> keys_;
+};
+
+}  // namespace ROCKSDB_NAMESPACE
diff --git a/include/rocksdb/utilities/replayer.h b/include/rocksdb/utilities/replayer.h
new file mode 100644 (file)
index 0000000..976fadb
--- /dev/null
@@ -0,0 +1,74 @@
+//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
+//  This source code is licensed under both the GPLv2 (found in the
+//  COPYING file in the root directory) and Apache 2.0 License
+//  (found in the LICENSE.Apache file in the root directory).
+
+#pragma once
+#ifndef ROCKSDB_LITE
+
+#include <memory>
+
+#include "rocksdb/rocksdb_namespace.h"
+#include "rocksdb/status.h"
+#include "rocksdb/trace_record.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+struct ReplayOptions {
+  // Number of threads used for replaying. If 0 or 1, replay using
+  // single thread.
+  uint32_t num_threads;
+
+  // Enables fast forwarding a replay by increasing/reducing the delay between
+  // the ingested traces.
+  //   If > 0.0 and < 1.0, slow down the replay by this amount.
+  //   If 1.0, replay the operations at the same rate as in the trace stream.
+  //   If > 1, speed up the replay by this amount.
+  double fast_forward;
+
+  ReplayOptions() : num_threads(1), fast_forward(1.0) {}
+  ReplayOptions(uint32_t num_of_threads, double fast_forward_ratio)
+      : num_threads(num_of_threads), fast_forward(fast_forward_ratio) {}
+};
+
+// Replayer helps to replay the captured RocksDB query level operations.
+// The Replayer can either be created by DB::NewDefaultReplayer(), or be
+// instantiated via db_bench today, on using "replay" benchmark.
+class Replayer {
+ public:
+  virtual ~Replayer() {}
+
+  // Make some preparation before replaying the trace. This will also reset the
+  // replayer in order to restart replaying.
+  virtual Status Prepare() = 0;
+
+  // Return the timestamp when the trace recording was started.
+  virtual uint64_t GetHeaderTimestamp() const = 0;
+
+  // Atomically read one trace into a TraceRecord (excluding the header and
+  // footer traces).
+  // Return Status::OK() on success;
+  // Status::Incomplete() if Prepare() was not called or no more traces are
+  // available;
+  // Status::NotSupported() if the read trace type is not supported.
+  virtual Status Next(std::unique_ptr<TraceRecord>* record) = 0;
+
+  // Execute one TraceRecord.
+  // Return Status::OK() if the execution was successful. Get/MultiGet traces
+  // will still return Status::OK() even if they got Status::NotFound()
+  // from DB::Get() or DB::MultiGet();
+  // Status::Incomplete() if Prepare() was not called or no more traces are
+  // available;
+  // Status::NotSupported() if the operation is not supported;
+  // Otherwise, return the corresponding error status.
+  virtual Status Execute(const std::unique_ptr<TraceRecord>& record) = 0;
+  virtual Status Execute(std::unique_ptr<TraceRecord>&& record) = 0;
+
+  // Replay all the traces from the provided trace stream, taking the delay
+  // between the traces into consideration.
+  virtual Status Replay(const ReplayOptions& options) = 0;
+  virtual Status Replay() { return Replay(ReplayOptions()); }
+};
+
+}  // namespace ROCKSDB_NAMESPACE
+#endif  // ROCKSDB_LITE
index f798652b4f92ba8c6a51ae0ba07f892ba5c3c883..5d3eea6a746fea62f2f614d862787486258bdf7d 100644 (file)
@@ -390,6 +390,13 @@ class StackableDB : public DB {
   using DB::EndTrace;
   Status EndTrace() override { return db_->EndTrace(); }
 
+  using DB::NewDefaultReplayer;
+  // Forwards the request to the wrapped DB (db_), moving `reader` into the
+  // call; the resulting Replayer is handed back through `replayer`.
+  Status NewDefaultReplayer(const std::vector<ColumnFamilyHandle*>& handles,
+                            std::unique_ptr<TraceReader>&& reader,
+                            std::unique_ptr<Replayer>* replayer) override {
+    return db_->NewDefaultReplayer(handles, std::move(reader), replayer);
+  }
+
 #endif  // ROCKSDB_LITE
 
   virtual Status GetLiveFiles(std::vector<std::string>& vec, uint64_t* mfs,
diff --git a/src.mk b/src.mk
index 05318171519e2f0cc019c4c12cbbf3f562fd38ec..b2c83f048e3ca3f59c14c6de5535e8b2324c756b 100644 (file)
--- a/src.mk
+++ b/src.mk
@@ -198,6 +198,8 @@ LIB_SOURCES =                                                   \
   test_util/sync_point_impl.cc                                  \
   test_util/transaction_test_util.cc                            \
   tools/dump/db_dump_tool.cc                                    \
+  trace_replay/trace_record_handler.cc                          \
+  trace_replay/trace_record.cc                                  \
   trace_replay/trace_replay.cc                                  \
   trace_replay/block_cache_tracer.cc                            \
   trace_replay/io_tracer.cc                                     \
@@ -262,6 +264,7 @@ LIB_SOURCES =                                                   \
   utilities/simulator_cache/sim_cache.cc                        \
   utilities/table_properties_collectors/compact_on_deletion_collector.cc \
   utilities/trace/file_trace_reader_writer.cc                   \
+  utilities/trace/replayer_impl.cc                              \
   utilities/transactions/lock/lock_manager.cc                   \
   utilities/transactions/lock/point/point_lock_tracker.cc       \
   utilities/transactions/lock/point/point_lock_manager.cc       \
index d5b1d5e835d39d564d3284606081cab2ab5e77c4..f6eca75e3e3406d7fe63c608f533efe4f5de5330 100644 (file)
@@ -36,6 +36,7 @@
 #include "rocksdb/system_clock.h"
 #include "rocksdb/table.h"
 #include "rocksdb/table_properties.h"
+#include "rocksdb/trace_record.h"
 #include "table/block_based/binary_search_index_reader.h"
 #include "table/block_based/block.h"
 #include "table/block_based/block_based_filter_block.h"
index ac0d6b930ba175d24470bea1bcdae9585ae4e7d9..f805418af72a5483197b024df7b612af13328dc7 100644 (file)
@@ -8,6 +8,7 @@
 // found in the LICENSE file. See the AUTHORS file for names of contributors.
 
 #include <stdio.h>
+
 #include <algorithm>
 #include <iostream>
 #include <map>
@@ -34,6 +35,7 @@
 #include "rocksdb/perf_context.h"
 #include "rocksdb/slice_transform.h"
 #include "rocksdb/statistics.h"
+#include "rocksdb/trace_record.h"
 #include "rocksdb/write_buffer_manager.h"
 #include "table/block_based/block.h"
 #include "table/block_based/block_based_table_builder.h"
index 29ec8cb91ba5c9c4374915a7a0cc4118bb7a827d..ff618f4b522f1a3bbdd21cab32678a82b1c895ab 100644 (file)
@@ -20,6 +20,7 @@
 
 #include "monitoring/histogram.h"
 #include "rocksdb/system_clock.h"
+#include "rocksdb/trace_record.h"
 #include "util/gflags_compat.h"
 #include "util/string_util.h"
 
index 4436e0b77a17630a0b29ea28d8d0729e32713e63..8afdf54490749cdfe7518fa5b5dd8a31edc3725f 100644 (file)
@@ -11,6 +11,7 @@
 
 #include "db/dbformat.h"
 #include "rocksdb/env.h"
+#include "rocksdb/trace_record.h"
 #include "rocksdb/utilities/sim_cache.h"
 #include "trace_replay/block_cache_tracer.h"
 #include "utilities/simulator_cache/cache_simulator.h"
index 91bd30652f79add72d29cc72d35e14df6db6242a..ee65c60767d5a89a556c5edec26558fd46937db6 100644 (file)
@@ -21,6 +21,7 @@ int main() {
 #include "rocksdb/env.h"
 #include "rocksdb/status.h"
 #include "rocksdb/trace_reader_writer.h"
+#include "rocksdb/trace_record.h"
 #include "test_util/testharness.h"
 #include "test_util/testutil.h"
 #include "tools/block_cache_analyzer/block_cache_trace_analyzer.h"
index c7c254735f9c29cc406ab7877dbc5533fd6da6e1..89d75053624606349091ac73cb5f59e89e358f3b 100644 (file)
@@ -63,6 +63,9 @@
 #include "rocksdb/utilities/optimistic_transaction_db.h"
 #include "rocksdb/utilities/options_type.h"
 #include "rocksdb/utilities/options_util.h"
+#ifndef ROCKSDB_LITE
+#include "rocksdb/utilities/replayer.h"
+#endif  // ROCKSDB_LITE
 #include "rocksdb/utilities/sim_cache.h"
 #include "rocksdb/utilities/transaction.h"
 #include "rocksdb/utilities/transaction_db.h"
@@ -228,7 +231,9 @@ IF_ROCKSDB_LITE("",
     "\tmemstats  -- Print memtable stats\n"
     "\tsstables    -- Print sstable info\n"
     "\theapprofile -- Dump a heap profile (if supported by this port)\n"
+#ifndef ROCKSDB_LITE
     "\treplay      -- replay the trace file specified with trace_file\n"
+#endif  // ROCKSDB_LITE
     "\tgetmergeoperands -- Insert lots of merge records which are a list of "
     "sorted ints for a key and then compare performance of lookup for another "
     "key "
@@ -997,10 +1002,12 @@ DEFINE_bool(report_bg_io_stats, false,
 DEFINE_bool(use_stderr_info_logger, false,
             "Write info logs to stderr instead of to LOG file. ");
 
+#ifndef ROCKSDB_LITE
+
 DEFINE_string(trace_file, "", "Trace workload to a file. ");
 
-DEFINE_int32(trace_replay_fast_forward, 1,
-             "Fast forward trace replay, must >= 1. ");
+DEFINE_double(trace_replay_fast_forward, 1.0,
+              "Fast forward trace replay, must > 0.0.");
 DEFINE_int32(block_cache_trace_sampling_frequency, 1,
              "Block cache trace sampling frequency, termed s. It uses spatial "
              "downsampling and samples accesses to one out of s blocks.");
@@ -1014,6 +1021,8 @@ DEFINE_string(block_cache_trace_file, "", "Block cache trace file path.");
 DEFINE_int32(trace_replay_threads, 1,
              "The number of threads to replay, must >=1.");
 
+#endif  // ROCKSDB_LITE
+
 static enum ROCKSDB_NAMESPACE::CompressionType StringToCompressionType(
     const char* ctype) {
   assert(ctype);
@@ -3468,6 +3477,7 @@ class Benchmark {
         PrintStats("rocksdb.sstables");
       } else if (name == "stats_history") {
         PrintStatsHistory();
+#ifndef ROCKSDB_LITE
       } else if (name == "replay") {
         if (num_threads > 1) {
           fprintf(stderr, "Multi-threaded replay is not yet supported\n");
@@ -3478,6 +3488,7 @@ class Benchmark {
           ErrorExit();
         }
         method = &Benchmark::Replay;
+#endif  // ROCKSDB_LITE
       } else if (name == "getmergeoperands") {
         method = &Benchmark::GetMergeOperands;
       } else if (!name.empty()) {  // No error message for empty name
@@ -7978,6 +7989,8 @@ class Benchmark {
     }
   }
 
+#ifndef ROCKSDB_LITE
+
   void Replay(ThreadState* thread) {
     if (db_.db != nullptr) {
       Replay(thread, &db_);
@@ -7997,20 +8010,34 @@ class Benchmark {
           s.ToString().c_str());
       exit(1);
     }
-    Replayer replayer(db_with_cfh->db, db_with_cfh->cfh,
-                      std::move(trace_reader));
-    replayer.SetFastForward(
-        static_cast<uint32_t>(FLAGS_trace_replay_fast_forward));
-    s = replayer.MultiThreadReplay(
-        static_cast<uint32_t>(FLAGS_trace_replay_threads));
+    std::unique_ptr<Replayer> replayer;
+    s = db_with_cfh->db->NewDefaultReplayer(db_with_cfh->cfh,
+                                            std::move(trace_reader), &replayer);
+    if (!s.ok()) {
+      fprintf(stderr,
+              "Encountered an error creating a default Replayer. "
+              "Error: %s\n",
+              s.ToString().c_str());
+      exit(1);
+    }
+    s = replayer->Prepare();
+    if (!s.ok()) {
+      fprintf(stderr, "Prepare for replay failed. Error: %s\n",
+              s.ToString().c_str());
+    }
+    s = replayer->Replay(
+        ReplayOptions(static_cast<uint32_t>(FLAGS_trace_replay_threads),
+                      FLAGS_trace_replay_fast_forward));
+    replayer.reset();
     if (s.ok()) {
-      fprintf(stdout, "Replay started from trace_file: %s\n",
+      fprintf(stdout, "Replay completed from trace_file: %s\n",
               FLAGS_trace_file.c_str());
     } else {
-      fprintf(stderr, "Starting replay failed. Error: %s\n",
-              s.ToString().c_str());
+      fprintf(stderr, "Replay failed. Error: %s\n", s.ToString().c_str());
     }
   }
+
+#endif  // ROCKSDB_LITE
 };
 
 int db_bench_tool(int argc, char** argv) {
index 6dd0a423acf00056df21d23a4d6fe9891537b17d..973b3d6bdbc1061d93b4331cf67fe62ef2236a13 100644 (file)
@@ -195,12 +195,6 @@ uint64_t MultiplyCheckOverflow(uint64_t op1, uint64_t op2) {
   return (op1 * op2);
 }
 
-void DecodeCFAndKeyFromString(std::string& buffer, uint32_t* cf_id, Slice* key) {
-  Slice buf(buffer);
-  GetFixed32(&buf, cf_id);
-  GetLengthPrefixedSlice(&buf, key);
-}
-
 }  // namespace
 
 // The default constructor of AnalyzerOptions
@@ -477,74 +471,89 @@ Status TraceAnalyzer::StartProcessing() {
 
     total_requests_++;
     end_time_ = trace.ts;
-    if (trace.type == kTraceWrite) {
-      total_writes_++;
-      c_time_ = trace.ts;
-      Slice batch_data;
-      if (trace_file_version_ < 2) {
-        Slice tmp_data(trace.payload);
-        batch_data = tmp_data;
-      } else {
-        WritePayload w_payload;
-        TracerHelper::DecodeWritePayload(&trace, &w_payload);
-        batch_data = w_payload.write_batch_data;
-      }
-      // Note that, if the write happens in a transaction,
-      // 'Write' will be called twice, one for Prepare, one for
-      // Commit. Thus, in the trace, for the same WriteBatch, there
-      // will be two reords if it is in a transaction. Here, we only
-      // process the reord that is committed. If write is non-transaction,
-      // HasBeginPrepare()==false, so we process it normally.
-      WriteBatch batch(batch_data.ToString());
-      if (batch.HasBeginPrepare() && !batch.HasCommit()) {
-        continue;
-      }
-      TraceWriteHandler write_handler(this);
-      s = batch.Iterate(&write_handler);
-      if (!s.ok()) {
-        fprintf(stderr, "Cannot process the write batch in the trace\n");
-        return s;
+    if (trace.type == kTraceEnd) {
+      break;
+    }
+
+    std::unique_ptr<TraceRecord> record;
+    switch (trace.type) {
+      case kTraceWrite: {
+        s = TracerHelper::DecodeWriteRecord(&trace, trace_file_version_,
+                                            &record);
+        if (!s.ok()) {
+          return s;
+        }
+        total_writes_++;
+        c_time_ = trace.ts;
+        std::unique_ptr<WriteQueryTraceRecord> r(
+            reinterpret_cast<WriteQueryTraceRecord*>(record.release()));
+        // Note that, if the write happens in a transaction,
+        // 'Write' will be called twice, one for Prepare, one for
+        // Commit. Thus, in the trace, for the same WriteBatch, there
+        // will be two records if it is in a transaction. Here, we only
+        // process the record that is committed. If write is non-transaction,
+        // HasBeginPrepare()==false, so we process it normally.
+        WriteBatch batch(r->GetWriteBatchRep().ToString());
+        if (batch.HasBeginPrepare() && !batch.HasCommit()) {
+          continue;
+        }
+        TraceWriteHandler write_handler(this);
+        s = batch.Iterate(&write_handler);
+        if (!s.ok()) {
+          fprintf(stderr, "Cannot process the write batch in the trace\n");
+          return s;
+        }
+        break;
       }
-    } else if (trace.type == kTraceGet) {
-      GetPayload get_payload;
-      get_payload.get_key = 0;
-      if (trace_file_version_ < 2) {
-        DecodeCFAndKeyFromString(trace.payload, &get_payload.cf_id,
-                                 &get_payload.get_key);
-      } else {
-        TracerHelper::DecodeGetPayload(&trace, &get_payload);
+      case kTraceGet: {
+        s = TracerHelper::DecodeGetRecord(&trace, trace_file_version_, &record);
+        if (!s.ok()) {
+          return s;
+        }
+        total_gets_++;
+        std::unique_ptr<GetQueryTraceRecord> r(
+            reinterpret_cast<GetQueryTraceRecord*>(record.release()));
+        s = HandleGet(r->GetColumnFamilyID(), r->GetKey(), r->GetTimestamp(),
+                      1);
+        if (!s.ok()) {
+          fprintf(stderr, "Cannot process the get in the trace\n");
+          return s;
+        }
+        break;
       }
-      total_gets_++;
-
-      s = HandleGet(get_payload.cf_id, get_payload.get_key.ToString(), trace.ts,
-                    1);
-      if (!s.ok()) {
-        fprintf(stderr, "Cannot process the get in the trace\n");
-        return s;
+      case kTraceIteratorSeek:
+      case kTraceIteratorSeekForPrev: {
+        s = TracerHelper::DecodeIterRecord(&trace, trace_file_version_,
+                                           &record);
+        if (!s.ok()) {
+          return s;
+        }
+        std::unique_ptr<IteratorSeekQueryTraceRecord> r(
+            reinterpret_cast<IteratorSeekQueryTraceRecord*>(record.release()));
+        s = HandleIter(r->GetColumnFamilyID(), r->GetKey(), r->GetTimestamp(),
+                       r->GetTraceType());
+        if (!s.ok()) {
+          fprintf(stderr, "Cannot process the iterator in the trace\n");
+          return s;
+        }
+        break;
       }
-    } else if (trace.type == kTraceIteratorSeek ||
-               trace.type == kTraceIteratorSeekForPrev) {
-      IterPayload iter_payload;
-      iter_payload.cf_id = 0;
-      if (trace_file_version_ < 2) {
-        DecodeCFAndKeyFromString(trace.payload, &iter_payload.cf_id,
-                                 &iter_payload.iter_key);
-      } else {
-        TracerHelper::DecodeIterPayload(&trace, &iter_payload);
+      case kTraceMultiGet: {
+        s = TracerHelper::DecodeMultiGetRecord(&trace, trace_file_version_,
+                                               &record);
+        if (!s.ok()) {
+          return s;
+        }
+        std::unique_ptr<MultiGetQueryTraceRecord> r(
+            reinterpret_cast<MultiGetQueryTraceRecord*>(record.release()));
+        s = HandleMultiGet(r->GetColumnFamilyIDs(), r->GetKeys(),
+                           r->GetTimestamp());
+        break;
       }
-      s = HandleIter(iter_payload.cf_id, iter_payload.iter_key.ToString(),
-                     trace.ts, trace.type);
-      if (!s.ok()) {
-        fprintf(stderr, "Cannot process the iterator in the trace\n");
-        return s;
+      default: {
+        // Skip unsupported types
+        break;
       }
-    } else if (trace.type == kTraceMultiGet) {
-      MultiGetPayload multiget_payload;
-      assert(trace_file_version_ >= 2);
-      TracerHelper::DecodeMultiGetPayload(&trace, &multiget_payload);
-      s = HandleMultiGet(multiget_payload, trace.ts);
-    } else if (trace.type == kTraceEnd) {
-      break;
     }
   }
   if (s.IsIncomplete()) {
@@ -825,7 +834,7 @@ Status TraceAnalyzer::MakeStatisticCorrelation(TraceStats& stats,
 
 // Process the statistics of QPS
 Status TraceAnalyzer::MakeStatisticQPS() {
-  if(begin_time_ == 0) {
+  if (begin_time_ == 0) {
     begin_time_ = trace_create_time_;
   }
   uint32_t duration =
@@ -1547,9 +1556,8 @@ Status TraceAnalyzer::CloseOutputFiles() {
 }
 
 // Handle the Get request in the trace
-Status TraceAnalyzer::HandleGet(uint32_t column_family_id,
-                                const std::string& key, const uint64_t& ts,
-                                const uint32_t& get_ret) {
+Status TraceAnalyzer::HandleGet(uint32_t column_family_id, const Slice& key,
+                                const uint64_t& ts, const uint32_t& get_ret) {
   Status s;
   size_t value_size = 0;
   if (FLAGS_convert_to_human_readable_trace && trace_sequence_f_) {
@@ -1575,8 +1583,8 @@ Status TraceAnalyzer::HandleGet(uint32_t column_family_id,
   if (get_ret == 1) {
     value_size = 10;
   }
-  s = KeyStatsInsertion(TraceOperationType::kGet, column_family_id, key,
-                        value_size, ts);
+  s = KeyStatsInsertion(TraceOperationType::kGet, column_family_id,
+                        key.ToString(), value_size, ts);
   if (!s.ok()) {
     return Status::Corruption("Failed to insert key statistics");
   }
@@ -1752,9 +1760,8 @@ Status TraceAnalyzer::HandleMerge(uint32_t column_family_id, const Slice& key,
 }
 
 // Handle the Iterator request in the trace
-Status TraceAnalyzer::HandleIter(uint32_t column_family_id,
-                                 const std::string& key, const uint64_t& ts,
-                                 TraceType& trace_type) {
+Status TraceAnalyzer::HandleIter(uint32_t column_family_id, const Slice& key,
+                                 const uint64_t& ts, TraceType trace_type) {
   Status s;
   size_t value_size = 0;
   int type = -1;
@@ -1788,7 +1795,7 @@ Status TraceAnalyzer::HandleIter(uint32_t column_family_id,
   if (!ta_[type].enabled) {
     return Status::OK();
   }
-  s = KeyStatsInsertion(type, column_family_id, key, value_size, ts);
+  s = KeyStatsInsertion(type, column_family_id, key.ToString(), value_size, ts);
   if (!s.ok()) {
     return Status::Corruption("Failed to insert key statistics");
   }
@@ -1796,24 +1803,22 @@ Status TraceAnalyzer::HandleIter(uint32_t column_family_id,
 }
 
 // Handle MultiGet queries in the trace
-Status TraceAnalyzer::HandleMultiGet(MultiGetPayload& multiget_payload,
-                                     const uint64_t& ts) {
+Status TraceAnalyzer::HandleMultiGet(
+    const std::vector<uint32_t>& column_family_ids,
+    const std::vector<Slice>& keys, const uint64_t& ts) {
   Status s;
   size_t value_size = 0;
-  if (multiget_payload.cf_ids.size() != multiget_payload.multiget_keys.size()) {
+  if (column_family_ids.size() != keys.size()) {
     // The size does not match is not the error of tracing and anayzing, we just
     // report it to the user. The analyzing continues.
     printf("The CF ID vector size does not match the keys vector size!\n");
   }
-  size_t vector_size = std::min(multiget_payload.cf_ids.size(),
-                                multiget_payload.multiget_keys.size());
+  size_t vector_size = std::min(column_family_ids.size(), keys.size());
   if (FLAGS_convert_to_human_readable_trace && trace_sequence_f_) {
     for (size_t i = 0; i < vector_size; i++) {
-      assert(i < multiget_payload.cf_ids.size() &&
-             i < multiget_payload.multiget_keys.size());
+      assert(i < column_family_ids.size() && i < keys.size());
       s = WriteTraceSequence(TraceOperationType::kMultiGet,
-                             multiget_payload.cf_ids[i],
-                             multiget_payload.multiget_keys[i], value_size, ts);
+                             column_family_ids[i], keys[i], value_size, ts);
     }
     if (!s.ok()) {
       return Status::Corruption("Failed to write the trace sequence to file");
@@ -1833,11 +1838,9 @@ Status TraceAnalyzer::HandleMultiGet(MultiGetPayload& multiget_payload,
     return Status::OK();
   }
   for (size_t i = 0; i < vector_size; i++) {
-    assert(i < multiget_payload.cf_ids.size() &&
-           i < multiget_payload.multiget_keys.size());
-    s = KeyStatsInsertion(TraceOperationType::kMultiGet,
-                          multiget_payload.cf_ids[i],
-                          multiget_payload.multiget_keys[i], value_size, ts);
+    assert(i < column_family_ids.size() && i < keys.size());
+    s = KeyStatsInsertion(TraceOperationType::kMultiGet, column_family_ids[i],
+                          keys[i].ToString(), value_size, ts);
   }
   if (!s.ok()) {
     return Status::Corruption("Failed to insert key statistics");
@@ -2011,10 +2014,11 @@ void TraceAnalyzer::PrintStatistics() {
 // Write the trace sequence to file
 Status TraceAnalyzer::WriteTraceSequence(const uint32_t& type,
                                          const uint32_t& cf_id,
-                                         const std::string& key,
+                                         const Slice& key,
                                          const size_t value_size,
                                          const uint64_t ts) {
-  std::string hex_key = ROCKSDB_NAMESPACE::LDBCommand::StringToHex(key);
+  std::string hex_key =
+      ROCKSDB_NAMESPACE::LDBCommand::StringToHex(key.ToString());
   int ret;
   ret = snprintf(buffer_, sizeof(buffer_), "%u %u %zu %" PRIu64 "\n", type,
                  cf_id, value_size, ts);
index bc1b8948266f50ed30d0504b8ceb529f8eefb41b..7eafd2a3c8dddfc1d4910d381e9f6c90fd9e48a7 100644 (file)
@@ -15,6 +15,7 @@
 
 #include "rocksdb/env.h"
 #include "rocksdb/trace_reader_writer.h"
+#include "rocksdb/trace_record.h"
 #include "rocksdb/write_batch.h"
 #include "trace_replay/trace_replay.h"
 
@@ -182,7 +183,7 @@ class TraceAnalyzer {
   Status WriteTraceUnit(TraceUnit& unit);
 
   // The trace  processing functions for different type
-  Status HandleGet(uint32_t column_family_id, const std::string& key,
+  Status HandleGet(uint32_t column_family_id, const Slice& key,
                    const uint64_t& ts, const uint32_t& get_ret);
   Status HandlePut(uint32_t column_family_id, const Slice& key,
                    const Slice& value);
@@ -192,9 +193,10 @@ class TraceAnalyzer {
                            const Slice& end_key);
   Status HandleMerge(uint32_t column_family_id, const Slice& key,
                      const Slice& value);
-  Status HandleIter(uint32_t column_family_id, const std::string& key,
-                    const uint64_t& ts, TraceType& trace_type);
-  Status HandleMultiGet(MultiGetPayload& multiget_payload, const uint64_t& ts);
+  Status HandleIter(uint32_t column_family_id, const Slice& key,
+                    const uint64_t& ts, TraceType trace_type);
+  Status HandleMultiGet(const std::vector<uint32_t>& column_family_ids,
+                        const std::vector<Slice>& keys, const uint64_t& ts);
   std::vector<TypeUnit>& GetTaVector() { return ta_; }
 
  private:
@@ -246,7 +248,7 @@ class TraceAnalyzer {
   Status TraceUnitWriter(
       std::unique_ptr<ROCKSDB_NAMESPACE::WritableFile>& f_ptr, TraceUnit& unit);
   Status WriteTraceSequence(const uint32_t& type, const uint32_t& cf_id,
-                            const std::string& key, const size_t value_size,
+                            const Slice& key, const size_t value_size,
                             const uint64_t ts);
   Status MakeStatisticKeyStatsOrPrefix(TraceStats& stats);
   Status MakeStatisticCorrelation(TraceStats& stats, StatsUnit& unit);
index b9c7477fd0014aa957616b32832b1b591c4a2d74..4bd18cda714fa926729d07ab9ed0d0bd61f1dbd3 100644 (file)
@@ -12,6 +12,7 @@
 #include "db/db_impl/db_impl.h"
 #include "db/dbformat.h"
 #include "rocksdb/slice.h"
+#include "rocksdb/trace_record.h"
 #include "util/coding.h"
 #include "util/hash.h"
 #include "util/string_util.h"
index 01b834ed0235f1aaaf2aa76c7786fb4315840fea..308de8e36ad5637b346ceac432bc4d49efcdfb20 100644 (file)
@@ -4,9 +4,11 @@
 //  (found in the LICENSE.Apache file in the root directory).
 
 #include "trace_replay/block_cache_tracer.h"
+
 #include "rocksdb/env.h"
 #include "rocksdb/status.h"
 #include "rocksdb/trace_reader_writer.h"
+#include "rocksdb/trace_record.h"
 #include "test_util/testharness.h"
 #include "test_util/testutil.h"
 
index 8db6e2a274b8615772d1087ee84b644b57b6381e..3fc7cdba0a69424cafc600e850e1e4c110f1c81d 100644 (file)
@@ -12,6 +12,7 @@
 #include "port/lang.h"
 #include "rocksdb/file_system.h"
 #include "rocksdb/options.h"
+#include "rocksdb/trace_record.h"
 #include "trace_replay/trace_replay.h"
 
 namespace ROCKSDB_NAMESPACE {
index cc27bb970381b2d7fd56bc70ba1a9bc11efd41cb..3471c9c43e69ea332b20b4d5ee53ffd015f0e0fb 100644 (file)
@@ -8,6 +8,7 @@
 #include "rocksdb/env.h"
 #include "rocksdb/status.h"
 #include "rocksdb/trace_reader_writer.h"
+#include "rocksdb/trace_record.h"
 #include "test_util/testharness.h"
 #include "test_util/testutil.h"
 
diff --git a/trace_replay/trace_record.cc b/trace_replay/trace_record.cc
new file mode 100644 (file)
index 0000000..75afcf3
--- /dev/null
@@ -0,0 +1,163 @@
+//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
+//  This source code is licensed under both the GPLv2 (found in the
+//  COPYING file in the root directory) and Apache 2.0 License
+//  (found in the LICENSE.Apache file in the root directory).
+
+#include "rocksdb/trace_record.h"
+
+#include <utility>
+
+#include "rocksdb/db.h"
+#include "rocksdb/iterator.h"
+#include "rocksdb/options.h"
+#include "rocksdb/status.h"
+#include "trace_replay/trace_record_handler.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+// TraceRecord
+TraceRecord::TraceRecord(uint64_t timestamp) : timestamp_(timestamp) {}
+
+TraceRecord::~TraceRecord() {}
+
+uint64_t TraceRecord::GetTimestamp() const { return timestamp_; }
+
+TraceRecord::Handler* TraceRecord::NewExecutionHandler(
+    DB* db, const std::vector<ColumnFamilyHandle*>& handles) {
+  return new TraceExecutionHandler(db, handles);
+}
+
+// QueryTraceRecord
+// Only forwards the timestamp to the TraceRecord base.
+QueryTraceRecord::QueryTraceRecord(uint64_t timestamp)
+    : TraceRecord(timestamp) {}
+
+QueryTraceRecord::~QueryTraceRecord() = default;
+
+// WriteQueryTraceRecord
+WriteQueryTraceRecord::WriteQueryTraceRecord(PinnableSlice&& write_batch_rep,
+                                             uint64_t timestamp)
+    : QueryTraceRecord(timestamp), rep_(std::move(write_batch_rep)) {}
+
+WriteQueryTraceRecord::WriteQueryTraceRecord(const std::string& write_batch_rep,
+                                             uint64_t timestamp)
+    : QueryTraceRecord(timestamp) {
+  rep_.PinSelf(write_batch_rep);
+}
+
+WriteQueryTraceRecord::~WriteQueryTraceRecord() {}
+
+Slice WriteQueryTraceRecord::GetWriteBatchRep() const { return Slice(rep_); }
+
+Status WriteQueryTraceRecord::Accept(Handler* handler) {
+  assert(handler != nullptr);
+  return handler->Handle(*this);
+}
+
+// GetQueryTraceRecord
+GetQueryTraceRecord::GetQueryTraceRecord(uint32_t column_family_id,
+                                         PinnableSlice&& key,
+                                         uint64_t timestamp)
+    : QueryTraceRecord(timestamp),
+      cf_id_(column_family_id),
+      key_(std::move(key)) {}
+
+GetQueryTraceRecord::GetQueryTraceRecord(uint32_t column_family_id,
+                                         const std::string& key,
+                                         uint64_t timestamp)
+    : QueryTraceRecord(timestamp), cf_id_(column_family_id) {
+  key_.PinSelf(key);
+}
+
+GetQueryTraceRecord::~GetQueryTraceRecord() {}
+
+uint32_t GetQueryTraceRecord::GetColumnFamilyID() const { return cf_id_; }
+
+Slice GetQueryTraceRecord::GetKey() const { return Slice(key_); }
+
+Status GetQueryTraceRecord::Accept(Handler* handler) {
+  assert(handler != nullptr);
+  return handler->Handle(*this);
+}
+
+// IteratorQueryTraceRecord
+// Only forwards the timestamp to the QueryTraceRecord base.
+IteratorQueryTraceRecord::IteratorQueryTraceRecord(uint64_t timestamp)
+    : QueryTraceRecord(timestamp) {}
+
+IteratorQueryTraceRecord::~IteratorQueryTraceRecord() = default;
+
+// IteratorSeekQueryTraceRecord
+IteratorSeekQueryTraceRecord::IteratorSeekQueryTraceRecord(
+    SeekType seek_type, uint32_t column_family_id, PinnableSlice&& key,
+    uint64_t timestamp)
+    : IteratorQueryTraceRecord(timestamp),
+      type_(seek_type),
+      cf_id_(column_family_id),
+      key_(std::move(key)) {}
+
+IteratorSeekQueryTraceRecord::IteratorSeekQueryTraceRecord(
+    SeekType seek_type, uint32_t column_family_id, const std::string& key,
+    uint64_t timestamp)
+    : IteratorQueryTraceRecord(timestamp),
+      type_(seek_type),
+      cf_id_(column_family_id) {
+  key_.PinSelf(key);
+}
+
+IteratorSeekQueryTraceRecord::~IteratorSeekQueryTraceRecord() {}
+
+TraceType IteratorSeekQueryTraceRecord::GetTraceType() const {
+  return static_cast<TraceType>(type_);
+}
+
+IteratorSeekQueryTraceRecord::SeekType
+IteratorSeekQueryTraceRecord::GetSeekType() const {
+  return type_;
+}
+
+uint32_t IteratorSeekQueryTraceRecord::GetColumnFamilyID() const {
+  return cf_id_;
+}
+
+Slice IteratorSeekQueryTraceRecord::GetKey() const { return Slice(key_); }
+
+Status IteratorSeekQueryTraceRecord::Accept(Handler* handler) {
+  assert(handler != nullptr);
+  return handler->Handle(*this);
+}
+
+// MultiGetQueryTraceRecord
+MultiGetQueryTraceRecord::MultiGetQueryTraceRecord(
+    std::vector<uint32_t> column_family_ids, std::vector<PinnableSlice>&& keys,
+    uint64_t timestamp)
+    : QueryTraceRecord(timestamp),
+      cf_ids_(column_family_ids),
+      keys_(std::move(keys)) {}
+
+MultiGetQueryTraceRecord::MultiGetQueryTraceRecord(
+    std::vector<uint32_t> column_family_ids,
+    const std::vector<std::string>& keys, uint64_t timestamp)
+    : QueryTraceRecord(timestamp), cf_ids_(column_family_ids) {
+  keys_.reserve(keys.size());
+  for (const std::string& key : keys) {
+    PinnableSlice ps;
+    ps.PinSelf(key);
+    keys_.push_back(std::move(ps));
+  }
+}
+
+MultiGetQueryTraceRecord::~MultiGetQueryTraceRecord() {}
+
+std::vector<uint32_t> MultiGetQueryTraceRecord::GetColumnFamilyIDs() const {
+  return cf_ids_;
+}
+
+std::vector<Slice> MultiGetQueryTraceRecord::GetKeys() const {
+  return std::vector<Slice>(keys_.begin(), keys_.end());
+}
+
+Status MultiGetQueryTraceRecord::Accept(Handler* handler) {
+  assert(handler != nullptr);
+  return handler->Handle(*this);
+}
+
+}  // namespace ROCKSDB_NAMESPACE
diff --git a/trace_replay/trace_record_handler.cc b/trace_replay/trace_record_handler.cc
new file mode 100644 (file)
index 0000000..4e8a40b
--- /dev/null
@@ -0,0 +1,108 @@
+//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
+//  This source code is licensed under both the GPLv2 (found in the
+//  COPYING file in the root directory) and Apache 2.0 License
+//  (found in the LICENSE.Apache file in the root directory).
+
+#include "trace_replay/trace_record_handler.h"
+
+#include <memory>
+
+#include "rocksdb/iterator.h"
+#include "rocksdb/write_batch.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+// TraceExecutionHandler
+// Binds the handler to `db` and indexes `handles` by column family ID so that
+// records can be resolved to a ColumnFamilyHandle at execution time. Default
+// WriteOptions/ReadOptions are used for every executed record.
+TraceExecutionHandler::TraceExecutionHandler(
+    DB* db, const std::vector<ColumnFamilyHandle*>& handles)
+    : TraceRecord::Handler(), db_(db), write_opts_(), read_opts_() {
+  assert(db != nullptr);
+  assert(!handles.empty());
+  cf_map_.reserve(handles.size());
+  for (ColumnFamilyHandle* cfh : handles) {
+    assert(cfh != nullptr);
+    // Like insert(), emplace() keeps the first handle seen for a given ID.
+    cf_map_.emplace(cfh->GetID(), cfh);
+  }
+}
+
+// Only drops the ID -> handle mapping; the handles themselves are not deleted
+// here.
+TraceExecutionHandler::~TraceExecutionHandler() {
+  cf_map_.clear();
+}
+
+// Rebuilds a WriteBatch from the recorded serialized representation and
+// applies it to the DB with the handler's WriteOptions.
+Status TraceExecutionHandler::Handle(const WriteQueryTraceRecord& record) {
+  const Slice serialized_batch = record.GetWriteBatchRep();
+  WriteBatch replayed_batch(serialized_batch.ToString());
+  return db_->Write(write_opts_, &replayed_batch);
+}
+
+// Executes a recorded Get against the DB.
+// Returns Status::Corruption() when the record's column family ID is unknown
+// to this handler, Status::OK() when the lookup succeeded or the key was not
+// found, and the DB's error status otherwise.
+Status TraceExecutionHandler::Handle(const GetQueryTraceRecord& record) {
+  const auto cf_entry = cf_map_.find(record.GetColumnFamilyID());
+  if (cf_entry == cf_map_.end()) {
+    return Status::Corruption("Invalid Column Family ID.");
+  }
+  ColumnFamilyHandle* const cfh = cf_entry->second;
+  assert(cfh != nullptr);
+
+  // The value itself is discarded; only the resulting status matters.
+  std::string fetched_value;
+  Status s = db_->Get(read_opts_, cfh, record.GetKey(), &fetched_value);
+
+  // A missing key is an expected outcome during replay, not a failure.
+  if (s.IsNotFound()) {
+    return Status::OK();
+  }
+  return s;
+}
+
+Status TraceExecutionHandler::Handle(
+    const IteratorSeekQueryTraceRecord& record) {
+  auto it = cf_map_.find(record.GetColumnFamilyID());
+  if (it == cf_map_.end()) {
+    return Status::Corruption("Invalid Column Family ID.");
+  }
+  assert(it->second != nullptr);
+
+  Iterator* single_iter = db_->NewIterator(read_opts_, it->second);
+
+  switch (record.GetSeekType()) {
+    case IteratorSeekQueryTraceRecord::kSeekForPrev: {
+      single_iter->SeekForPrev(record.GetKey());
+      break;
+    }
+    default: {
+      single_iter->Seek(record.GetKey());
+      break;
+    }
+  }
+  Status s = single_iter->status();
+  delete single_iter;
+  return s;
+}
+
+Status TraceExecutionHandler::Handle(const MultiGetQueryTraceRecord& record) {
+  std::vector<ColumnFamilyHandle*> handles;
+  handles.reserve(record.GetColumnFamilyIDs().size());
+  for (uint32_t cf_id : record.GetColumnFamilyIDs()) {
+    auto it = cf_map_.find(cf_id);
+    if (it == cf_map_.end()) {
+      return Status::Corruption("Invalid Column Family ID.");
+    }
+    assert(it->second != nullptr);
+    handles.push_back(it->second);
+  }
+
+  std::vector<Slice> keys = record.GetKeys();
+
+  if (handles.empty() || keys.empty()) {
+    return Status::InvalidArgument("Empty MultiGet cf_ids or keys.");
+  }
+  if (handles.size() != keys.size()) {
+    return Status::InvalidArgument("MultiGet cf_ids and keys size mismatch.");
+  }
+
+  std::vector<std::string> values;
+  std::vector<Status> ss = db_->MultiGet(read_opts_, handles, keys, &values);
+
+  // Treat not found as ok, return other errors.
+  for (Status s : ss) {
+    if (!s.ok() && !s.IsNotFound()) {
+      return s;
+    }
+  }
+  return Status::OK();
+}
+
+}  // namespace ROCKSDB_NAMESPACE
diff --git a/trace_replay/trace_record_handler.h b/trace_replay/trace_record_handler.h
new file mode 100644 (file)
index 0000000..fbc5a83
--- /dev/null
@@ -0,0 +1,39 @@
+//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
+//  This source code is licensed under both the GPLv2 (found in the
+//  COPYING file in the root directory) and Apache 2.0 License
+//  (found in the LICENSE.Apache file in the root directory).
+
+#pragma once
+
+#include <unordered_map>
+#include <vector>
+
+#include "rocksdb/db.h"
+#include "rocksdb/options.h"
+#include "rocksdb/status.h"
+#include "rocksdb/trace_record.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+// TraceRecord::Handler implementation that executes trace records against a
+// DB. One Handle() overload exists per supported query record type; each
+// returns the Status of handling that record.
+class TraceExecutionHandler : public TraceRecord::Handler {
+ public:
+  // `db` is the database the records are executed against. `handles` are the
+  // column family handles that records may refer to (presumably used to
+  // populate `cf_map_`; the constructor body is defined in the .cc file).
+  TraceExecutionHandler(DB* db,
+                        const std::vector<ColumnFamilyHandle*>& handles);
+  ~TraceExecutionHandler() override;
+
+  // Write query record.
+  Status Handle(const WriteQueryTraceRecord& record) override;
+  // Get query record.
+  Status Handle(const GetQueryTraceRecord& record) override;
+  // Iterator Seek / SeekForPrev query record.
+  Status Handle(const IteratorSeekQueryTraceRecord& record) override;
+  // MultiGet query record.
+  Status Handle(const MultiGetQueryTraceRecord& record) override;
+
+ private:
+  // Database that the queries are issued to.
+  DB* db_;
+  // Lookup from the column family ID stored in a record to its handle.
+  std::unordered_map<uint32_t, ColumnFamilyHandle*> cf_map_;
+  // Options for write queries (presumably default-constructed; confirm in
+  // the constructor).
+  WriteOptions write_opts_;
+  // Options passed to the read-side calls (NewIterator, MultiGet).
+  ReadOptions read_opts_;
+};
+
+// TODO: Handler for trace_analyzer.
+
+}  // namespace ROCKSDB_NAMESPACE
index 6171d91ec1d7b364514c8d55da58512263082b29..b3063c1431074a5e741b5af924c829f9fa708849 100644 (file)
@@ -11,6 +11,7 @@
 
 #include "db/db_impl/db_impl.h"
 #include "rocksdb/env.h"
+#include "rocksdb/iterator.h"
 #include "rocksdb/options.h"
 #include "rocksdb/slice.h"
 #include "rocksdb/system_clock.h"
@@ -18,7 +19,6 @@
 #include "rocksdb/write_batch.h"
 #include "util/coding.h"
 #include "util/string_util.h"
-#include "util/threadpool_imp.h"
 
 namespace ROCKSDB_NAMESPACE {
 
@@ -104,6 +104,20 @@ Status TracerHelper::DecodeTrace(const std::string& encoded_trace,
   return Status::OK();
 }
 
+// Decodes `encoded_trace` into `header` and validates that it is a trace
+// file header.
+//
+// Returns the error from DecodeTrace() if the entry cannot be decoded,
+// Status::Corruption if the decoded entry is not of type kTraceBegin or its
+// payload does not start with kTraceMagic, and Status::OK() otherwise.
+Status TracerHelper::DecodeHeader(const std::string& encoded_trace,
+                                  Trace* header) {
+  assert(header != nullptr);
+
+  Status s = TracerHelper::DecodeTrace(encoded_trace, header);
+  if (!s.ok()) {
+    // `header` may not have been populated; do not inspect it, and report
+    // the real decoding error instead of a misleading corruption message.
+    return s;
+  }
+
+  if (header->type != kTraceBegin) {
+    return Status::Corruption("Corrupted trace file. Incorrect header.");
+  }
+  if (header->payload.substr(0, kTraceMagic.length()) != kTraceMagic) {
+    return Status::Corruption("Corrupted trace file. Incorrect magic.");
+  }
+
+  return s;
+}
+
 bool TracerHelper::SetPayloadMap(uint64_t& payload_map,
                                  const TracePayloadType payload_type) {
   uint64_t old_state = payload_map;
@@ -112,82 +126,153 @@ bool TracerHelper::SetPayloadMap(uint64_t& payload_map,
   return old_state != payload_map;
 }
 
-void TracerHelper::DecodeWritePayload(Trace* trace,
-                                      WritePayload* write_payload) {
-  assert(write_payload != nullptr);
-  Slice buf(trace->payload);
-  GetFixed64(&buf, &trace->payload_map);
-  int64_t payload_map = static_cast<int64_t>(trace->payload_map);
-  while (payload_map) {
-    // Find the rightmost set bit.
-    uint32_t set_pos = static_cast<uint32_t>(log2(payload_map & -payload_map));
-    switch (set_pos) {
-      case TracePayloadType::kWriteBatchData:
-        GetLengthPrefixedSlice(&buf, &(write_payload->write_batch_data));
-        break;
-      default:
-        assert(false);
+// Decodes a kTraceWrite entry into a WriteQueryTraceRecord.
+//
+// For trace file versions below 2 the whole payload is the write batch
+// representation. From version 2 on, the payload starts with a fixed 64-bit
+// payload map whose set bits name the fields that follow, in ascending bit
+// order. `trace->payload_map` is updated as a side effect in that case.
+//
+// `record` may be nullptr, in which case the payload is only parsed.
+// Returns Status::Corruption if the payload is too short to hold the fields
+// announced by its payload map, Status::OK() otherwise.
+Status TracerHelper::DecodeWriteRecord(Trace* trace, int trace_file_version,
+                                       std::unique_ptr<TraceRecord>* record) {
+  assert(trace != nullptr);
+  assert(trace->type == kTraceWrite);
+
+  PinnableSlice rep;
+  if (trace_file_version < 2) {
+    rep.PinSelf(trace->payload);
+  } else {
+    Slice buf(trace->payload);
+    if (!GetFixed64(&buf, &trace->payload_map)) {
+      return Status::Corruption(
+          "Corrupted trace file. Incomplete write payload.");
+    }
+    int64_t payload_map = static_cast<int64_t>(trace->payload_map);
+    Slice write_batch_data;
+    while (payload_map) {
+      // Find the rightmost set bit.
+      uint32_t set_pos =
+          static_cast<uint32_t>(log2(payload_map & -payload_map));
+      // Stays true unless a field announced by the map cannot be read.
+      bool decoded = true;
+      switch (set_pos) {
+        case TracePayloadType::kWriteBatchData:
+          decoded = GetLengthPrefixedSlice(&buf, &write_batch_data);
+          break;
+        default:
+          assert(false);
+      }
+      if (!decoded) {
+        return Status::Corruption(
+            "Corrupted trace file. Incomplete write payload.");
+      }
+      // unset the rightmost bit.
+      payload_map &= (payload_map - 1);
     }
-    // unset the rightmost bit.
-    payload_map &= (payload_map - 1);
+    rep.PinSelf(write_batch_data);
+  }
+
+  if (record != nullptr) {
+    record->reset(new WriteQueryTraceRecord(std::move(rep), trace->ts));
   }
+
+  return Status::OK();
 }
 
-void TracerHelper::DecodeGetPayload(Trace* trace, GetPayload* get_payload) {
-  assert(get_payload != nullptr);
-  Slice buf(trace->payload);
-  GetFixed64(&buf, &trace->payload_map);
-  int64_t payload_map = static_cast<int64_t>(trace->payload_map);
-  while (payload_map) {
-    // Find the rightmost set bit.
-    uint32_t set_pos = static_cast<uint32_t>(log2(payload_map & -payload_map));
-    switch (set_pos) {
-      case TracePayloadType::kGetCFID:
-        GetFixed32(&buf, &(get_payload->cf_id));
-        break;
-      case TracePayloadType::kGetKey:
-        GetLengthPrefixedSlice(&buf, &(get_payload->get_key));
-        break;
-      default:
-        assert(false);
+// Decodes a kTraceGet entry into a GetQueryTraceRecord.
+//
+// For trace file versions below 2 the payload is parsed with
+// DecodeCFAndKey(). From version 2 on, the payload starts with a fixed
+// 64-bit payload map whose set bits name the fields that follow, in
+// ascending bit order. `trace->payload_map` is updated as a side effect in
+// that case.
+//
+// `record` may be nullptr, in which case the payload is only parsed.
+// Returns Status::Corruption if a version >= 2 payload is too short to hold
+// the fields announced by its payload map, Status::OK() otherwise.
+Status TracerHelper::DecodeGetRecord(Trace* trace, int trace_file_version,
+                                     std::unique_ptr<TraceRecord>* record) {
+  assert(trace != nullptr);
+  assert(trace->type == kTraceGet);
+
+  uint32_t cf_id = 0;
+  Slice get_key;
+
+  if (trace_file_version < 2) {
+    DecodeCFAndKey(trace->payload, &cf_id, &get_key);
+  } else {
+    Slice buf(trace->payload);
+    if (!GetFixed64(&buf, &trace->payload_map)) {
+      return Status::Corruption(
+          "Corrupted trace file. Incomplete get payload.");
+    }
+    int64_t payload_map = static_cast<int64_t>(trace->payload_map);
+    while (payload_map) {
+      // Find the rightmost set bit.
+      uint32_t set_pos =
+          static_cast<uint32_t>(log2(payload_map & -payload_map));
+      // Stays true unless a field announced by the map cannot be read.
+      bool decoded = true;
+      switch (set_pos) {
+        case TracePayloadType::kGetCFID:
+          decoded = GetFixed32(&buf, &cf_id);
+          break;
+        case TracePayloadType::kGetKey:
+          decoded = GetLengthPrefixedSlice(&buf, &get_key);
+          break;
+        default:
+          assert(false);
+      }
+      if (!decoded) {
+        return Status::Corruption(
+            "Corrupted trace file. Incomplete get payload.");
+      }
+      // unset the rightmost bit.
+      payload_map &= (payload_map - 1);
     }
-    // unset the rightmost bit.
-    payload_map &= (payload_map - 1);
   }
+
+  if (record != nullptr) {
+    // The key slice points into trace->payload; copy it so the record owns
+    // its data.
+    PinnableSlice ps;
+    ps.PinSelf(get_key);
+    record->reset(new GetQueryTraceRecord(cf_id, std::move(ps), trace->ts));
+  }
+
+  return Status::OK();
 }
 
-void TracerHelper::DecodeIterPayload(Trace* trace, IterPayload* iter_payload) {
-  assert(iter_payload != nullptr);
-  Slice buf(trace->payload);
-  GetFixed64(&buf, &trace->payload_map);
-  int64_t payload_map = static_cast<int64_t>(trace->payload_map);
-  while (payload_map) {
-    // Find the rightmost set bit.
-    uint32_t set_pos = static_cast<uint32_t>(log2(payload_map & -payload_map));
-    switch (set_pos) {
-      case TracePayloadType::kIterCFID:
-        GetFixed32(&buf, &(iter_payload->cf_id));
-        break;
-      case TracePayloadType::kIterKey:
-        GetLengthPrefixedSlice(&buf, &(iter_payload->iter_key));
-        break;
-      case TracePayloadType::kIterLowerBound:
-        GetLengthPrefixedSlice(&buf, &(iter_payload->lower_bound));
-        break;
-      case TracePayloadType::kIterUpperBound:
-        GetLengthPrefixedSlice(&buf, &(iter_payload->upper_bound));
-        break;
-      default:
-        assert(false);
+// Decodes a kTraceIteratorSeek / kTraceIteratorSeekForPrev entry into an
+// IteratorSeekQueryTraceRecord.
+//
+// For trace file versions below 2 the payload is parsed with
+// DecodeCFAndKey(). From version 2 on, the payload starts with a fixed
+// 64-bit payload map whose set bits name the fields that follow, in
+// ascending bit order. `trace->payload_map` is updated as a side effect in
+// that case.
+//
+// `record` may be nullptr, in which case the payload is only parsed.
+// Returns Status::Corruption if a version >= 2 payload is too short to hold
+// the fields announced by its payload map, Status::OK() otherwise.
+Status TracerHelper::DecodeIterRecord(Trace* trace, int trace_file_version,
+                                      std::unique_ptr<TraceRecord>* record) {
+  assert(trace != nullptr);
+  assert(trace->type == kTraceIteratorSeek ||
+         trace->type == kTraceIteratorSeekForPrev);
+
+  uint32_t cf_id = 0;
+  Slice iter_key;
+
+  if (trace_file_version < 2) {
+    DecodeCFAndKey(trace->payload, &cf_id, &iter_key);
+  } else {
+    // The bounds are not carried into the record; they are still decoded so
+    // that `buf` advances past them to any following field.
+    Slice lower_bound;
+    Slice upper_bound;
+
+    Slice buf(trace->payload);
+    if (!GetFixed64(&buf, &trace->payload_map)) {
+      return Status::Corruption(
+          "Corrupted trace file. Incomplete iterator payload.");
+    }
+    int64_t payload_map = static_cast<int64_t>(trace->payload_map);
+    while (payload_map) {
+      // Find the rightmost set bit.
+      uint32_t set_pos =
+          static_cast<uint32_t>(log2(payload_map & -payload_map));
+      // Stays true unless a field announced by the map cannot be read.
+      bool decoded = true;
+      switch (set_pos) {
+        case TracePayloadType::kIterCFID:
+          decoded = GetFixed32(&buf, &cf_id);
+          break;
+        case TracePayloadType::kIterKey:
+          decoded = GetLengthPrefixedSlice(&buf, &iter_key);
+          break;
+        case TracePayloadType::kIterLowerBound:
+          decoded = GetLengthPrefixedSlice(&buf, &lower_bound);
+          break;
+        case TracePayloadType::kIterUpperBound:
+          decoded = GetLengthPrefixedSlice(&buf, &upper_bound);
+          break;
+        default:
+          assert(false);
+      }
+      if (!decoded) {
+        return Status::Corruption(
+            "Corrupted trace file. Incomplete iterator payload.");
+      }
+      // unset the rightmost bit.
+      payload_map &= (payload_map - 1);
     }
-    // unset the rightmost bit.
-    payload_map &= (payload_map - 1);
   }
+
+  if (record != nullptr) {
+    // The key slice points into trace->payload; copy it so the record owns
+    // its data.
+    PinnableSlice ps_key;
+    ps_key.PinSelf(iter_key);
+    // NOTE(review): the cast assumes SeekType's enumerators share their
+    // values with kTraceIteratorSeek / kTraceIteratorSeekForPrev -- confirm
+    // against the SeekType definition.
+    record->reset(new IteratorSeekQueryTraceRecord(
+        static_cast<IteratorSeekQueryTraceRecord::SeekType>(trace->type), cf_id,
+        std::move(ps_key), trace->ts));
+  }
+
+  return Status::OK();
 }
 
-void TracerHelper::DecodeMultiGetPayload(Trace* trace,
-                                         MultiGetPayload* multiget_payload) {
-  assert(multiget_payload != nullptr);
+Status TracerHelper::DecodeMultiGetRecord(
+    Trace* trace, int trace_file_version,
+    std::unique_ptr<TraceRecord>* record) {
+  assert(trace != nullptr);
+  assert(trace->type == kTraceMultiGet);
+  if (trace_file_version < 2) {
+    return Status::Corruption("MultiGet is not supported.");
+  }
+
+  uint32_t multiget_size = 0;
+  std::vector<uint32_t> cf_ids;
+  std::vector<PinnableSlice> multiget_keys;
+
   Slice cfids_payload;
   Slice keys_payload;
   Slice buf(trace->payload);
@@ -198,7 +283,7 @@ void TracerHelper::DecodeMultiGetPayload(Trace* trace,
     uint32_t set_pos = static_cast<uint32_t>(log2(payload_map & -payload_map));
     switch (set_pos) {
       case TracePayloadType::kMultiGetSize:
-        GetFixed32(&buf, &(multiget_payload->multiget_size));
+        GetFixed32(&buf, &multiget_size);
         break;
       case TracePayloadType::kMultiGetCFIDs:
         GetLengthPrefixedSlice(&buf, &cfids_payload);
@@ -212,18 +297,31 @@ void TracerHelper::DecodeMultiGetPayload(Trace* trace,
     // unset the rightmost bit.
     payload_map &= (payload_map - 1);
   }
+  if (multiget_size == 0) {
+    return Status::InvalidArgument("Empty MultiGet cf_ids or keys.");
+  }
 
   // Decode the cfids_payload and keys_payload
-  multiget_payload->cf_ids.reserve(multiget_payload->multiget_size);
-  multiget_payload->multiget_keys.reserve(multiget_payload->multiget_size);
-  for (uint32_t i = 0; i < multiget_payload->multiget_size; i++) {
+  cf_ids.reserve(multiget_size);
+  multiget_keys.reserve(multiget_size);
+  for (uint32_t i = 0; i < multiget_size; i++) {
     uint32_t tmp_cfid;
     Slice tmp_key;
     GetFixed32(&cfids_payload, &tmp_cfid);
     GetLengthPrefixedSlice(&keys_payload, &tmp_key);
-    multiget_payload->cf_ids.push_back(tmp_cfid);
-    multiget_payload->multiget_keys.push_back(tmp_key.ToString());
+    cf_ids.push_back(tmp_cfid);
+    Slice s(tmp_key);
+    PinnableSlice ps;
+    ps.PinSelf(s);
+    multiget_keys.push_back(std::move(ps));
+  }
+
+  if (record != nullptr) {
+    record->reset(new MultiGetQueryTraceRecord(
+        std::move(cf_ids), std::move(multiget_keys), trace->ts));
   }
+
+  return Status::OK();
 }
 
 Tracer::Tracer(SystemClock* clock, const TraceOptions& trace_options,
@@ -418,10 +516,9 @@ bool Tracer::ShouldSkipTrace(const TraceType& trace_type) {
   if (IsTraceFileOverMax()) {
     return true;
   }
-  if ((trace_options_.filter & kTraceFilterGet
-    && trace_type == kTraceGet)
-   || (trace_options_.filter & kTraceFilterWrite
-    && trace_type == kTraceWrite)) {
+  if ((trace_options_.filter & kTraceFilterGet && trace_type == kTraceGet) ||
+      (trace_options_.filter & kTraceFilterWrite &&
+       trace_type == kTraceWrite)) {
     return true;
   }
   ++trace_request_count_;
@@ -471,445 +568,4 @@ Status Tracer::WriteTrace(const Trace& trace) {
 
 Status Tracer::Close() { return WriteFooter(); }
 
-Replayer::Replayer(DB* db, const std::vector<ColumnFamilyHandle*>& handles,
-                   std::unique_ptr<TraceReader>&& reader)
-    : trace_reader_(std::move(reader)) {
-  assert(db != nullptr);
-  db_ = static_cast<DBImpl*>(db->GetRootDB());
-  env_ = Env::Default();
-  for (ColumnFamilyHandle* cfh : handles) {
-    cf_map_[cfh->GetID()] = cfh;
-  }
-  fast_forward_ = 1;
-}
-
-Replayer::~Replayer() { trace_reader_.reset(); }
-
-Status Replayer::SetFastForward(uint32_t fast_forward) {
-  Status s;
-  if (fast_forward < 1) {
-    s = Status::InvalidArgument("Wrong fast forward speed!");
-  } else {
-    fast_forward_ = fast_forward;
-    s = Status::OK();
-  }
-  return s;
-}
-
-Status Replayer::Replay() {
-  Status s;
-  Trace header;
-  int db_version;
-  s = ReadHeader(&header);
-  if (!s.ok()) {
-    return s;
-  }
-  s = TracerHelper::ParseTraceHeader(header, &trace_file_version_, &db_version);
-  if (!s.ok()) {
-    return s;
-  }
-
-  std::chrono::system_clock::time_point replay_epoch =
-      std::chrono::system_clock::now();
-  WriteOptions woptions;
-  ReadOptions roptions;
-  Trace trace;
-  uint64_t ops = 0;
-  Iterator* single_iter = nullptr;
-  while (s.ok()) {
-    trace.reset();
-    s = ReadTrace(&trace);
-    if (!s.ok()) {
-      break;
-    }
-
-    std::this_thread::sleep_until(
-        replay_epoch +
-        std::chrono::microseconds((trace.ts - header.ts) / fast_forward_));
-    if (trace.type == kTraceWrite) {
-      if (trace_file_version_ < 2) {
-        WriteBatch batch(trace.payload);
-        db_->Write(woptions, &batch);
-      } else {
-        WritePayload w_payload;
-        TracerHelper::DecodeWritePayload(&trace, &w_payload);
-        WriteBatch batch(w_payload.write_batch_data.ToString());
-        db_->Write(woptions, &batch);
-      }
-      ops++;
-    } else if (trace.type == kTraceGet) {
-      GetPayload get_payload;
-      get_payload.cf_id = 0;
-      get_payload.get_key = 0;
-      if (trace_file_version_ < 2) {
-        DecodeCFAndKey(trace.payload, &get_payload.cf_id, &get_payload.get_key);
-      } else {
-        TracerHelper::DecodeGetPayload(&trace, &get_payload);
-      }
-      if (get_payload.cf_id > 0 &&
-          cf_map_.find(get_payload.cf_id) == cf_map_.end()) {
-        return Status::Corruption("Invalid Column Family ID.");
-      }
-
-      std::string value;
-      if (get_payload.cf_id == 0) {
-        db_->Get(roptions, get_payload.get_key, &value);
-      } else {
-        db_->Get(roptions, cf_map_[get_payload.cf_id], get_payload.get_key,
-                 &value);
-      }
-      ops++;
-    } else if (trace.type == kTraceIteratorSeek) {
-      // Currently, we only support to call Seek. The Next() and Prev() is not
-      // supported.
-      IterPayload iter_payload;
-      iter_payload.cf_id = 0;
-      if (trace_file_version_ < 2) {
-        DecodeCFAndKey(trace.payload, &iter_payload.cf_id,
-                       &iter_payload.iter_key);
-      } else {
-        TracerHelper::DecodeIterPayload(&trace, &iter_payload);
-      }
-      if (iter_payload.cf_id > 0 &&
-          cf_map_.find(iter_payload.cf_id) == cf_map_.end()) {
-        return Status::Corruption("Invalid Column Family ID.");
-      }
-
-      if (iter_payload.cf_id == 0) {
-        single_iter = db_->NewIterator(roptions);
-      } else {
-        single_iter = db_->NewIterator(roptions, cf_map_[iter_payload.cf_id]);
-      }
-      single_iter->Seek(iter_payload.iter_key);
-      ops++;
-      delete single_iter;
-    } else if (trace.type == kTraceIteratorSeekForPrev) {
-      // Currently, we only support to call SeekForPrev. The Next() and Prev()
-      // is not supported.
-      IterPayload iter_payload;
-      iter_payload.cf_id = 0;
-      if (trace_file_version_ < 2) {
-        DecodeCFAndKey(trace.payload, &iter_payload.cf_id,
-                       &iter_payload.iter_key);
-      } else {
-        TracerHelper::DecodeIterPayload(&trace, &iter_payload);
-      }
-      if (iter_payload.cf_id > 0 &&
-          cf_map_.find(iter_payload.cf_id) == cf_map_.end()) {
-        return Status::Corruption("Invalid Column Family ID.");
-      }
-
-      if (iter_payload.cf_id == 0) {
-        single_iter = db_->NewIterator(roptions);
-      } else {
-        single_iter = db_->NewIterator(roptions, cf_map_[iter_payload.cf_id]);
-      }
-      single_iter->SeekForPrev(iter_payload.iter_key);
-      ops++;
-      delete single_iter;
-    } else if (trace.type == kTraceMultiGet) {
-      MultiGetPayload multiget_payload;
-      assert(trace_file_version_ >= 2);
-      TracerHelper::DecodeMultiGetPayload(&trace, &multiget_payload);
-      std::vector<ColumnFamilyHandle*> v_cfd;
-      std::vector<Slice> keys;
-      assert(multiget_payload.cf_ids.size() ==
-             multiget_payload.multiget_keys.size());
-      for (size_t i = 0; i < multiget_payload.cf_ids.size(); i++) {
-        assert(i < multiget_payload.cf_ids.size() &&
-               i < multiget_payload.multiget_keys.size());
-        if (cf_map_.find(multiget_payload.cf_ids[i]) == cf_map_.end()) {
-          return Status::Corruption("Invalid Column Family ID.");
-        }
-        v_cfd.push_back(cf_map_[multiget_payload.cf_ids[i]]);
-        keys.push_back(Slice(multiget_payload.multiget_keys[i]));
-      }
-      std::vector<std::string> values;
-      std::vector<Status> ss = db_->MultiGet(roptions, v_cfd, keys, &values);
-    } else if (trace.type == kTraceEnd) {
-      // Do nothing for now.
-      // TODO: Add some validations later.
-      break;
-    }
-  }
-
-  if (s.IsIncomplete()) {
-    // Reaching eof returns Incomplete status at the moment.
-    // Could happen when killing a process without calling EndTrace() API.
-    // TODO: Add better error handling.
-    return Status::OK();
-  }
-  return s;
-}
-
-// The trace can be replayed with multithread by configurnge the number of
-// threads in the thread pool. Trace records are read from the trace file
-// sequentially and the corresponding queries are scheduled in the task
-// queue based on the timestamp. Currently, we support Write_batch (Put,
-// Delete, SingleDelete, DeleteRange), Get, Iterator (Seek and SeekForPrev).
-Status Replayer::MultiThreadReplay(uint32_t threads_num) {
-  Status s;
-  Trace header;
-  int db_version;
-  s = ReadHeader(&header);
-  if (!s.ok()) {
-    return s;
-  }
-  s = TracerHelper::ParseTraceHeader(header, &trace_file_version_, &db_version);
-  if (!s.ok()) {
-    return s;
-  }
-  ThreadPoolImpl thread_pool;
-  thread_pool.SetHostEnv(env_);
-
-  if (threads_num > 1) {
-    thread_pool.SetBackgroundThreads(static_cast<int>(threads_num));
-  } else {
-    thread_pool.SetBackgroundThreads(1);
-  }
-
-  std::chrono::system_clock::time_point replay_epoch =
-      std::chrono::system_clock::now();
-  WriteOptions woptions;
-  ReadOptions roptions;
-  uint64_t ops = 0;
-  while (s.ok()) {
-    std::unique_ptr<ReplayerWorkerArg> ra(new ReplayerWorkerArg);
-    ra->db = db_;
-    s = ReadTrace(&(ra->trace_entry));
-    if (!s.ok()) {
-      break;
-    }
-    ra->cf_map = &cf_map_;
-    ra->woptions = woptions;
-    ra->roptions = roptions;
-    ra->trace_file_version = trace_file_version_;
-
-    std::this_thread::sleep_until(
-        replay_epoch + std::chrono::microseconds(
-                           (ra->trace_entry.ts - header.ts) / fast_forward_));
-    if (ra->trace_entry.type == kTraceWrite) {
-      thread_pool.Schedule(&Replayer::BGWorkWriteBatch, ra.release(), nullptr,
-                           nullptr);
-      ops++;
-    } else if (ra->trace_entry.type == kTraceGet) {
-      thread_pool.Schedule(&Replayer::BGWorkGet, ra.release(), nullptr,
-                           nullptr);
-      ops++;
-    } else if (ra->trace_entry.type == kTraceIteratorSeek) {
-      thread_pool.Schedule(&Replayer::BGWorkIterSeek, ra.release(), nullptr,
-                           nullptr);
-      ops++;
-    } else if (ra->trace_entry.type == kTraceIteratorSeekForPrev) {
-      thread_pool.Schedule(&Replayer::BGWorkIterSeekForPrev, ra.release(),
-                           nullptr, nullptr);
-      ops++;
-    } else if (ra->trace_entry.type == kTraceMultiGet) {
-      thread_pool.Schedule(&Replayer::BGWorkMultiGet, ra.release(), nullptr,
-                           nullptr);
-      ops++;
-    } else if (ra->trace_entry.type == kTraceEnd) {
-      // Do nothing for now.
-      // TODO: Add some validations later.
-      break;
-    } else {
-      // Other trace entry types that are not implemented for replay.
-      // To finish the replay, we continue the process.
-      continue;
-    }
-  }
-
-  if (s.IsIncomplete()) {
-    // Reaching eof returns Incomplete status at the moment.
-    // Could happen when killing a process without calling EndTrace() API.
-    // TODO: Add better error handling.
-    s = Status::OK();
-  }
-  thread_pool.JoinAllThreads();
-  return s;
-}
-
-Status Replayer::ReadHeader(Trace* header) {
-  assert(header != nullptr);
-  std::string encoded_trace;
-  // Read the trace head
-  Status s = trace_reader_->Read(&encoded_trace);
-  if (!s.ok()) {
-    return s;
-  }
-
-  s = TracerHelper::DecodeTrace(encoded_trace, header);
-
-  if (header->type != kTraceBegin) {
-    return Status::Corruption("Corrupted trace file. Incorrect header.");
-  }
-  if (header->payload.substr(0, kTraceMagic.length()) != kTraceMagic) {
-    return Status::Corruption("Corrupted trace file. Incorrect magic.");
-  }
-
-  return s;
-}
-
-Status Replayer::ReadFooter(Trace* footer) {
-  assert(footer != nullptr);
-  Status s = ReadTrace(footer);
-  if (!s.ok()) {
-    return s;
-  }
-  if (footer->type != kTraceEnd) {
-    return Status::Corruption("Corrupted trace file. Incorrect footer.");
-  }
-
-  // TODO: Add more validations later
-  return s;
-}
-
-Status Replayer::ReadTrace(Trace* trace) {
-  assert(trace != nullptr);
-  std::string encoded_trace;
-  Status s = trace_reader_->Read(&encoded_trace);
-  if (!s.ok()) {
-    return s;
-  }
-  return TracerHelper::DecodeTrace(encoded_trace, trace);
-}
-
-void Replayer::BGWorkGet(void* arg) {
-  std::unique_ptr<ReplayerWorkerArg> ra(
-      reinterpret_cast<ReplayerWorkerArg*>(arg));
-  assert(ra != nullptr);
-  auto cf_map = static_cast<std::unordered_map<uint32_t, ColumnFamilyHandle*>*>(
-      ra->cf_map);
-  GetPayload get_payload;
-  get_payload.cf_id = 0;
-  if (ra->trace_file_version < 2) {
-    DecodeCFAndKey(ra->trace_entry.payload, &get_payload.cf_id,
-                   &get_payload.get_key);
-  } else {
-    TracerHelper::DecodeGetPayload(&(ra->trace_entry), &get_payload);
-  }
-  if (get_payload.cf_id > 0 &&
-      cf_map->find(get_payload.cf_id) == cf_map->end()) {
-    return;
-  }
-
-  std::string value;
-  if (get_payload.cf_id == 0) {
-    ra->db->Get(ra->roptions, get_payload.get_key, &value);
-  } else {
-    ra->db->Get(ra->roptions, (*cf_map)[get_payload.cf_id], get_payload.get_key,
-                &value);
-  }
-  return;
-}
-
-void Replayer::BGWorkWriteBatch(void* arg) {
-  std::unique_ptr<ReplayerWorkerArg> ra(
-      reinterpret_cast<ReplayerWorkerArg*>(arg));
-  assert(ra != nullptr);
-
-  if (ra->trace_file_version < 2) {
-    WriteBatch batch(ra->trace_entry.payload);
-    ra->db->Write(ra->woptions, &batch);
-  } else {
-    WritePayload w_payload;
-    TracerHelper::DecodeWritePayload(&(ra->trace_entry), &w_payload);
-    WriteBatch batch(w_payload.write_batch_data.ToString());
-    ra->db->Write(ra->woptions, &batch);
-  }
-  return;
-}
-
-void Replayer::BGWorkIterSeek(void* arg) {
-  std::unique_ptr<ReplayerWorkerArg> ra(
-      reinterpret_cast<ReplayerWorkerArg*>(arg));
-  assert(ra != nullptr);
-  auto cf_map = static_cast<std::unordered_map<uint32_t, ColumnFamilyHandle*>*>(
-      ra->cf_map);
-  IterPayload iter_payload;
-  iter_payload.cf_id = 0;
-
-  if (ra->trace_file_version < 2) {
-    DecodeCFAndKey(ra->trace_entry.payload, &iter_payload.cf_id,
-                   &iter_payload.iter_key);
-  } else {
-    TracerHelper::DecodeIterPayload(&(ra->trace_entry), &iter_payload);
-  }
-  if (iter_payload.cf_id > 0 &&
-      cf_map->find(iter_payload.cf_id) == cf_map->end()) {
-    return;
-  }
-
-  Iterator* single_iter = nullptr;
-  if (iter_payload.cf_id == 0) {
-    single_iter = ra->db->NewIterator(ra->roptions);
-  } else {
-    single_iter =
-        ra->db->NewIterator(ra->roptions, (*cf_map)[iter_payload.cf_id]);
-  }
-  single_iter->Seek(iter_payload.iter_key);
-  delete single_iter;
-  return;
-}
-
-void Replayer::BGWorkIterSeekForPrev(void* arg) {
-  std::unique_ptr<ReplayerWorkerArg> ra(
-      reinterpret_cast<ReplayerWorkerArg*>(arg));
-  assert(ra != nullptr);
-  auto cf_map = static_cast<std::unordered_map<uint32_t, ColumnFamilyHandle*>*>(
-      ra->cf_map);
-  IterPayload iter_payload;
-  iter_payload.cf_id = 0;
-
-  if (ra->trace_file_version < 2) {
-    DecodeCFAndKey(ra->trace_entry.payload, &iter_payload.cf_id,
-                   &iter_payload.iter_key);
-  } else {
-    TracerHelper::DecodeIterPayload(&(ra->trace_entry), &iter_payload);
-  }
-  if (iter_payload.cf_id > 0 &&
-      cf_map->find(iter_payload.cf_id) == cf_map->end()) {
-    return;
-  }
-
-  Iterator* single_iter = nullptr;
-  if (iter_payload.cf_id == 0) {
-    single_iter = ra->db->NewIterator(ra->roptions);
-  } else {
-    single_iter =
-        ra->db->NewIterator(ra->roptions, (*cf_map)[iter_payload.cf_id]);
-  }
-  single_iter->SeekForPrev(iter_payload.iter_key);
-  delete single_iter;
-  return;
-}
-
-void Replayer::BGWorkMultiGet(void* arg) {
-  std::unique_ptr<ReplayerWorkerArg> ra(
-      reinterpret_cast<ReplayerWorkerArg*>(arg));
-  assert(ra != nullptr);
-  auto cf_map = static_cast<std::unordered_map<uint32_t, ColumnFamilyHandle*>*>(
-      ra->cf_map);
-  MultiGetPayload multiget_payload;
-  if (ra->trace_file_version < 2) {
-    return;
-  }
-  TracerHelper::DecodeMultiGetPayload(&(ra->trace_entry), &multiget_payload);
-  std::vector<ColumnFamilyHandle*> v_cfd;
-  std::vector<Slice> keys;
-  if (multiget_payload.cf_ids.size() != multiget_payload.multiget_keys.size()) {
-    return;
-  }
-  for (size_t i = 0; i < multiget_payload.cf_ids.size(); i++) {
-    if (cf_map->find(multiget_payload.cf_ids[i]) == cf_map->end()) {
-      return;
-    }
-    v_cfd.push_back((*cf_map)[multiget_payload.cf_ids[i]]);
-    keys.push_back(Slice(multiget_payload.multiget_keys[i]));
-  }
-  std::vector<std::string> values;
-  std::vector<Status> ss = ra->db->MultiGet(ra->roptions, v_cfd, keys, &values);
-  return;
-}
-
 }  // namespace ROCKSDB_NAMESPACE
index d10bc1b4639527988ed176c8e8c4d38252910012..979eb349278b9f634f25f29bb7a899882c461c70 100644 (file)
@@ -5,13 +5,17 @@
 
 #pragma once
 
+#include <atomic>
 #include <memory>
+#include <mutex>
 #include <unordered_map>
 #include <utility>
 
 #include "rocksdb/options.h"
 #include "rocksdb/rocksdb_namespace.h"
 #include "rocksdb/status.h"
+#include "rocksdb/trace_record.h"
+#include "rocksdb/utilities/replayer.h"
 
 namespace ROCKSDB_NAMESPACE {
 
@@ -43,31 +47,6 @@ const unsigned int kTraceMetadataSize =
 static const int kTraceFileMajorVersion = 0;
 static const int kTraceFileMinorVersion = 2;
 
-// Supported Trace types.
-enum TraceType : char {
-  kTraceBegin = 1,
-  kTraceEnd = 2,
-  kTraceWrite = 3,
-  kTraceGet = 4,
-  kTraceIteratorSeek = 5,
-  kTraceIteratorSeekForPrev = 6,
-  // Block cache related types.
-  kBlockTraceIndexBlock = 7,
-  kBlockTraceFilterBlock = 8,
-  kBlockTraceDataBlock = 9,
-  kBlockTraceUncompressionDictBlock = 10,
-  kBlockTraceRangeDeletionBlock = 11,
-  // For IOTracing.
-  kIOTracer = 12,
-  // For query tracing
-  kTraceMultiGet = 13,
-  // All trace types should be added before kTraceMax
-  kTraceMax,
-};
-
-// TODO: This should also be made part of public interface to help users build
-// custom TracerReaders and TraceWriters.
-//
 // The data structure that defines a single trace.
 struct Trace {
   uint64_t ts;  // timestamp
@@ -105,28 +84,6 @@ enum TracePayloadType : char {
   kMultiGetKeys = 10,
 };
 
-struct WritePayload {
-  Slice write_batch_data;
-};
-
-struct GetPayload {
-  uint32_t cf_id = 0;
-  Slice get_key;
-};
-
-struct IterPayload {
-  uint32_t cf_id = 0;
-  Slice iter_key;
-  Slice lower_bound;
-  Slice upper_bound;
-};
-
-struct MultiGetPayload {
-  uint32_t multiget_size;
-  std::vector<uint32_t> cf_ids;
-  std::vector<std::string> multiget_keys;
-};
-
 class TracerHelper {
  public:
   // Parse the string with major and minor version only
@@ -142,22 +99,28 @@ class TracerHelper {
   // Decode a string into the given trace object.
   static Status DecodeTrace(const std::string& encoded_trace, Trace* trace);
 
+  // Decode a string into the given trace header.
+  static Status DecodeHeader(const std::string& encoded_trace, Trace* header);
+
   // Set the payload map based on the payload type
   static bool SetPayloadMap(uint64_t& payload_map,
                             const TracePayloadType payload_type);
 
-  // Decode the write payload and store in WrteiPayload
+  // Decode the write payload and store it in a WriteQueryTraceRecord
-  static void DecodeWritePayload(Trace* trace, WritePayload* write_payload);
+  static Status DecodeWriteRecord(Trace* trace, int trace_file_version,
+                                  std::unique_ptr<TraceRecord>* record);
 
-  // Decode the get payload and store in WrteiPayload
+  // Decode the get payload and store it in a GetQueryTraceRecord
-  static void DecodeGetPayload(Trace* trace, GetPayload* get_payload);
+  static Status DecodeGetRecord(Trace* trace, int trace_file_version,
+                                std::unique_ptr<TraceRecord>* record);
 
-  // Decode the iter payload and store in WrteiPayload
+  // Decode the iter payload and store it in an IteratorSeekQueryTraceRecord
-  static void DecodeIterPayload(Trace* trace, IterPayload* iter_payload);
+  static Status DecodeIterRecord(Trace* trace, int trace_file_version,
+                                 std::unique_ptr<TraceRecord>* record);
 
-  // Decode the multiget payload and store in MultiGetPayload
+  // Decode the multiget payload and store it in a MultiGetQueryTraceRecord
-  static void DecodeMultiGetPayload(Trace* trace,
-                                    MultiGetPayload* multiget_payload);
+  static Status DecodeMultiGetRecord(Trace* trace, int trace_file_version,
+                                     std::unique_ptr<TraceRecord>* record);
 };
 
 // Tracer captures all RocksDB operations using a user-provided TraceWriter.
@@ -222,75 +185,4 @@ class Tracer {
   uint64_t trace_request_count_;
 };
 
-// Replayer helps to replay the captured RocksDB operations, using a user
-// provided TraceReader.
-// The Replayer is instantiated via db_bench today, on using "replay" benchmark.
-class Replayer {
- public:
-  Replayer(DB* db, const std::vector<ColumnFamilyHandle*>& handles,
-           std::unique_ptr<TraceReader>&& reader);
-  ~Replayer();
-
-  // Replay all the traces from the provided trace stream, taking the delay
-  // between the traces into consideration.
-  Status Replay();
-
-  // Replay the provide trace stream, which is the same as Replay(), with
-  // multi-threads. Queries are scheduled in the thread pool job queue.
-  // User can set the number of threads in the thread pool.
-  Status MultiThreadReplay(uint32_t threads_num);
-
-  // Enables fast forwarding a replay by reducing the delay between the ingested
-  // traces.
-  // fast_forward : Rate of replay speedup.
-  //   If 1, replay the operations at the same rate as in the trace stream.
-  //   If > 1, speed up the replay by this amount.
-  Status SetFastForward(uint32_t fast_forward);
-
- private:
-  Status ReadHeader(Trace* header);
-  Status ReadFooter(Trace* footer);
-  Status ReadTrace(Trace* trace);
-
-  // The background function for MultiThreadReplay to execute Get query
-  // based on the trace records.
-  static void BGWorkGet(void* arg);
-
-  // The background function for MultiThreadReplay to execute WriteBatch
-  // (Put, Delete, SingleDelete, DeleteRange) based on the trace records.
-  static void BGWorkWriteBatch(void* arg);
-
-  // The background function for MultiThreadReplay to execute Iterator (Seek)
-  // based on the trace records.
-  static void BGWorkIterSeek(void* arg);
-
-  // The background function for MultiThreadReplay to execute Iterator
-  // (SeekForPrev) based on the trace records.
-  static void BGWorkIterSeekForPrev(void* arg);
-
-  // The background function for MultiThreadReplay to execute MultiGet based on
-  // the trace records
-  static void BGWorkMultiGet(void* arg);
-
-  DBImpl* db_;
-  Env* env_;
-  std::unique_ptr<TraceReader> trace_reader_;
-  std::unordered_map<uint32_t, ColumnFamilyHandle*> cf_map_;
-  uint32_t fast_forward_;
-  // When reading the trace header, the trace file version can be parsed.
-  // Replayer will use different decode method to get the trace content based
-  // on different trace file version.
-  int trace_file_version_;
-};
-
-// The passin arg of MultiThreadRepkay for each trace record.
-struct ReplayerWorkerArg {
-  DB* db;
-  Trace trace_entry;
-  std::unordered_map<uint32_t, ColumnFamilyHandle*>* cf_map;
-  WriteOptions woptions;
-  ReadOptions roptions;
-  int trace_file_version;
-};
-
 }  // namespace ROCKSDB_NAMESPACE
index 31eaf855439c7d62480ed6f369c7a4009ba2a3a5..b214d4ac08f7a35cabd8e84febb35ed3096b0fc0 100644 (file)
@@ -4,8 +4,11 @@
 //  (found in the LICENSE.Apache file in the root directory).
 
 #include "utilities/simulator_cache/cache_simulator.h"
+
 #include <algorithm>
+
 #include "db/dbformat.h"
+#include "rocksdb/trace_record.h"
 
 namespace ROCKSDB_NAMESPACE {
 
index a205315ccc37a1a6a3ca4f6ebcb9ba5276bc39b3..beacdfa1ee59c19820879302b5e1489a7ad06508 100644 (file)
@@ -6,7 +6,9 @@
 #include "utilities/simulator_cache/cache_simulator.h"
 
 #include <cstdlib>
+
 #include "rocksdb/env.h"
+#include "rocksdb/trace_record.h"
 #include "test_util/testharness.h"
 #include "test_util/testutil.h"
 
index d553e24340283aceef2b14cd9de57dd7c390b70e..dc58ded21e6dea1bbffee6363165116ea20028c4 100644 (file)
@@ -31,6 +31,14 @@ Status FileTraceReader::Close() {
   return Status::OK();
 }
 
+Status FileTraceReader::Reset() {
+  if (file_reader_ == nullptr) {
+    return Status::IOError("TraceReader is closed.");
+  }
+  offset_ = 0;
+  return Status::OK();
+}
+
 Status FileTraceReader::Read(std::string* data) {
   assert(file_reader_ != nullptr);
   Status s = file_reader_->Read(IOOptions(), offset_, kTraceMetadataSize,
index a9eafa5af1ad7d5f31e790c976bd4662e5679e9f..909317fe4abc8de9ecfe8c509fa87037378abbbd 100644 (file)
@@ -20,6 +20,7 @@ class FileTraceReader : public TraceReader {
 
   virtual Status Read(std::string* data) override;
   virtual Status Close() override;
+  virtual Status Reset() override;
 
  private:
   std::unique_ptr<RandomAccessFileReader> file_reader_;
diff --git a/utilities/trace/replayer_impl.cc b/utilities/trace/replayer_impl.cc
new file mode 100644 (file)
index 0000000..2789de5
--- /dev/null
@@ -0,0 +1,305 @@
+//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
+//  This source code is licensed under both the GPLv2 (found in the
+//  COPYING file in the root directory) and Apache 2.0 License
+//  (found in the LICENSE.Apache file in the root directory).
+
+#ifndef ROCKSDB_LITE
+
+#include "utilities/trace/replayer_impl.h"
+
+#include <cmath>
+#include <thread>
+
+#include "rocksdb/db.h"
+#include "rocksdb/env.h"
+#include "rocksdb/options.h"
+#include "rocksdb/slice.h"
+#include "rocksdb/status.h"
+#include "rocksdb/system_clock.h"
+#include "rocksdb/trace_reader_writer.h"
+#include "util/threadpool_imp.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+ReplayerImpl::ReplayerImpl(DB* db,
+                           const std::vector<ColumnFamilyHandle*>& handles,
+                           std::unique_ptr<TraceReader>&& reader)
+    : Replayer(),
+      env_(db->GetEnv()),
+      trace_reader_(std::move(reader)),
+      prepared_(false),
+      trace_end_(false),
+      header_ts_(0),
+      exec_handler_(TraceRecord::NewExecutionHandler(db, handles)) {}
+
+ReplayerImpl::~ReplayerImpl() {
+  exec_handler_.reset();
+  trace_reader_.reset();
+}
+
+Status ReplayerImpl::Prepare() {
+  Trace header;
+  int db_version;
+  Status s = ReadHeader(&header);
+  if (!s.ok()) {
+    return s;
+  }
+  s = TracerHelper::ParseTraceHeader(header, &trace_file_version_, &db_version);
+  if (!s.ok()) {
+    return s;
+  }
+  header_ts_ = header.ts;
+  prepared_ = true;
+  trace_end_ = false;
+  return Status::OK();
+}
+
+Status ReplayerImpl::Next(std::unique_ptr<TraceRecord>* record) {
+  if (!prepared_) {
+    return Status::Incomplete("Not prepared!");
+  }
+  if (trace_end_) {
+    return Status::Incomplete("Trace end.");
+  }
+
+  Trace trace;
+  Status s = ReadTrace(&trace);  // ReadTrace is atomic
+  // Reached the trace end.
+  if (s.ok() && trace.type == kTraceEnd) {
+    trace_end_ = true;
+    return Status::Incomplete("Trace end.");
+  }
+  if (!s.ok() || record == nullptr) {
+    return s;
+  }
+
+  return DecodeTraceRecord(&trace, trace_file_version_, record);
+}
+
+Status ReplayerImpl::Execute(const std::unique_ptr<TraceRecord>& record) {
+  return record->Accept(exec_handler_.get());
+}
+
+Status ReplayerImpl::Execute(std::unique_ptr<TraceRecord>&& record) {
+  Status s = record->Accept(exec_handler_.get());
+  record.reset();
+  return s;
+}
+
+Status ReplayerImpl::Replay(const ReplayOptions& options) {
+  if (options.fast_forward <= 0.0) {
+    return Status::InvalidArgument("Wrong fast forward speed!");
+  }
+
+  if (!prepared_) {
+    return Status::Incomplete("Not prepared!");
+  }
+  if (trace_end_) {
+    return Status::Incomplete("Trace end.");
+  }
+
+  Status s = Status::OK();
+
+  if (options.num_threads <= 1) {
+    // num_threads == 0 or num_threads == 1 uses single thread.
+    std::chrono::system_clock::time_point replay_epoch =
+        std::chrono::system_clock::now();
+
+    while (s.ok()) {
+      Trace trace;
+      s = ReadTrace(&trace);
+      // If already at trace end, ReadTrace should return Status::Incomplete().
+      if (!s.ok()) {
+        break;
+      }
+
+      // No need to sleep before breaking the loop if at the trace end.
+      if (trace.type == kTraceEnd) {
+        trace_end_ = true;
+        s = Status::Incomplete("Trace end.");
+        break;
+      }
+
+      // In single-threaded replay, decode first then sleep.
+      std::unique_ptr<TraceRecord> record;
+      s = DecodeTraceRecord(&trace, trace_file_version_, &record);
+      // Skip unsupported traces, stop for other errors.
+      if (s.IsNotSupported()) {
+        continue;
+      } else if (!s.ok()) {
+        break;
+      }
+
+      std::this_thread::sleep_until(
+          replay_epoch +
+          std::chrono::microseconds(static_cast<uint64_t>(std::llround(
+              1.0 * (trace.ts - header_ts_) / options.fast_forward))));
+
+      s = Execute(std::move(record));
+    }
+  } else {
+    // Multi-threaded replay.
+    ThreadPoolImpl thread_pool;
+    thread_pool.SetHostEnv(env_);
+    thread_pool.SetBackgroundThreads(static_cast<int>(options.num_threads));
+
+    std::mutex mtx;
+    // Background decoding and execution status.
+    Status bg_s = Status::OK();
+    uint64_t last_err_ts = static_cast<uint64_t>(-1);
+    // Callback function used in background work to update bg_s at the first
+    // execution error (with the smallest Trace timestamp).
+    auto error_cb = [&mtx, &bg_s, &last_err_ts](Status err, uint64_t err_ts) {
+      std::lock_guard<std::mutex> gd(mtx);
+      // Only record the first error.
+      if (!err.ok() && !err.IsNotSupported() && err_ts < last_err_ts) {
+        bg_s = err;
+        last_err_ts = err_ts;
+      }
+    };
+
+    std::chrono::system_clock::time_point replay_epoch =
+        std::chrono::system_clock::now();
+
+    while (bg_s.ok() && s.ok()) {
+      Trace trace;
+      s = ReadTrace(&trace);
+      // If already at trace end, ReadTrace should return Status::Incomplete().
+      if (!s.ok()) {
+        break;
+      }
+
+      TraceType trace_type = trace.type;
+
+      // No need to sleep before breaking the loop if at the trace end.
+      if (trace_type == kTraceEnd) {
+        trace_end_ = true;
+        s = Status::Incomplete("Trace end.");
+        break;
+      }
+
+      // In multi-threaded replay, sleep first, then start decoding and
+      // execution in a thread.
+      std::this_thread::sleep_until(
+          replay_epoch +
+          std::chrono::microseconds(static_cast<uint64_t>(std::llround(
+              1.0 * (trace.ts - header_ts_) / options.fast_forward))));
+
+      if (trace_type == kTraceWrite || trace_type == kTraceGet ||
+          trace_type == kTraceIteratorSeek ||
+          trace_type == kTraceIteratorSeekForPrev ||
+          trace_type == kTraceMultiGet) {
+        std::unique_ptr<ReplayerWorkerArg> ra(new ReplayerWorkerArg);
+        ra->trace_entry = std::move(trace);
+        ra->handler = exec_handler_.get();
+        ra->trace_file_version = trace_file_version_;
+        ra->error_cb = error_cb;
+        thread_pool.Schedule(&ReplayerImpl::BackgroundWork, ra.release(),
+                             nullptr, nullptr);
+      }
+      // Skip unsupported traces.
+    }
+
+    thread_pool.WaitForJobsAndJoinAllThreads();
+    if (!bg_s.ok()) {
+      s = bg_s;
+    }
+  }
+
+  if (s.IsIncomplete()) {
+    // Reaching eof returns Incomplete status at the moment.
+    // Could happen when killing a process without calling EndTrace() API.
+    // TODO: Add better error handling.
+    trace_end_ = true;
+    return Status::OK();
+  }
+  return s;
+}
+
+uint64_t ReplayerImpl::GetHeaderTimestamp() const { return header_ts_; }
+
+Status ReplayerImpl::ReadHeader(Trace* header) {
+  assert(header != nullptr);
+  Status s = trace_reader_->Reset();
+  if (!s.ok()) {
+    return s;
+  }
+  std::string encoded_trace;
+  // Read the trace header.
+  s = trace_reader_->Read(&encoded_trace);
+  if (!s.ok()) {
+    return s;
+  }
+
+  return TracerHelper::DecodeHeader(encoded_trace, header);
+}
+
+Status ReplayerImpl::ReadFooter(Trace* footer) {
+  assert(footer != nullptr);
+  Status s = ReadTrace(footer);
+  if (!s.ok()) {
+    return s;
+  }
+  if (footer->type != kTraceEnd) {
+    return Status::Corruption("Corrupted trace file. Incorrect footer.");
+  }
+
+  // TODO: Add more validations later
+  return s;
+}
+
+Status ReplayerImpl::ReadTrace(Trace* trace) {
+  assert(trace != nullptr);
+  std::string encoded_trace;
+  // We don't know if TraceReader is implemented thread-safe, so we protect the
+  // reading trace part with a mutex. The decoding part does not need to be
+  // protected since it's local.
+  {
+    std::lock_guard<std::mutex> guard(mutex_);
+    Status s = trace_reader_->Read(&encoded_trace);
+    if (!s.ok()) {
+      return s;
+    }
+  }
+  return TracerHelper::DecodeTrace(encoded_trace, trace);
+}
+
+Status ReplayerImpl::DecodeTraceRecord(Trace* trace, int trace_file_version,
+                                       std::unique_ptr<TraceRecord>* record) {
+  switch (trace->type) {
+    case kTraceWrite:
+      return TracerHelper::DecodeWriteRecord(trace, trace_file_version, record);
+    case kTraceGet:
+      return TracerHelper::DecodeGetRecord(trace, trace_file_version, record);
+    case kTraceIteratorSeek:
+    case kTraceIteratorSeekForPrev:
+      return TracerHelper::DecodeIterRecord(trace, trace_file_version, record);
+    case kTraceMultiGet:
+      return TracerHelper::DecodeMultiGetRecord(trace, trace_file_version,
+                                                record);
+    case kTraceEnd:
+      return Status::Incomplete("Trace end.");
+    default:
+      return Status::NotSupported("Unsupported trace type.");
+  }
+}
+
+void ReplayerImpl::BackgroundWork(void* arg) {
+  std::unique_ptr<ReplayerWorkerArg> ra(
+      reinterpret_cast<ReplayerWorkerArg*>(arg));
+  assert(ra != nullptr);
+
+  std::unique_ptr<TraceRecord> record;
+  Status s =
+      DecodeTraceRecord(&(ra->trace_entry), ra->trace_file_version, &record);
+  if (s.ok()) {
+    s = record->Accept(ra->handler);
+    record.reset();
+  }
+  if (!s.ok() && ra->error_cb) {
+    ra->error_cb(s, ra->trace_entry.ts);
+  }
+}
+
+}  // namespace ROCKSDB_NAMESPACE
+#endif  // ROCKSDB_LITE
diff --git a/utilities/trace/replayer_impl.h b/utilities/trace/replayer_impl.h
new file mode 100644 (file)
index 0000000..b796d22
--- /dev/null
@@ -0,0 +1,90 @@
+//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
+//  This source code is licensed under both the GPLv2 (found in the
+//  COPYING file in the root directory) and Apache 2.0 License
+//  (found in the LICENSE.Apache file in the root directory).
+
+#pragma once
+#ifndef ROCKSDB_LITE
+
+#include <atomic>
+#include <functional>
+#include <memory>
+#include <mutex>
+#include <unordered_map>
+
+#include "rocksdb/rocksdb_namespace.h"
+#include "rocksdb/trace_record.h"
+#include "rocksdb/utilities/replayer.h"
+#include "trace_replay/trace_replay.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+class ColumnFamilyHandle;
+class DB;
+class Env;
+class TraceReader;
+class TraceRecord;
+class Status;
+
+struct ReplayOptions;
+
+class ReplayerImpl : public Replayer {
+ public:
+  ReplayerImpl(DB* db, const std::vector<ColumnFamilyHandle*>& handles,
+               std::unique_ptr<TraceReader>&& reader);
+  ~ReplayerImpl() override;
+
+  using Replayer::Prepare;
+  Status Prepare() override;
+
+  using Replayer::Next;
+  Status Next(std::unique_ptr<TraceRecord>* record) override;
+
+  using Replayer::Execute;
+  Status Execute(const std::unique_ptr<TraceRecord>& record) override;
+  Status Execute(std::unique_ptr<TraceRecord>&& record) override;
+
+  using Replayer::Replay;
+  Status Replay(const ReplayOptions& options) override;
+
+  using Replayer::GetHeaderTimestamp;
+  uint64_t GetHeaderTimestamp() const override;
+
+ private:
+  Status ReadHeader(Trace* header);
+  Status ReadFooter(Trace* footer);
+  Status ReadTrace(Trace* trace);
+
+  // Generic function to convert a Trace to TraceRecord.
+  static Status DecodeTraceRecord(Trace* trace, int trace_file_version,
+                                  std::unique_ptr<TraceRecord>* record);
+
+  // Generic function to execute a Trace in a thread pool.
+  static void BackgroundWork(void* arg);
+
+  Env* env_;
+  std::unique_ptr<TraceReader> trace_reader_;
+  // When reading the trace header, the trace file version can be parsed.
+  // Replayer will use different decode method to get the trace content based
+  // on different trace file version.
+  int trace_file_version_;
+  std::mutex mutex_;
+  std::atomic<bool> prepared_;
+  std::atomic<bool> trace_end_;
+  uint64_t header_ts_;
+  std::unique_ptr<TraceRecord::Handler> exec_handler_;
+};
+
+// The argument passed to BackgroundWork() for each trace record.
+struct ReplayerWorkerArg {
+  Trace trace_entry;
+  int trace_file_version;
+  // Handler to execute TraceRecord.
+  TraceRecord::Handler* handler;
+  // Callback function to report the error status and the timestamp of the
+  // TraceRecord.
+  std::function<void(Status, uint64_t)> error_cb;
+};
+
+}  // namespace ROCKSDB_NAMESPACE
+#endif  // ROCKSDB_LITE