tools/ldb_tool.cc
tools/sst_dump_tool.cc
tools/trace_analyzer_tool.cc
- trace_replay/trace_replay.cc
trace_replay/block_cache_tracer.cc
trace_replay/io_tracer.cc
+ trace_replay/trace_record_handler.cc
+ trace_replay/trace_record.cc
+ trace_replay/trace_replay.cc
util/coding.cc
util/compaction_job_stats_impl.cc
util/comparator.cc
utilities/simulator_cache/sim_cache.cc
utilities/table_properties_collectors/compact_on_deletion_collector.cc
utilities/trace/file_trace_reader_writer.cc
+ utilities/trace/replayer_impl.cc
utilities/transactions/lock/lock_manager.cc
utilities/transactions/lock/point/point_lock_tracker.cc
utilities/transactions/lock/point/point_lock_manager.cc
* BlockBasedTableOptions.prepopulate_block_cache can be dynamically configured using DB::SetOptions.
* Add CompactionOptionsFIFO.age_for_warm, which allows RocksDB to move old files to warm tier in FIFO compactions. Note that file temperature is still an experimental feature.
* Add a comment to suggest btrfs user to disable file preallocation by setting `options.allow_fallocate=false`.
+* Fast forward option in Trace replay changed to double type to allow replaying at a lower speed, by settings the value between 0 and 1. This option can be set via `ReplayOptions` in `Replayer::Replay()`, or via `--trace_replay_fast_forward` in db_bench.
+
+## Public API change
+* Added APIs to decode and replay trace file via Replayer class. Added `DB::NewDefaultReplayer()` to create a default Replayer instance. Created trace_record.h and utilities/replayer.h files to access decoded Trace records and replay them.
### Performance Improvements
* Try to avoid updating DBOptions if `SetDBOptions()` does not change any option value.
"tools/sst_dump_tool.cc",
"trace_replay/block_cache_tracer.cc",
"trace_replay/io_tracer.cc",
+ "trace_replay/trace_record.cc",
+ "trace_replay/trace_record_handler.cc",
"trace_replay/trace_replay.cc",
"util/build_version.cc",
"util/coding.cc",
"utilities/simulator_cache/sim_cache.cc",
"utilities/table_properties_collectors/compact_on_deletion_collector.cc",
"utilities/trace/file_trace_reader_writer.cc",
+ "utilities/trace/replayer_impl.cc",
"utilities/transactions/lock/lock_manager.cc",
"utilities/transactions/lock/point/point_lock_manager.cc",
"utilities/transactions/lock/point/point_lock_tracker.cc",
"tools/sst_dump_tool.cc",
"trace_replay/block_cache_tracer.cc",
"trace_replay/io_tracer.cc",
+ "trace_replay/trace_record.cc",
+ "trace_replay/trace_record_handler.cc",
"trace_replay/trace_replay.cc",
"util/build_version.cc",
"util/coding.cc",
"utilities/simulator_cache/sim_cache.cc",
"utilities/table_properties_collectors/compact_on_deletion_collector.cc",
"utilities/trace/file_trace_reader_writer.cc",
+ "utilities/trace/replayer_impl.cc",
"utilities/transactions/lock/lock_manager.cc",
"utilities/transactions/lock/point/point_lock_manager.cc",
"utilities/transactions/lock/point/point_lock_tracker.cc",
#include "table/table_builder.h"
#include "table/two_level_iterator.h"
#include "test_util/sync_point.h"
+#include "trace_replay/trace_replay.h"
#include "util/autovector.h"
#include "util/cast_util.h"
#include "util/coding.h"
#include "util/mutexlock.h"
#include "util/stop_watch.h"
#include "util/string_util.h"
+#include "utilities/trace/replayer_impl.h"
namespace ROCKSDB_NAMESPACE {
return earliest_seq;
}
-#endif // ROCKSDB_LITE
-#ifndef ROCKSDB_LITE
Status DBImpl::GetLatestSequenceForKey(SuperVersion* sv, const Slice& key,
bool cache_only,
SequenceNumber lower_bound_seq,
return s;
}
+Status DBImpl::NewDefaultReplayer(
+ const std::vector<ColumnFamilyHandle*>& handles,
+ std::unique_ptr<TraceReader>&& reader,
+ std::unique_ptr<Replayer>* replayer) {
+ replayer->reset(new ReplayerImpl(this, handles, std::move(reader)));
+ return Status::OK();
+}
+
Status DBImpl::StartBlockCacheTrace(
const TraceOptions& trace_options,
std::unique_ptr<TraceWriter>&& trace_writer) {
#include "rocksdb/env.h"
#include "rocksdb/memtablerep.h"
#include "rocksdb/status.h"
+#ifndef ROCKSDB_LITE
#include "rocksdb/trace_reader_writer.h"
+#endif // ROCKSDB_LITE
#include "rocksdb/transaction_log.h"
+#ifndef ROCKSDB_LITE
+#include "rocksdb/utilities/replayer.h"
+#endif // ROCKSDB_LITE
#include "rocksdb/write_buffer_manager.h"
#include "table/merging_iterator.h"
#include "table/scoped_arena_iterator.h"
using DB::EndTrace;
virtual Status EndTrace() override;
+ using DB::NewDefaultReplayer;
+ virtual Status NewDefaultReplayer(
+ const std::vector<ColumnFamilyHandle*>& handles,
+ std::unique_ptr<TraceReader>&& reader,
+ std::unique_ptr<Replayer>* replayer) override;
+
using DB::StartBlockCacheTrace;
Status StartBlockCacheTrace(
const TraceOptions& options,
#include "port/port.h"
#include "port/stack_trace.h"
#include "rocksdb/persistent_cache.h"
+#include "rocksdb/utilities/replayer.h"
#include "rocksdb/wal_filter.h"
#include "util/random.h"
#include "utilities/fault_injection_env.h"
std::unique_ptr<TraceReader> trace_reader;
ASSERT_OK(NewFileTraceReader(env_, env_opts, trace_filename, &trace_reader));
- Replayer replayer(db2, handles_, std::move(trace_reader));
- ASSERT_OK(replayer.Replay());
+ std::unique_ptr<Replayer> replayer;
+ ASSERT_OK(
+ db2->NewDefaultReplayer(handles, std::move(trace_reader), &replayer));
+ // Unprepared replay should fail with Status::Incomplete()
+ ASSERT_TRUE(replayer->Replay().IsIncomplete());
+ ASSERT_OK(replayer->Prepare());
+ // Ok to repeatedly Prepare().
+ ASSERT_OK(replayer->Prepare());
+ // Replay using 1 thread, 1x speed.
+ ASSERT_OK(replayer->Replay());
+
+ ASSERT_OK(db2->Get(ro, handles[0], "a", &value));
+ ASSERT_EQ("1", value);
+ ASSERT_OK(db2->Get(ro, handles[0], "g", &value));
+ ASSERT_EQ("12", value);
+ ASSERT_TRUE(db2->Get(ro, handles[0], "hello", &value).IsNotFound());
+ ASSERT_TRUE(db2->Get(ro, handles[0], "world", &value).IsNotFound());
+
+ ASSERT_OK(db2->Get(ro, handles[1], "foo", &value));
+ ASSERT_EQ("bar", value);
+ ASSERT_OK(db2->Get(ro, handles[1], "rocksdb", &value));
+ ASSERT_EQ("rocks", value);
+
+ // Re-replay should fail with Status::Incomplete() if Prepare() was not
+ // called. Currently we don't distinguish between unprepared and trace end.
+ ASSERT_TRUE(replayer->Replay().IsIncomplete());
+
+ // Re-replay using 2 threads, 2x speed.
+ ASSERT_OK(replayer->Prepare());
+ ASSERT_OK(replayer->Replay(ReplayOptions(2, 2.0)));
+
+ // Re-replay using 2 threads, 1/2 speed.
+ ASSERT_OK(replayer->Prepare());
+ ASSERT_OK(replayer->Replay(ReplayOptions(2, 0.5)));
+ replayer.reset();
+
+ for (auto handle : handles) {
+ delete handle;
+ }
+ delete db2;
+ ASSERT_OK(DestroyDB(dbname2, options));
+}
+
+TEST_F(DBTest2, TraceAndManualReplay) {
+ Options options = CurrentOptions();
+ options.merge_operator = MergeOperators::CreatePutOperator();
+ ReadOptions ro;
+ WriteOptions wo;
+ TraceOptions trace_opts;
+ EnvOptions env_opts;
+ CreateAndReopenWithCF({"pikachu"}, options);
+ Random rnd(301);
+ Iterator* single_iter = nullptr;
+
+ ASSERT_TRUE(db_->EndTrace().IsIOError());
+
+ std::string trace_filename = dbname_ + "/rocksdb.trace";
+ std::unique_ptr<TraceWriter> trace_writer;
+ ASSERT_OK(NewFileTraceWriter(env_, env_opts, trace_filename, &trace_writer));
+ ASSERT_OK(db_->StartTrace(trace_opts, std::move(trace_writer)));
+
+ ASSERT_OK(Put(0, "a", "1"));
+ ASSERT_OK(Merge(0, "b", "2"));
+ ASSERT_OK(Delete(0, "c"));
+ ASSERT_OK(SingleDelete(0, "d"));
+ ASSERT_OK(db_->DeleteRange(wo, dbfull()->DefaultColumnFamily(), "e", "f"));
+
+ WriteBatch batch;
+ ASSERT_OK(batch.Put("f", "11"));
+ ASSERT_OK(batch.Merge("g", "12"));
+ ASSERT_OK(batch.Delete("h"));
+ ASSERT_OK(batch.SingleDelete("i"));
+ ASSERT_OK(batch.DeleteRange("j", "k"));
+ ASSERT_OK(db_->Write(wo, &batch));
+
+ single_iter = db_->NewIterator(ro);
+ single_iter->Seek("f");
+ single_iter->SeekForPrev("g");
+ delete single_iter;
+
+ ASSERT_EQ("1", Get(0, "a"));
+ ASSERT_EQ("12", Get(0, "g"));
+
+ ASSERT_OK(Put(1, "foo", "bar"));
+ ASSERT_OK(Put(1, "rocksdb", "rocks"));
+ ASSERT_EQ("NOT_FOUND", Get(1, "leveldb"));
+
+ ASSERT_OK(db_->EndTrace());
+ // These should not get into the trace file as it is after EndTrace.
+ Put("hello", "world");
+ Merge("foo", "bar");
+
+ // Open another db, replay, and verify the data
+ std::string value;
+ std::string dbname2 = test::PerThreadDBPath(env_, "/db_replay");
+ ASSERT_OK(DestroyDB(dbname2, options));
+
+ // Using a different name than db2, to pacify infer's use-after-lifetime
+ // warnings (http://fbinfer.com).
+ DB* db2_init = nullptr;
+ options.create_if_missing = true;
+ ASSERT_OK(DB::Open(options, dbname2, &db2_init));
+ ColumnFamilyHandle* cf;
+ ASSERT_OK(
+ db2_init->CreateColumnFamily(ColumnFamilyOptions(), "pikachu", &cf));
+ delete cf;
+ delete db2_init;
+
+ DB* db2 = nullptr;
+ std::vector<ColumnFamilyDescriptor> column_families;
+ ColumnFamilyOptions cf_options;
+ cf_options.merge_operator = MergeOperators::CreatePutOperator();
+ column_families.push_back(ColumnFamilyDescriptor("default", cf_options));
+ column_families.push_back(
+ ColumnFamilyDescriptor("pikachu", ColumnFamilyOptions()));
+ std::vector<ColumnFamilyHandle*> handles;
+ DBOptions db_opts;
+ db_opts.env = env_;
+ ASSERT_OK(DB::Open(db_opts, dbname2, column_families, &handles, &db2));
+
+ env_->SleepForMicroseconds(100);
+ // Verify that the keys don't already exist
+ ASSERT_TRUE(db2->Get(ro, handles[0], "a", &value).IsNotFound());
+ ASSERT_TRUE(db2->Get(ro, handles[0], "g", &value).IsNotFound());
+
+ std::unique_ptr<TraceReader> trace_reader;
+ ASSERT_OK(NewFileTraceReader(env_, env_opts, trace_filename, &trace_reader));
+ std::unique_ptr<Replayer> replayer;
+ ASSERT_OK(
+ db2->NewDefaultReplayer(handles, std::move(trace_reader), &replayer));
+
+ // Manual replay for 2 times. The 2nd checks if the replay can restart.
+ std::unique_ptr<TraceRecord> record;
+ for (int i = 0; i < 2; i++) {
+ // Next should fail if unprepared.
+ ASSERT_TRUE(replayer->Next(nullptr).IsIncomplete());
+ ASSERT_OK(replayer->Prepare());
+ Status s = Status::OK();
+ // Looping until trace end.
+ while (s.ok()) {
+ s = replayer->Next(&record);
+ // Skip unsupported operations.
+ if (s.IsNotSupported()) {
+ continue;
+ }
+ if (s.ok()) {
+ ASSERT_OK(replayer->Execute(std::move(record)));
+ }
+ }
+ // Status::Incomplete() will be returned when manually reading the trace
+ // end, or Prepare() was not called.
+ ASSERT_TRUE(s.IsIncomplete());
+ ASSERT_TRUE(replayer->Next(nullptr).IsIncomplete());
+ }
ASSERT_OK(db2->Get(ro, handles[0], "a", &value));
ASSERT_EQ("1", value);
ASSERT_OK(db2->Get(ro, handles[1], "rocksdb", &value));
ASSERT_EQ("rocks", value);
+ // Test execution of artificially created TraceRecords.
+ uint64_t fake_ts = 1U;
+ // Write
+ batch.Clear();
+ batch.Put("trace-record-write1", "write1");
+ batch.Put("trace-record-write2", "write2");
+ record.reset(new WriteQueryTraceRecord(batch.Data(), fake_ts++));
+ ASSERT_OK(replayer->Execute(std::move(record)));
+ ASSERT_OK(db2->Get(ro, handles[0], "trace-record-write1", &value));
+ ASSERT_EQ("write1", value);
+ ASSERT_OK(db2->Get(ro, handles[0], "trace-record-write2", &value));
+ ASSERT_EQ("write2", value);
+
+ // Get related
+ // Get an existing key.
+ record.reset(new GetQueryTraceRecord(handles[0]->GetID(),
+ "trace-record-write1", fake_ts++));
+ ASSERT_OK(replayer->Execute(std::move(record)));
+ // Get an non-existing key, should still return Status::OK().
+ record.reset(new GetQueryTraceRecord(handles[0]->GetID(), "trace-record-get",
+ fake_ts++));
+ ASSERT_OK(replayer->Execute(std::move(record)));
+ // Get from an invalid (non-existing) cf_id.
+ uint32_t invalid_cf_id = handles[1]->GetID() + 1;
+ record.reset(new GetQueryTraceRecord(invalid_cf_id, "whatever", fake_ts++));
+ ASSERT_TRUE(replayer->Execute(std::move(record)).IsCorruption());
+
+ // Iteration related
+ for (IteratorSeekQueryTraceRecord::SeekType seekType :
+ {IteratorSeekQueryTraceRecord::kSeek,
+ IteratorSeekQueryTraceRecord::kSeekForPrev}) {
+ // Seek to an existing key.
+ record.reset(new IteratorSeekQueryTraceRecord(
+ seekType, handles[0]->GetID(), "trace-record-write1", fake_ts++));
+ ASSERT_OK(replayer->Execute(std::move(record)));
+ // Seek to an non-existing key, should still return Status::OK().
+ record.reset(new IteratorSeekQueryTraceRecord(
+ seekType, handles[0]->GetID(), "trace-record-get", fake_ts++));
+ ASSERT_OK(replayer->Execute(std::move(record)));
+ // Seek from an invalid cf_id.
+ record.reset(new IteratorSeekQueryTraceRecord(seekType, invalid_cf_id,
+ "whatever", fake_ts++));
+ ASSERT_TRUE(replayer->Execute(std::move(record)).IsCorruption());
+ }
+
+ // MultiGet related
+ // Get existing keys.
+ record.reset(new MultiGetQueryTraceRecord(
+ std::vector<uint32_t>({handles[0]->GetID(), handles[1]->GetID()}),
+ std::vector<std::string>({"a", "foo"}), fake_ts++));
+ ASSERT_OK(replayer->Execute(std::move(record)));
+ // Get all non-existing keys, should still return Status::OK().
+ record.reset(new MultiGetQueryTraceRecord(
+ std::vector<uint32_t>({handles[0]->GetID(), handles[1]->GetID()}),
+ std::vector<std::string>({"no1", "no2"}), fake_ts++));
+ // Get mixed of existing and non-existing keys, should still return
+ // Status::OK().
+ record.reset(new MultiGetQueryTraceRecord(
+ std::vector<uint32_t>({handles[0]->GetID(), handles[1]->GetID()}),
+ std::vector<std::string>({"a", "no2"}), fake_ts++));
+ ASSERT_OK(replayer->Execute(std::move(record)));
+ // Get from an invalid (non-existing) cf_id.
+ record.reset(new MultiGetQueryTraceRecord(
+ std::vector<uint32_t>(
+ {handles[0]->GetID(), handles[1]->GetID(), invalid_cf_id}),
+ std::vector<std::string>({"a", "foo", "whatever"}), fake_ts++));
+ ASSERT_TRUE(replayer->Execute(std::move(record)).IsCorruption());
+ // Empty MultiGet
+ record.reset(new MultiGetQueryTraceRecord(
+ std::vector<uint32_t>(), std::vector<std::string>(), fake_ts++));
+ ASSERT_TRUE(replayer->Execute(std::move(record)).IsInvalidArgument());
+ // MultiGet size mismatch
+ record.reset(new MultiGetQueryTraceRecord(
+ std::vector<uint32_t>({handles[0]->GetID(), handles[1]->GetID()}),
+ std::vector<std::string>({"a"}), fake_ts++));
+ ASSERT_TRUE(replayer->Execute(std::move(record)).IsInvalidArgument());
+
+ replayer.reset();
+
for (auto handle : handles) {
delete handle;
}
std::unique_ptr<TraceReader> trace_reader;
ASSERT_OK(NewFileTraceReader(env_, env_opts, trace_filename, &trace_reader));
- Replayer replayer(db2, handles_, std::move(trace_reader));
- ASSERT_OK(replayer.Replay());
+ std::unique_ptr<Replayer> replayer;
+ ASSERT_OK(
+ db2->NewDefaultReplayer(handles, std::move(trace_reader), &replayer));
+ ASSERT_OK(replayer->Prepare());
+ ASSERT_OK(replayer->Replay());
+ replayer.reset();
ASSERT_TRUE(db2->Get(ro, handles[0], "a", &value).IsNotFound());
ASSERT_TRUE(db2->Get(ro, handles[0], "b", &value).IsNotFound());
std::unique_ptr<TraceReader> trace_reader;
ASSERT_OK(NewFileTraceReader(env_, env_opts, trace_filename, &trace_reader));
- Replayer replayer(db2, handles_, std::move(trace_reader));
- ASSERT_OK(replayer.Replay());
+ std::unique_ptr<Replayer> replayer;
+ ASSERT_OK(
+ db2->NewDefaultReplayer(handles, std::move(trace_reader), &replayer));
+ ASSERT_OK(replayer->Prepare());
+ ASSERT_OK(replayer->Replay());
+ replayer.reset();
ASSERT_TRUE(db2->Get(ro, handles[0], "a", &value).IsNotFound());
ASSERT_FALSE(db2->Get(ro, handles[0], "b", &value).IsNotFound());
std::unique_ptr<TraceReader> trace_reader;
ASSERT_OK(NewFileTraceReader(env_, env_opts, trace_filename, &trace_reader));
- Replayer replayer(db2, handles_, std::move(trace_reader));
- ASSERT_OK(replayer.Replay());
+ std::unique_ptr<Replayer> replayer;
+ ASSERT_OK(
+ db2->NewDefaultReplayer(handles, std::move(trace_reader), &replayer));
+ ASSERT_OK(replayer->Prepare());
+ ASSERT_OK(replayer->Replay());
+ replayer.reset();
// All the key-values should not present since we filter out the WRITE ops.
ASSERT_TRUE(db2->Get(ro, handles[0], "a", &value).IsNotFound());
#include "rocksdb/file_system.h"
#include "rocksdb/system_clock.h"
+#include "rocksdb/trace_record.h"
namespace ROCKSDB_NAMESPACE {
namespace ROCKSDB_NAMESPACE {
-struct Options;
-struct DBOptions;
struct ColumnFamilyOptions;
-struct ReadOptions;
-struct WriteOptions;
-struct FlushOptions;
struct CompactionOptions;
struct CompactRangeOptions;
-struct TableProperties;
+struct DBOptions;
struct ExternalSstFileInfo;
-class WriteBatch;
+struct FlushOptions;
+struct Options;
+struct ReadOptions;
+struct TableProperties;
+struct WriteOptions;
+#ifdef ROCKSDB_LITE
+class CompactionJobInfo;
+#endif
class Env;
class EventListener;
+class FileSystem;
+#ifndef ROCKSDB_LITE
+class Replayer;
+#endif
class StatsHistoryIterator;
+#ifndef ROCKSDB_LITE
+class TraceReader;
class TraceWriter;
-#ifdef ROCKSDB_LITE
-class CompactionJobInfo;
#endif
-class FileSystem;
+class WriteBatch;
extern const std::string kDefaultColumnFamilyName;
extern const std::string kPersistentStatsColumnFamilyName;
virtual ColumnFamilyHandle* DefaultColumnFamily() const = 0;
#ifndef ROCKSDB_LITE
+
virtual Status GetPropertiesOfAllTables(ColumnFamilyHandle* column_family,
TablePropertiesCollection* props) = 0;
virtual Status GetPropertiesOfAllTables(TablePropertiesCollection* props) {
virtual Status EndBlockCacheTrace() {
return Status::NotSupported("EndBlockCacheTrace() is not implemented.");
}
+
+ // Create a default trace replayer.
+ virtual Status NewDefaultReplayer(
+ const std::vector<ColumnFamilyHandle*>& /*handles*/,
+ std::unique_ptr<TraceReader>&& /*reader*/,
+ std::unique_ptr<Replayer>* /*replayer*/) {
+ return Status::NotSupported("NewDefaultReplayer() is not implemented.");
+ }
+
#endif // ROCKSDB_LITE
// Needed for StackableDB
virtual Status Read(std::string* data) = 0;
virtual Status Close() = 0;
+
+ // Seek back to the trace header. Replayer can call this method for
+ // repeatedly replaying. Note this method may fail if the reader is already
+ // closed.
+ virtual Status Reset() = 0;
};
// Factory methods to read/write traces from/to a file.
Status NewFileTraceReader(Env* env, const EnvOptions& env_options,
const std::string& trace_filename,
std::unique_ptr<TraceReader>* trace_reader);
+
} // namespace ROCKSDB_NAMESPACE
--- /dev/null
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#pragma once
+
+#include <string>
+#include <vector>
+
+#include "rocksdb/rocksdb_namespace.h"
+#include "rocksdb/slice.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+class ColumnFamilyHandle;
+class DB;
+class Status;
+
+// Supported trace record types.
+enum TraceType : char {
+ kTraceNone = 0,
+ kTraceBegin = 1,
+ kTraceEnd = 2,
+ // Query level tracing related trace types.
+ kTraceWrite = 3,
+ kTraceGet = 4,
+ kTraceIteratorSeek = 5,
+ kTraceIteratorSeekForPrev = 6,
+ // Block cache tracing related trace types.
+ kBlockTraceIndexBlock = 7,
+ kBlockTraceFilterBlock = 8,
+ kBlockTraceDataBlock = 9,
+ kBlockTraceUncompressionDictBlock = 10,
+ kBlockTraceRangeDeletionBlock = 11,
+ // IO tracing related trace type.
+ kIOTracer = 12,
+ // Query level tracing related trace type.
+ kTraceMultiGet = 13,
+ // All trace types should be added before kTraceMax
+ kTraceMax,
+};
+
+class WriteQueryTraceRecord;
+class GetQueryTraceRecord;
+class IteratorSeekQueryTraceRecord;
+class MultiGetQueryTraceRecord;
+
+// Base class for all types of trace records.
+class TraceRecord {
+ public:
+ TraceRecord();
+ explicit TraceRecord(uint64_t timestamp);
+ virtual ~TraceRecord();
+
+ virtual TraceType GetTraceType() const = 0;
+
+ virtual uint64_t GetTimestamp() const;
+
+ class Handler {
+ public:
+ virtual ~Handler() {}
+
+ virtual Status Handle(const WriteQueryTraceRecord& record) = 0;
+ virtual Status Handle(const GetQueryTraceRecord& record) = 0;
+ virtual Status Handle(const IteratorSeekQueryTraceRecord& record) = 0;
+ virtual Status Handle(const MultiGetQueryTraceRecord& record) = 0;
+ };
+
+ virtual Status Accept(Handler* handler) = 0;
+
+ // Create a handler for the exeution of TraceRecord.
+ static Handler* NewExecutionHandler(
+ DB* db, const std::vector<ColumnFamilyHandle*>& handles);
+
+ private:
+ // Timestamp (in microseconds) of this trace.
+ uint64_t timestamp_;
+};
+
+// Base class for all query types of trace records.
+class QueryTraceRecord : public TraceRecord {
+ public:
+ explicit QueryTraceRecord(uint64_t timestamp);
+
+ virtual ~QueryTraceRecord() override;
+};
+
+// Trace record for DB::Write() operation.
+class WriteQueryTraceRecord : public QueryTraceRecord {
+ public:
+ WriteQueryTraceRecord(PinnableSlice&& write_batch_rep, uint64_t timestamp);
+
+ WriteQueryTraceRecord(const std::string& write_batch_rep, uint64_t timestamp);
+
+ virtual ~WriteQueryTraceRecord() override;
+
+ TraceType GetTraceType() const override { return kTraceWrite; };
+
+ virtual Slice GetWriteBatchRep() const;
+
+ virtual Status Accept(Handler* handler) override;
+
+ private:
+ PinnableSlice rep_;
+};
+
+// Trace record for DB::Get() operation
+class GetQueryTraceRecord : public QueryTraceRecord {
+ public:
+ GetQueryTraceRecord(uint32_t column_family_id, PinnableSlice&& key,
+ uint64_t timestamp);
+
+ GetQueryTraceRecord(uint32_t column_family_id, const std::string& key,
+ uint64_t timestamp);
+
+ virtual ~GetQueryTraceRecord() override;
+
+ TraceType GetTraceType() const override { return kTraceGet; };
+
+ virtual uint32_t GetColumnFamilyID() const;
+
+ virtual Slice GetKey() const;
+
+ virtual Status Accept(Handler* handler) override;
+
+ private:
+ // Column family ID.
+ uint32_t cf_id_;
+ // Key to get.
+ PinnableSlice key_;
+};
+
+// Base class for all Iterator related operations.
+class IteratorQueryTraceRecord : public QueryTraceRecord {
+ public:
+ explicit IteratorQueryTraceRecord(uint64_t timestamp);
+
+ virtual ~IteratorQueryTraceRecord() override;
+};
+
+// Trace record for Iterator::Seek() and Iterator::SeekForPrev() operation.
+class IteratorSeekQueryTraceRecord : public IteratorQueryTraceRecord {
+ public:
+ // Currently we only support Seek() and SeekForPrev().
+ enum SeekType {
+ kSeek = kTraceIteratorSeek,
+ kSeekForPrev = kTraceIteratorSeekForPrev
+ };
+
+ IteratorSeekQueryTraceRecord(SeekType seekType, uint32_t column_family_id,
+ PinnableSlice&& key, uint64_t timestamp);
+
+ IteratorSeekQueryTraceRecord(SeekType seekType, uint32_t column_family_id,
+ const std::string& key, uint64_t timestamp);
+
+ virtual ~IteratorSeekQueryTraceRecord() override;
+
+ TraceType GetTraceType() const override;
+
+ virtual SeekType GetSeekType() const;
+
+ virtual uint32_t GetColumnFamilyID() const;
+
+ virtual Slice GetKey() const;
+
+ virtual Status Accept(Handler* handler) override;
+
+ private:
+ SeekType type_;
+ // Column family ID.
+ uint32_t cf_id_;
+ // Key to seek to.
+ PinnableSlice key_;
+};
+
+// Trace record for DB::MultiGet() operation.
+class MultiGetQueryTraceRecord : public QueryTraceRecord {
+ public:
+ MultiGetQueryTraceRecord(std::vector<uint32_t> column_family_ids,
+ std::vector<PinnableSlice>&& keys,
+ uint64_t timestamp);
+
+ MultiGetQueryTraceRecord(std::vector<uint32_t> column_family_ids,
+ const std::vector<std::string>& keys,
+ uint64_t timestamp);
+
+ virtual ~MultiGetQueryTraceRecord() override;
+
+ TraceType GetTraceType() const override { return kTraceMultiGet; };
+
+ virtual std::vector<uint32_t> GetColumnFamilyIDs() const;
+
+ virtual std::vector<Slice> GetKeys() const;
+
+ virtual Status Accept(Handler* handler) override;
+
+ private:
+ // Column familiy IDs.
+ std::vector<uint32_t> cf_ids_;
+ // Keys to get.
+ std::vector<PinnableSlice> keys_;
+};
+
+} // namespace ROCKSDB_NAMESPACE
--- /dev/null
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#pragma once
+#ifndef ROCKSDB_LITE
+
+#include <memory>
+
+#include "rocksdb/rocksdb_namespace.h"
+#include "rocksdb/status.h"
+#include "rocksdb/trace_record.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+struct ReplayOptions {
+ // Number of threads used for replaying. If 0 or 1, replay using
+ // single thread.
+ uint32_t num_threads;
+
+ // Enables fast forwarding a replay by increasing/reducing the delay between
+ // the ingested traces.
+ // If > 0.0 and < 1.0, slow down the replay by this amount.
+ // If 1.0, replay the operations at the same rate as in the trace stream.
+ // If > 1, speed up the replay by this amount.
+ double fast_forward;
+
+ ReplayOptions() : num_threads(1), fast_forward(1.0) {}
+ ReplayOptions(uint32_t num_of_threads, double fast_forward_ratio)
+ : num_threads(num_of_threads), fast_forward(fast_forward_ratio) {}
+};
+
+// Replayer helps to replay the captured RocksDB query level operations.
+// The Replayer can either be created from DB::NewReplayer method, or be
+// instantiated via db_bench today, on using "replay" benchmark.
+class Replayer {
+ public:
+ virtual ~Replayer() {}
+
+ // Make some preparation before replaying the trace. This will also reset the
+ // replayer in order to restart replaying.
+ virtual Status Prepare() = 0;
+
+ // Return the timestamp when the trace recording was started.
+ virtual uint64_t GetHeaderTimestamp() const = 0;
+
+ // Atomically read one trace into a TraceRecord (excluding the header and
+ // footer traces).
+ // Return Status::OK() on success;
+ // Status::Incomplete() if Prepare() was not called or no more available
+ // trace;
+ // Status::NotSupported() if the read trace type is not supported.
+ virtual Status Next(std::unique_ptr<TraceRecord>* record) = 0;
+
+ // Execute one TraceRecord.
+ // Return Status::OK() if the execution was successful. Get/MultiGet traces
+ // will still return Status::OK() even if they got Status::NotFound()
+ // from DB::Get() or DB::MultiGet();
+ // Status::Incomplete() if Prepare() was not called or no more available
+ // trace;
+ // Status::NotSupported() if the operation is not supported;
+ // Otherwise, return the corresponding error status.
+ virtual Status Execute(const std::unique_ptr<TraceRecord>& record) = 0;
+ virtual Status Execute(std::unique_ptr<TraceRecord>&& record) = 0;
+
+ // Replay all the traces from the provided trace stream, taking the delay
+ // between the traces into consideration.
+ virtual Status Replay(const ReplayOptions& options) = 0;
+ virtual Status Replay() { return Replay(ReplayOptions()); }
+};
+
+} // namespace ROCKSDB_NAMESPACE
+#endif // ROCKSDB_LITE
using DB::EndTrace;
Status EndTrace() override { return db_->EndTrace(); }
+ using DB::NewDefaultReplayer;
+ Status NewDefaultReplayer(const std::vector<ColumnFamilyHandle*>& handles,
+ std::unique_ptr<TraceReader>&& reader,
+ std::unique_ptr<Replayer>* replayer) override {
+ return db_->NewDefaultReplayer(handles, std::move(reader), replayer);
+ }
+
#endif // ROCKSDB_LITE
virtual Status GetLiveFiles(std::vector<std::string>& vec, uint64_t* mfs,
test_util/sync_point_impl.cc \
test_util/transaction_test_util.cc \
tools/dump/db_dump_tool.cc \
+ trace_replay/trace_record_handler.cc \
+ trace_replay/trace_record.cc \
trace_replay/trace_replay.cc \
trace_replay/block_cache_tracer.cc \
trace_replay/io_tracer.cc \
utilities/simulator_cache/sim_cache.cc \
utilities/table_properties_collectors/compact_on_deletion_collector.cc \
utilities/trace/file_trace_reader_writer.cc \
+ utilities/trace/replayer_impl.cc \
utilities/transactions/lock/lock_manager.cc \
utilities/transactions/lock/point/point_lock_tracker.cc \
utilities/transactions/lock/point/point_lock_manager.cc \
#include "rocksdb/system_clock.h"
#include "rocksdb/table.h"
#include "rocksdb/table_properties.h"
+#include "rocksdb/trace_record.h"
#include "table/block_based/binary_search_index_reader.h"
#include "table/block_based/block.h"
#include "table/block_based/block_based_filter_block.h"
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include <stdio.h>
+
#include <algorithm>
#include <iostream>
#include <map>
#include "rocksdb/perf_context.h"
#include "rocksdb/slice_transform.h"
#include "rocksdb/statistics.h"
+#include "rocksdb/trace_record.h"
#include "rocksdb/write_buffer_manager.h"
#include "table/block_based/block.h"
#include "table/block_based/block_based_table_builder.h"
#include "monitoring/histogram.h"
#include "rocksdb/system_clock.h"
+#include "rocksdb/trace_record.h"
#include "util/gflags_compat.h"
#include "util/string_util.h"
#include "db/dbformat.h"
#include "rocksdb/env.h"
+#include "rocksdb/trace_record.h"
#include "rocksdb/utilities/sim_cache.h"
#include "trace_replay/block_cache_tracer.h"
#include "utilities/simulator_cache/cache_simulator.h"
#include "rocksdb/env.h"
#include "rocksdb/status.h"
#include "rocksdb/trace_reader_writer.h"
+#include "rocksdb/trace_record.h"
#include "test_util/testharness.h"
#include "test_util/testutil.h"
#include "tools/block_cache_analyzer/block_cache_trace_analyzer.h"
#include "rocksdb/utilities/optimistic_transaction_db.h"
#include "rocksdb/utilities/options_type.h"
#include "rocksdb/utilities/options_util.h"
+#ifndef ROCKSDB_LITE
+#include "rocksdb/utilities/replayer.h"
+#endif // ROCKSDB_LITE
#include "rocksdb/utilities/sim_cache.h"
#include "rocksdb/utilities/transaction.h"
#include "rocksdb/utilities/transaction_db.h"
"\tmemstats -- Print memtable stats\n"
"\tsstables -- Print sstable info\n"
"\theapprofile -- Dump a heap profile (if supported by this port)\n"
+#ifndef ROCKSDB_LITE
"\treplay -- replay the trace file specified with trace_file\n"
+#endif // ROCKSDB_LITE
"\tgetmergeoperands -- Insert lots of merge records which are a list of "
"sorted ints for a key and then compare performance of lookup for another "
"key "
DEFINE_bool(use_stderr_info_logger, false,
"Write info logs to stderr instead of to LOG file. ");
+#ifndef ROCKSDB_LITE
+
DEFINE_string(trace_file, "", "Trace workload to a file. ");
-DEFINE_int32(trace_replay_fast_forward, 1,
- "Fast forward trace replay, must >= 1. ");
+DEFINE_double(trace_replay_fast_forward, 1.0,
+ "Fast forward trace replay, must > 0.0.");
DEFINE_int32(block_cache_trace_sampling_frequency, 1,
"Block cache trace sampling frequency, termed s. It uses spatial "
"downsampling and samples accesses to one out of s blocks.");
DEFINE_int32(trace_replay_threads, 1,
"The number of threads to replay, must >=1.");
+#endif // ROCKSDB_LITE
+
static enum ROCKSDB_NAMESPACE::CompressionType StringToCompressionType(
const char* ctype) {
assert(ctype);
PrintStats("rocksdb.sstables");
} else if (name == "stats_history") {
PrintStatsHistory();
+#ifndef ROCKSDB_LITE
} else if (name == "replay") {
if (num_threads > 1) {
fprintf(stderr, "Multi-threaded replay is not yet supported\n");
ErrorExit();
}
method = &Benchmark::Replay;
+#endif // ROCKSDB_LITE
} else if (name == "getmergeoperands") {
method = &Benchmark::GetMergeOperands;
} else if (!name.empty()) { // No error message for empty name
}
}
+#ifndef ROCKSDB_LITE
+
void Replay(ThreadState* thread) {
if (db_.db != nullptr) {
Replay(thread, &db_);
s.ToString().c_str());
exit(1);
}
- Replayer replayer(db_with_cfh->db, db_with_cfh->cfh,
- std::move(trace_reader));
- replayer.SetFastForward(
- static_cast<uint32_t>(FLAGS_trace_replay_fast_forward));
- s = replayer.MultiThreadReplay(
- static_cast<uint32_t>(FLAGS_trace_replay_threads));
+ std::unique_ptr<Replayer> replayer;
+ s = db_with_cfh->db->NewDefaultReplayer(db_with_cfh->cfh,
+ std::move(trace_reader), &replayer);
+ if (!s.ok()) {
+ fprintf(stderr,
+ "Encountered an error creating a default Replayer. "
+ "Error: %s\n",
+ s.ToString().c_str());
+ exit(1);
+ }
+ s = replayer->Prepare();
+ if (!s.ok()) {
+ fprintf(stderr, "Prepare for replay failed. Error: %s\n",
+ s.ToString().c_str());
+ }
+ s = replayer->Replay(
+ ReplayOptions(static_cast<uint32_t>(FLAGS_trace_replay_threads),
+ FLAGS_trace_replay_fast_forward));
+ replayer.reset();
if (s.ok()) {
- fprintf(stdout, "Replay started from trace_file: %s\n",
+ fprintf(stdout, "Replay completed from trace_file: %s\n",
FLAGS_trace_file.c_str());
} else {
- fprintf(stderr, "Starting replay failed. Error: %s\n",
- s.ToString().c_str());
+ fprintf(stderr, "Replay failed. Error: %s\n", s.ToString().c_str());
}
}
+
+#endif // ROCKSDB_LITE
};
int db_bench_tool(int argc, char** argv) {
return (op1 * op2);
}
-void DecodeCFAndKeyFromString(std::string& buffer, uint32_t* cf_id, Slice* key) {
- Slice buf(buffer);
- GetFixed32(&buf, cf_id);
- GetLengthPrefixedSlice(&buf, key);
-}
-
} // namespace
// The default constructor of AnalyzerOptions
total_requests_++;
end_time_ = trace.ts;
- if (trace.type == kTraceWrite) {
- total_writes_++;
- c_time_ = trace.ts;
- Slice batch_data;
- if (trace_file_version_ < 2) {
- Slice tmp_data(trace.payload);
- batch_data = tmp_data;
- } else {
- WritePayload w_payload;
- TracerHelper::DecodeWritePayload(&trace, &w_payload);
- batch_data = w_payload.write_batch_data;
- }
- // Note that, if the write happens in a transaction,
- // 'Write' will be called twice, one for Prepare, one for
- // Commit. Thus, in the trace, for the same WriteBatch, there
- // will be two reords if it is in a transaction. Here, we only
- // process the reord that is committed. If write is non-transaction,
- // HasBeginPrepare()==false, so we process it normally.
- WriteBatch batch(batch_data.ToString());
- if (batch.HasBeginPrepare() && !batch.HasCommit()) {
- continue;
- }
- TraceWriteHandler write_handler(this);
- s = batch.Iterate(&write_handler);
- if (!s.ok()) {
- fprintf(stderr, "Cannot process the write batch in the trace\n");
- return s;
+ if (trace.type == kTraceEnd) {
+ break;
+ }
+
+ std::unique_ptr<TraceRecord> record;
+ switch (trace.type) {
+ case kTraceWrite: {
+ s = TracerHelper::DecodeWriteRecord(&trace, trace_file_version_,
+ &record);
+ if (!s.ok()) {
+ return s;
+ }
+ total_writes_++;
+ c_time_ = trace.ts;
+ std::unique_ptr<WriteQueryTraceRecord> r(
+ reinterpret_cast<WriteQueryTraceRecord*>(record.release()));
+ // Note that, if the write happens in a transaction,
+ // 'Write' will be called twice, one for Prepare, one for
+ // Commit. Thus, in the trace, for the same WriteBatch, there
+ // will be two reords if it is in a transaction. Here, we only
+ // process the reord that is committed. If write is non-transaction,
+ // HasBeginPrepare()==false, so we process it normally.
+ WriteBatch batch(r->GetWriteBatchRep().ToString());
+ if (batch.HasBeginPrepare() && !batch.HasCommit()) {
+ continue;
+ }
+ TraceWriteHandler write_handler(this);
+ s = batch.Iterate(&write_handler);
+ if (!s.ok()) {
+ fprintf(stderr, "Cannot process the write batch in the trace\n");
+ return s;
+ }
+ break;
}
- } else if (trace.type == kTraceGet) {
- GetPayload get_payload;
- get_payload.get_key = 0;
- if (trace_file_version_ < 2) {
- DecodeCFAndKeyFromString(trace.payload, &get_payload.cf_id,
- &get_payload.get_key);
- } else {
- TracerHelper::DecodeGetPayload(&trace, &get_payload);
+ case kTraceGet: {
+ s = TracerHelper::DecodeGetRecord(&trace, trace_file_version_, &record);
+ if (!s.ok()) {
+ return s;
+ }
+ total_gets_++;
+ std::unique_ptr<GetQueryTraceRecord> r(
+ reinterpret_cast<GetQueryTraceRecord*>(record.release()));
+ s = HandleGet(r->GetColumnFamilyID(), r->GetKey(), r->GetTimestamp(),
+ 1);
+ if (!s.ok()) {
+ fprintf(stderr, "Cannot process the get in the trace\n");
+ return s;
+ }
+ break;
}
- total_gets_++;
-
- s = HandleGet(get_payload.cf_id, get_payload.get_key.ToString(), trace.ts,
- 1);
- if (!s.ok()) {
- fprintf(stderr, "Cannot process the get in the trace\n");
- return s;
+ case kTraceIteratorSeek:
+ case kTraceIteratorSeekForPrev: {
+ s = TracerHelper::DecodeIterRecord(&trace, trace_file_version_,
+ &record);
+ if (!s.ok()) {
+ return s;
+ }
+ std::unique_ptr<IteratorSeekQueryTraceRecord> r(
+ reinterpret_cast<IteratorSeekQueryTraceRecord*>(record.release()));
+ s = HandleIter(r->GetColumnFamilyID(), r->GetKey(), r->GetTimestamp(),
+ r->GetTraceType());
+ if (!s.ok()) {
+ fprintf(stderr, "Cannot process the iterator in the trace\n");
+ return s;
+ }
+ break;
}
- } else if (trace.type == kTraceIteratorSeek ||
- trace.type == kTraceIteratorSeekForPrev) {
- IterPayload iter_payload;
- iter_payload.cf_id = 0;
- if (trace_file_version_ < 2) {
- DecodeCFAndKeyFromString(trace.payload, &iter_payload.cf_id,
- &iter_payload.iter_key);
- } else {
- TracerHelper::DecodeIterPayload(&trace, &iter_payload);
+ case kTraceMultiGet: {
+ s = TracerHelper::DecodeMultiGetRecord(&trace, trace_file_version_,
+ &record);
+ if (!s.ok()) {
+ return s;
+ }
+ std::unique_ptr<MultiGetQueryTraceRecord> r(
+ reinterpret_cast<MultiGetQueryTraceRecord*>(record.release()));
+ s = HandleMultiGet(r->GetColumnFamilyIDs(), r->GetKeys(),
+ r->GetTimestamp());
+ break;
}
- s = HandleIter(iter_payload.cf_id, iter_payload.iter_key.ToString(),
- trace.ts, trace.type);
- if (!s.ok()) {
- fprintf(stderr, "Cannot process the iterator in the trace\n");
- return s;
+ default: {
+ // Skip unsupported types
+ break;
}
- } else if (trace.type == kTraceMultiGet) {
- MultiGetPayload multiget_payload;
- assert(trace_file_version_ >= 2);
- TracerHelper::DecodeMultiGetPayload(&trace, &multiget_payload);
- s = HandleMultiGet(multiget_payload, trace.ts);
- } else if (trace.type == kTraceEnd) {
- break;
}
}
if (s.IsIncomplete()) {
// Process the statistics of QPS
Status TraceAnalyzer::MakeStatisticQPS() {
- if(begin_time_ == 0) {
+ if (begin_time_ == 0) {
begin_time_ = trace_create_time_;
}
uint32_t duration =
}
// Handle the Get request in the trace
-Status TraceAnalyzer::HandleGet(uint32_t column_family_id,
- const std::string& key, const uint64_t& ts,
- const uint32_t& get_ret) {
+Status TraceAnalyzer::HandleGet(uint32_t column_family_id, const Slice& key,
+ const uint64_t& ts, const uint32_t& get_ret) {
Status s;
size_t value_size = 0;
if (FLAGS_convert_to_human_readable_trace && trace_sequence_f_) {
if (get_ret == 1) {
value_size = 10;
}
- s = KeyStatsInsertion(TraceOperationType::kGet, column_family_id, key,
- value_size, ts);
+ s = KeyStatsInsertion(TraceOperationType::kGet, column_family_id,
+ key.ToString(), value_size, ts);
if (!s.ok()) {
return Status::Corruption("Failed to insert key statistics");
}
}
// Handle the Iterator request in the trace
-Status TraceAnalyzer::HandleIter(uint32_t column_family_id,
- const std::string& key, const uint64_t& ts,
- TraceType& trace_type) {
+Status TraceAnalyzer::HandleIter(uint32_t column_family_id, const Slice& key,
+ const uint64_t& ts, TraceType trace_type) {
Status s;
size_t value_size = 0;
int type = -1;
if (!ta_[type].enabled) {
return Status::OK();
}
- s = KeyStatsInsertion(type, column_family_id, key, value_size, ts);
+ s = KeyStatsInsertion(type, column_family_id, key.ToString(), value_size, ts);
if (!s.ok()) {
return Status::Corruption("Failed to insert key statistics");
}
}
// Handle MultiGet queries in the trace
-Status TraceAnalyzer::HandleMultiGet(MultiGetPayload& multiget_payload,
- const uint64_t& ts) {
+Status TraceAnalyzer::HandleMultiGet(
+ const std::vector<uint32_t>& column_family_ids,
+ const std::vector<Slice>& keys, const uint64_t& ts) {
Status s;
size_t value_size = 0;
- if (multiget_payload.cf_ids.size() != multiget_payload.multiget_keys.size()) {
+ if (column_family_ids.size() != keys.size()) {
// The size does not match is not the error of tracing and anayzing, we just
// report it to the user. The analyzing continues.
printf("The CF ID vector size does not match the keys vector size!\n");
}
- size_t vector_size = std::min(multiget_payload.cf_ids.size(),
- multiget_payload.multiget_keys.size());
+ size_t vector_size = std::min(column_family_ids.size(), keys.size());
if (FLAGS_convert_to_human_readable_trace && trace_sequence_f_) {
for (size_t i = 0; i < vector_size; i++) {
- assert(i < multiget_payload.cf_ids.size() &&
- i < multiget_payload.multiget_keys.size());
+ assert(i < column_family_ids.size() && i < keys.size());
s = WriteTraceSequence(TraceOperationType::kMultiGet,
- multiget_payload.cf_ids[i],
- multiget_payload.multiget_keys[i], value_size, ts);
+ column_family_ids[i], keys[i], value_size, ts);
}
if (!s.ok()) {
return Status::Corruption("Failed to write the trace sequence to file");
return Status::OK();
}
for (size_t i = 0; i < vector_size; i++) {
- assert(i < multiget_payload.cf_ids.size() &&
- i < multiget_payload.multiget_keys.size());
- s = KeyStatsInsertion(TraceOperationType::kMultiGet,
- multiget_payload.cf_ids[i],
- multiget_payload.multiget_keys[i], value_size, ts);
+ assert(i < column_family_ids.size() && i < keys.size());
+ s = KeyStatsInsertion(TraceOperationType::kMultiGet, column_family_ids[i],
+ keys[i].ToString(), value_size, ts);
}
if (!s.ok()) {
return Status::Corruption("Failed to insert key statistics");
// Write the trace sequence to file
Status TraceAnalyzer::WriteTraceSequence(const uint32_t& type,
const uint32_t& cf_id,
- const std::string& key,
+ const Slice& key,
const size_t value_size,
const uint64_t ts) {
- std::string hex_key = ROCKSDB_NAMESPACE::LDBCommand::StringToHex(key);
+ std::string hex_key =
+ ROCKSDB_NAMESPACE::LDBCommand::StringToHex(key.ToString());
int ret;
ret = snprintf(buffer_, sizeof(buffer_), "%u %u %zu %" PRIu64 "\n", type,
cf_id, value_size, ts);
#include "rocksdb/env.h"
#include "rocksdb/trace_reader_writer.h"
+#include "rocksdb/trace_record.h"
#include "rocksdb/write_batch.h"
#include "trace_replay/trace_replay.h"
Status WriteTraceUnit(TraceUnit& unit);
// The trace processing functions for different type
- Status HandleGet(uint32_t column_family_id, const std::string& key,
+ Status HandleGet(uint32_t column_family_id, const Slice& key,
const uint64_t& ts, const uint32_t& get_ret);
Status HandlePut(uint32_t column_family_id, const Slice& key,
const Slice& value);
const Slice& end_key);
Status HandleMerge(uint32_t column_family_id, const Slice& key,
const Slice& value);
- Status HandleIter(uint32_t column_family_id, const std::string& key,
- const uint64_t& ts, TraceType& trace_type);
- Status HandleMultiGet(MultiGetPayload& multiget_payload, const uint64_t& ts);
+ Status HandleIter(uint32_t column_family_id, const Slice& key,
+ const uint64_t& ts, TraceType trace_type);
+ Status HandleMultiGet(const std::vector<uint32_t>& column_family_ids,
+ const std::vector<Slice>& keys, const uint64_t& ts);
std::vector<TypeUnit>& GetTaVector() { return ta_; }
private:
Status TraceUnitWriter(
std::unique_ptr<ROCKSDB_NAMESPACE::WritableFile>& f_ptr, TraceUnit& unit);
Status WriteTraceSequence(const uint32_t& type, const uint32_t& cf_id,
- const std::string& key, const size_t value_size,
+ const Slice& key, const size_t value_size,
const uint64_t ts);
Status MakeStatisticKeyStatsOrPrefix(TraceStats& stats);
Status MakeStatisticCorrelation(TraceStats& stats, StatsUnit& unit);
#include "db/db_impl/db_impl.h"
#include "db/dbformat.h"
#include "rocksdb/slice.h"
+#include "rocksdb/trace_record.h"
#include "util/coding.h"
#include "util/hash.h"
#include "util/string_util.h"
// (found in the LICENSE.Apache file in the root directory).
#include "trace_replay/block_cache_tracer.h"
+
#include "rocksdb/env.h"
#include "rocksdb/status.h"
#include "rocksdb/trace_reader_writer.h"
+#include "rocksdb/trace_record.h"
#include "test_util/testharness.h"
#include "test_util/testutil.h"
#include "port/lang.h"
#include "rocksdb/file_system.h"
#include "rocksdb/options.h"
+#include "rocksdb/trace_record.h"
#include "trace_replay/trace_replay.h"
namespace ROCKSDB_NAMESPACE {
#include "rocksdb/env.h"
#include "rocksdb/status.h"
#include "rocksdb/trace_reader_writer.h"
+#include "rocksdb/trace_record.h"
#include "test_util/testharness.h"
#include "test_util/testutil.h"
--- /dev/null
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#include "rocksdb/trace_record.h"
+
+#include <utility>
+
+#include "rocksdb/db.h"
+#include "rocksdb/iterator.h"
+#include "rocksdb/options.h"
+#include "rocksdb/status.h"
+#include "trace_replay/trace_record_handler.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+// TraceRecord
+TraceRecord::TraceRecord(uint64_t timestamp) : timestamp_(timestamp) {}
+
+TraceRecord::~TraceRecord() {}
+
+uint64_t TraceRecord::GetTimestamp() const { return timestamp_; }
+
+TraceRecord::Handler* TraceRecord::NewExecutionHandler(
+ DB* db, const std::vector<ColumnFamilyHandle*>& handles) {
+ return new TraceExecutionHandler(db, handles);
+}
+
+// QueryTraceRecord
+QueryTraceRecord::QueryTraceRecord(uint64_t timestamp)
+ : TraceRecord(timestamp) {}
+
+QueryTraceRecord::~QueryTraceRecord() {}
+
+// WriteQueryTraceRecord
+WriteQueryTraceRecord::WriteQueryTraceRecord(PinnableSlice&& write_batch_rep,
+ uint64_t timestamp)
+ : QueryTraceRecord(timestamp), rep_(std::move(write_batch_rep)) {}
+
+WriteQueryTraceRecord::WriteQueryTraceRecord(const std::string& write_batch_rep,
+ uint64_t timestamp)
+ : QueryTraceRecord(timestamp) {
+ rep_.PinSelf(write_batch_rep);
+}
+
+WriteQueryTraceRecord::~WriteQueryTraceRecord() {}
+
+Slice WriteQueryTraceRecord::GetWriteBatchRep() const { return Slice(rep_); }
+
+Status WriteQueryTraceRecord::Accept(Handler* handler) {
+ assert(handler != nullptr);
+ return handler->Handle(*this);
+}
+
+// GetQueryTraceRecord
+GetQueryTraceRecord::GetQueryTraceRecord(uint32_t column_family_id,
+ PinnableSlice&& key,
+ uint64_t timestamp)
+ : QueryTraceRecord(timestamp),
+ cf_id_(column_family_id),
+ key_(std::move(key)) {}
+
+GetQueryTraceRecord::GetQueryTraceRecord(uint32_t column_family_id,
+ const std::string& key,
+ uint64_t timestamp)
+ : QueryTraceRecord(timestamp), cf_id_(column_family_id) {
+ key_.PinSelf(key);
+}
+
+GetQueryTraceRecord::~GetQueryTraceRecord() {}
+
+uint32_t GetQueryTraceRecord::GetColumnFamilyID() const { return cf_id_; }
+
+Slice GetQueryTraceRecord::GetKey() const { return Slice(key_); }
+
+Status GetQueryTraceRecord::Accept(Handler* handler) {
+ assert(handler != nullptr);
+ return handler->Handle(*this);
+}
+
+// IteratorQueryTraceRecord
+IteratorQueryTraceRecord::IteratorQueryTraceRecord(uint64_t timestamp)
+ : QueryTraceRecord(timestamp) {}
+
+IteratorQueryTraceRecord::~IteratorQueryTraceRecord() {}
+
+// IteratorSeekQueryTraceRecord
+IteratorSeekQueryTraceRecord::IteratorSeekQueryTraceRecord(
+ SeekType seek_type, uint32_t column_family_id, PinnableSlice&& key,
+ uint64_t timestamp)
+ : IteratorQueryTraceRecord(timestamp),
+ type_(seek_type),
+ cf_id_(column_family_id),
+ key_(std::move(key)) {}
+
+IteratorSeekQueryTraceRecord::IteratorSeekQueryTraceRecord(
+ SeekType seek_type, uint32_t column_family_id, const std::string& key,
+ uint64_t timestamp)
+ : IteratorQueryTraceRecord(timestamp),
+ type_(seek_type),
+ cf_id_(column_family_id) {
+ key_.PinSelf(key);
+}
+
+IteratorSeekQueryTraceRecord::~IteratorSeekQueryTraceRecord() {}
+
+TraceType IteratorSeekQueryTraceRecord::GetTraceType() const {
+ return static_cast<TraceType>(type_);
+}
+
+IteratorSeekQueryTraceRecord::SeekType
+IteratorSeekQueryTraceRecord::GetSeekType() const {
+ return type_;
+}
+
+uint32_t IteratorSeekQueryTraceRecord::GetColumnFamilyID() const {
+ return cf_id_;
+}
+
+Slice IteratorSeekQueryTraceRecord::GetKey() const { return Slice(key_); }
+
+Status IteratorSeekQueryTraceRecord::Accept(Handler* handler) {
+ assert(handler != nullptr);
+ return handler->Handle(*this);
+}
+
+// MultiGetQueryTraceRecord
+MultiGetQueryTraceRecord::MultiGetQueryTraceRecord(
+ std::vector<uint32_t> column_family_ids, std::vector<PinnableSlice>&& keys,
+ uint64_t timestamp)
+ : QueryTraceRecord(timestamp),
+ cf_ids_(column_family_ids),
+ keys_(std::move(keys)) {}
+
+MultiGetQueryTraceRecord::MultiGetQueryTraceRecord(
+ std::vector<uint32_t> column_family_ids,
+ const std::vector<std::string>& keys, uint64_t timestamp)
+ : QueryTraceRecord(timestamp), cf_ids_(column_family_ids) {
+ keys_.reserve(keys.size());
+ for (const std::string& key : keys) {
+ PinnableSlice ps;
+ ps.PinSelf(key);
+ keys_.push_back(std::move(ps));
+ }
+}
+
+MultiGetQueryTraceRecord::~MultiGetQueryTraceRecord() {}
+
+std::vector<uint32_t> MultiGetQueryTraceRecord::GetColumnFamilyIDs() const {
+ return cf_ids_;
+}
+
+std::vector<Slice> MultiGetQueryTraceRecord::GetKeys() const {
+ return std::vector<Slice>(keys_.begin(), keys_.end());
+}
+
+Status MultiGetQueryTraceRecord::Accept(Handler* handler) {
+ assert(handler != nullptr);
+ return handler->Handle(*this);
+}
+
+} // namespace ROCKSDB_NAMESPACE
--- /dev/null
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#include "trace_replay/trace_record_handler.h"
+
+#include "rocksdb/iterator.h"
+#include "rocksdb/write_batch.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+// TraceExecutionHandler
+TraceExecutionHandler::TraceExecutionHandler(
+ DB* db, const std::vector<ColumnFamilyHandle*>& handles)
+ : TraceRecord::Handler(),
+ db_(db),
+ write_opts_(WriteOptions()),
+ read_opts_(ReadOptions()) {
+ assert(db != nullptr);
+ assert(!handles.empty());
+ cf_map_.reserve(handles.size());
+ for (ColumnFamilyHandle* handle : handles) {
+ assert(handle != nullptr);
+ cf_map_.insert({handle->GetID(), handle});
+ }
+}
+
+TraceExecutionHandler::~TraceExecutionHandler() { cf_map_.clear(); }
+
+Status TraceExecutionHandler::Handle(const WriteQueryTraceRecord& record) {
+ WriteBatch batch(record.GetWriteBatchRep().ToString());
+ return db_->Write(write_opts_, &batch);
+}
+
+Status TraceExecutionHandler::Handle(const GetQueryTraceRecord& record) {
+ auto it = cf_map_.find(record.GetColumnFamilyID());
+ if (it == cf_map_.end()) {
+ return Status::Corruption("Invalid Column Family ID.");
+ }
+ assert(it->second != nullptr);
+
+ std::string value;
+ Status s = db_->Get(read_opts_, it->second, record.GetKey(), &value);
+
+ // Treat not found as ok and return other errors.
+ return s.IsNotFound() ? Status::OK() : s;
+}
+
+Status TraceExecutionHandler::Handle(
+ const IteratorSeekQueryTraceRecord& record) {
+ auto it = cf_map_.find(record.GetColumnFamilyID());
+ if (it == cf_map_.end()) {
+ return Status::Corruption("Invalid Column Family ID.");
+ }
+ assert(it->second != nullptr);
+
+ Iterator* single_iter = db_->NewIterator(read_opts_, it->second);
+
+ switch (record.GetSeekType()) {
+ case IteratorSeekQueryTraceRecord::kSeekForPrev: {
+ single_iter->SeekForPrev(record.GetKey());
+ break;
+ }
+ default: {
+ single_iter->Seek(record.GetKey());
+ break;
+ }
+ }
+ Status s = single_iter->status();
+ delete single_iter;
+ return s;
+}
+
+Status TraceExecutionHandler::Handle(const MultiGetQueryTraceRecord& record) {
+ std::vector<ColumnFamilyHandle*> handles;
+ handles.reserve(record.GetColumnFamilyIDs().size());
+ for (uint32_t cf_id : record.GetColumnFamilyIDs()) {
+ auto it = cf_map_.find(cf_id);
+ if (it == cf_map_.end()) {
+ return Status::Corruption("Invalid Column Family ID.");
+ }
+ assert(it->second != nullptr);
+ handles.push_back(it->second);
+ }
+
+ std::vector<Slice> keys = record.GetKeys();
+
+ if (handles.empty() || keys.empty()) {
+ return Status::InvalidArgument("Empty MultiGet cf_ids or keys.");
+ }
+ if (handles.size() != keys.size()) {
+ return Status::InvalidArgument("MultiGet cf_ids and keys size mismatch.");
+ }
+
+ std::vector<std::string> values;
+ std::vector<Status> ss = db_->MultiGet(read_opts_, handles, keys, &values);
+
+ // Treat not found as ok, return other errors.
+ for (Status s : ss) {
+ if (!s.ok() && !s.IsNotFound()) {
+ return s;
+ }
+ }
+ return Status::OK();
+}
+
+} // namespace ROCKSDB_NAMESPACE
--- /dev/null
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#pragma once
+
+#include <unordered_map>
+#include <vector>
+
+#include "rocksdb/db.h"
+#include "rocksdb/options.h"
+#include "rocksdb/status.h"
+#include "rocksdb/trace_record.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+// Handler to execute TraceRecord.
+class TraceExecutionHandler : public TraceRecord::Handler {
+ public:
+ TraceExecutionHandler(DB* db,
+ const std::vector<ColumnFamilyHandle*>& handles);
+ virtual ~TraceExecutionHandler() override;
+
+ virtual Status Handle(const WriteQueryTraceRecord& record) override;
+ virtual Status Handle(const GetQueryTraceRecord& record) override;
+ virtual Status Handle(const IteratorSeekQueryTraceRecord& record) override;
+ virtual Status Handle(const MultiGetQueryTraceRecord& record) override;
+
+ private:
+ DB* db_;
+ std::unordered_map<uint32_t, ColumnFamilyHandle*> cf_map_;
+ WriteOptions write_opts_;
+ ReadOptions read_opts_;
+};
+
+// To do: Handler for trace_analyzer.
+
+} // namespace ROCKSDB_NAMESPACE
#include "db/db_impl/db_impl.h"
#include "rocksdb/env.h"
+#include "rocksdb/iterator.h"
#include "rocksdb/options.h"
#include "rocksdb/slice.h"
#include "rocksdb/system_clock.h"
#include "rocksdb/write_batch.h"
#include "util/coding.h"
#include "util/string_util.h"
-#include "util/threadpool_imp.h"
namespace ROCKSDB_NAMESPACE {
return Status::OK();
}
+Status TracerHelper::DecodeHeader(const std::string& encoded_trace,
+ Trace* header) {
+ Status s = TracerHelper::DecodeTrace(encoded_trace, header);
+
+ if (header->type != kTraceBegin) {
+ return Status::Corruption("Corrupted trace file. Incorrect header.");
+ }
+ if (header->payload.substr(0, kTraceMagic.length()) != kTraceMagic) {
+ return Status::Corruption("Corrupted trace file. Incorrect magic.");
+ }
+
+ return s;
+}
+
bool TracerHelper::SetPayloadMap(uint64_t& payload_map,
const TracePayloadType payload_type) {
uint64_t old_state = payload_map;
return old_state != payload_map;
}
-void TracerHelper::DecodeWritePayload(Trace* trace,
- WritePayload* write_payload) {
- assert(write_payload != nullptr);
- Slice buf(trace->payload);
- GetFixed64(&buf, &trace->payload_map);
- int64_t payload_map = static_cast<int64_t>(trace->payload_map);
- while (payload_map) {
- // Find the rightmost set bit.
- uint32_t set_pos = static_cast<uint32_t>(log2(payload_map & -payload_map));
- switch (set_pos) {
- case TracePayloadType::kWriteBatchData:
- GetLengthPrefixedSlice(&buf, &(write_payload->write_batch_data));
- break;
- default:
- assert(false);
+Status TracerHelper::DecodeWriteRecord(Trace* trace, int trace_file_version,
+ std::unique_ptr<TraceRecord>* record) {
+ assert(trace != nullptr);
+ assert(trace->type == kTraceWrite);
+
+ PinnableSlice rep;
+ if (trace_file_version < 2) {
+ rep.PinSelf(trace->payload);
+ } else {
+ Slice buf(trace->payload);
+ GetFixed64(&buf, &trace->payload_map);
+ int64_t payload_map = static_cast<int64_t>(trace->payload_map);
+ Slice write_batch_data;
+ while (payload_map) {
+ // Find the rightmost set bit.
+ uint32_t set_pos =
+ static_cast<uint32_t>(log2(payload_map & -payload_map));
+ switch (set_pos) {
+ case TracePayloadType::kWriteBatchData:
+ GetLengthPrefixedSlice(&buf, &write_batch_data);
+ break;
+ default:
+ assert(false);
+ }
+ // unset the rightmost bit.
+ payload_map &= (payload_map - 1);
}
- // unset the rightmost bit.
- payload_map &= (payload_map - 1);
+ rep.PinSelf(write_batch_data);
+ }
+
+ if (record != nullptr) {
+ record->reset(new WriteQueryTraceRecord(std::move(rep), trace->ts));
}
+
+ return Status::OK();
}
-void TracerHelper::DecodeGetPayload(Trace* trace, GetPayload* get_payload) {
- assert(get_payload != nullptr);
- Slice buf(trace->payload);
- GetFixed64(&buf, &trace->payload_map);
- int64_t payload_map = static_cast<int64_t>(trace->payload_map);
- while (payload_map) {
- // Find the rightmost set bit.
- uint32_t set_pos = static_cast<uint32_t>(log2(payload_map & -payload_map));
- switch (set_pos) {
- case TracePayloadType::kGetCFID:
- GetFixed32(&buf, &(get_payload->cf_id));
- break;
- case TracePayloadType::kGetKey:
- GetLengthPrefixedSlice(&buf, &(get_payload->get_key));
- break;
- default:
- assert(false);
+Status TracerHelper::DecodeGetRecord(Trace* trace, int trace_file_version,
+ std::unique_ptr<TraceRecord>* record) {
+ assert(trace != nullptr);
+ assert(trace->type == kTraceGet);
+
+ uint32_t cf_id = 0;
+ Slice get_key;
+
+ if (trace_file_version < 2) {
+ DecodeCFAndKey(trace->payload, &cf_id, &get_key);
+ } else {
+ Slice buf(trace->payload);
+ GetFixed64(&buf, &trace->payload_map);
+ int64_t payload_map = static_cast<int64_t>(trace->payload_map);
+ while (payload_map) {
+ // Find the rightmost set bit.
+ uint32_t set_pos =
+ static_cast<uint32_t>(log2(payload_map & -payload_map));
+ switch (set_pos) {
+ case TracePayloadType::kGetCFID:
+ GetFixed32(&buf, &cf_id);
+ break;
+ case TracePayloadType::kGetKey:
+ GetLengthPrefixedSlice(&buf, &get_key);
+ break;
+ default:
+ assert(false);
+ }
+ // unset the rightmost bit.
+ payload_map &= (payload_map - 1);
}
- // unset the rightmost bit.
- payload_map &= (payload_map - 1);
}
+
+ if (record != nullptr) {
+ PinnableSlice ps;
+ ps.PinSelf(get_key);
+ record->reset(new GetQueryTraceRecord(cf_id, std::move(ps), trace->ts));
+ }
+
+ return Status::OK();
}
-void TracerHelper::DecodeIterPayload(Trace* trace, IterPayload* iter_payload) {
- assert(iter_payload != nullptr);
- Slice buf(trace->payload);
- GetFixed64(&buf, &trace->payload_map);
- int64_t payload_map = static_cast<int64_t>(trace->payload_map);
- while (payload_map) {
- // Find the rightmost set bit.
- uint32_t set_pos = static_cast<uint32_t>(log2(payload_map & -payload_map));
- switch (set_pos) {
- case TracePayloadType::kIterCFID:
- GetFixed32(&buf, &(iter_payload->cf_id));
- break;
- case TracePayloadType::kIterKey:
- GetLengthPrefixedSlice(&buf, &(iter_payload->iter_key));
- break;
- case TracePayloadType::kIterLowerBound:
- GetLengthPrefixedSlice(&buf, &(iter_payload->lower_bound));
- break;
- case TracePayloadType::kIterUpperBound:
- GetLengthPrefixedSlice(&buf, &(iter_payload->upper_bound));
- break;
- default:
- assert(false);
+Status TracerHelper::DecodeIterRecord(Trace* trace, int trace_file_version,
+ std::unique_ptr<TraceRecord>* record) {
+ assert(trace != nullptr);
+ assert(trace->type == kTraceIteratorSeek ||
+ trace->type == kTraceIteratorSeekForPrev);
+
+ uint32_t cf_id = 0;
+ Slice iter_key;
+
+ if (trace_file_version < 2) {
+ DecodeCFAndKey(trace->payload, &cf_id, &iter_key);
+ } else {
+ // Are these two used anywhere?
+ Slice lower_bound;
+ Slice upper_bound;
+
+ Slice buf(trace->payload);
+ GetFixed64(&buf, &trace->payload_map);
+ int64_t payload_map = static_cast<int64_t>(trace->payload_map);
+ while (payload_map) {
+ // Find the rightmost set bit.
+ uint32_t set_pos =
+ static_cast<uint32_t>(log2(payload_map & -payload_map));
+ switch (set_pos) {
+ case TracePayloadType::kIterCFID:
+ GetFixed32(&buf, &cf_id);
+ break;
+ case TracePayloadType::kIterKey:
+ GetLengthPrefixedSlice(&buf, &iter_key);
+ break;
+ case TracePayloadType::kIterLowerBound:
+ GetLengthPrefixedSlice(&buf, &lower_bound);
+ break;
+ case TracePayloadType::kIterUpperBound:
+ GetLengthPrefixedSlice(&buf, &upper_bound);
+ break;
+ default:
+ assert(false);
+ }
+ // unset the rightmost bit.
+ payload_map &= (payload_map - 1);
}
- // unset the rightmost bit.
- payload_map &= (payload_map - 1);
}
+
+ if (record != nullptr) {
+ PinnableSlice ps_key;
+ ps_key.PinSelf(iter_key);
+ record->reset(new IteratorSeekQueryTraceRecord(
+ static_cast<IteratorSeekQueryTraceRecord::SeekType>(trace->type), cf_id,
+ std::move(ps_key), trace->ts));
+ }
+
+ return Status::OK();
}
-void TracerHelper::DecodeMultiGetPayload(Trace* trace,
- MultiGetPayload* multiget_payload) {
- assert(multiget_payload != nullptr);
+Status TracerHelper::DecodeMultiGetRecord(
+ Trace* trace, int trace_file_version,
+ std::unique_ptr<TraceRecord>* record) {
+ assert(trace != nullptr);
+ assert(trace->type == kTraceMultiGet);
+ if (trace_file_version < 2) {
+ return Status::Corruption("MultiGet is not supported.");
+ }
+
+ uint32_t multiget_size = 0;
+ std::vector<uint32_t> cf_ids;
+ std::vector<PinnableSlice> multiget_keys;
+
Slice cfids_payload;
Slice keys_payload;
Slice buf(trace->payload);
uint32_t set_pos = static_cast<uint32_t>(log2(payload_map & -payload_map));
switch (set_pos) {
case TracePayloadType::kMultiGetSize:
- GetFixed32(&buf, &(multiget_payload->multiget_size));
+ GetFixed32(&buf, &multiget_size);
break;
case TracePayloadType::kMultiGetCFIDs:
GetLengthPrefixedSlice(&buf, &cfids_payload);
// unset the rightmost bit.
payload_map &= (payload_map - 1);
}
+ if (multiget_size == 0) {
+ return Status::InvalidArgument("Empty MultiGet cf_ids or keys.");
+ }
// Decode the cfids_payload and keys_payload
- multiget_payload->cf_ids.reserve(multiget_payload->multiget_size);
- multiget_payload->multiget_keys.reserve(multiget_payload->multiget_size);
- for (uint32_t i = 0; i < multiget_payload->multiget_size; i++) {
+ cf_ids.reserve(multiget_size);
+ multiget_keys.reserve(multiget_size);
+ for (uint32_t i = 0; i < multiget_size; i++) {
uint32_t tmp_cfid;
Slice tmp_key;
GetFixed32(&cfids_payload, &tmp_cfid);
GetLengthPrefixedSlice(&keys_payload, &tmp_key);
- multiget_payload->cf_ids.push_back(tmp_cfid);
- multiget_payload->multiget_keys.push_back(tmp_key.ToString());
+ cf_ids.push_back(tmp_cfid);
+ Slice s(tmp_key);
+ PinnableSlice ps;
+ ps.PinSelf(s);
+ multiget_keys.push_back(std::move(ps));
+ }
+
+ if (record != nullptr) {
+ record->reset(new MultiGetQueryTraceRecord(
+ std::move(cf_ids), std::move(multiget_keys), trace->ts));
}
+
+ return Status::OK();
}
Tracer::Tracer(SystemClock* clock, const TraceOptions& trace_options,
if (IsTraceFileOverMax()) {
return true;
}
- if ((trace_options_.filter & kTraceFilterGet
- && trace_type == kTraceGet)
- || (trace_options_.filter & kTraceFilterWrite
- && trace_type == kTraceWrite)) {
+ if ((trace_options_.filter & kTraceFilterGet && trace_type == kTraceGet) ||
+ (trace_options_.filter & kTraceFilterWrite &&
+ trace_type == kTraceWrite)) {
return true;
}
++trace_request_count_;
Status Tracer::Close() { return WriteFooter(); }
-Replayer::Replayer(DB* db, const std::vector<ColumnFamilyHandle*>& handles,
- std::unique_ptr<TraceReader>&& reader)
- : trace_reader_(std::move(reader)) {
- assert(db != nullptr);
- db_ = static_cast<DBImpl*>(db->GetRootDB());
- env_ = Env::Default();
- for (ColumnFamilyHandle* cfh : handles) {
- cf_map_[cfh->GetID()] = cfh;
- }
- fast_forward_ = 1;
-}
-
-Replayer::~Replayer() { trace_reader_.reset(); }
-
-Status Replayer::SetFastForward(uint32_t fast_forward) {
- Status s;
- if (fast_forward < 1) {
- s = Status::InvalidArgument("Wrong fast forward speed!");
- } else {
- fast_forward_ = fast_forward;
- s = Status::OK();
- }
- return s;
-}
-
-Status Replayer::Replay() {
- Status s;
- Trace header;
- int db_version;
- s = ReadHeader(&header);
- if (!s.ok()) {
- return s;
- }
- s = TracerHelper::ParseTraceHeader(header, &trace_file_version_, &db_version);
- if (!s.ok()) {
- return s;
- }
-
- std::chrono::system_clock::time_point replay_epoch =
- std::chrono::system_clock::now();
- WriteOptions woptions;
- ReadOptions roptions;
- Trace trace;
- uint64_t ops = 0;
- Iterator* single_iter = nullptr;
- while (s.ok()) {
- trace.reset();
- s = ReadTrace(&trace);
- if (!s.ok()) {
- break;
- }
-
- std::this_thread::sleep_until(
- replay_epoch +
- std::chrono::microseconds((trace.ts - header.ts) / fast_forward_));
- if (trace.type == kTraceWrite) {
- if (trace_file_version_ < 2) {
- WriteBatch batch(trace.payload);
- db_->Write(woptions, &batch);
- } else {
- WritePayload w_payload;
- TracerHelper::DecodeWritePayload(&trace, &w_payload);
- WriteBatch batch(w_payload.write_batch_data.ToString());
- db_->Write(woptions, &batch);
- }
- ops++;
- } else if (trace.type == kTraceGet) {
- GetPayload get_payload;
- get_payload.cf_id = 0;
- get_payload.get_key = 0;
- if (trace_file_version_ < 2) {
- DecodeCFAndKey(trace.payload, &get_payload.cf_id, &get_payload.get_key);
- } else {
- TracerHelper::DecodeGetPayload(&trace, &get_payload);
- }
- if (get_payload.cf_id > 0 &&
- cf_map_.find(get_payload.cf_id) == cf_map_.end()) {
- return Status::Corruption("Invalid Column Family ID.");
- }
-
- std::string value;
- if (get_payload.cf_id == 0) {
- db_->Get(roptions, get_payload.get_key, &value);
- } else {
- db_->Get(roptions, cf_map_[get_payload.cf_id], get_payload.get_key,
- &value);
- }
- ops++;
- } else if (trace.type == kTraceIteratorSeek) {
- // Currently, we only support to call Seek. The Next() and Prev() is not
- // supported.
- IterPayload iter_payload;
- iter_payload.cf_id = 0;
- if (trace_file_version_ < 2) {
- DecodeCFAndKey(trace.payload, &iter_payload.cf_id,
- &iter_payload.iter_key);
- } else {
- TracerHelper::DecodeIterPayload(&trace, &iter_payload);
- }
- if (iter_payload.cf_id > 0 &&
- cf_map_.find(iter_payload.cf_id) == cf_map_.end()) {
- return Status::Corruption("Invalid Column Family ID.");
- }
-
- if (iter_payload.cf_id == 0) {
- single_iter = db_->NewIterator(roptions);
- } else {
- single_iter = db_->NewIterator(roptions, cf_map_[iter_payload.cf_id]);
- }
- single_iter->Seek(iter_payload.iter_key);
- ops++;
- delete single_iter;
- } else if (trace.type == kTraceIteratorSeekForPrev) {
- // Currently, we only support to call SeekForPrev. The Next() and Prev()
- // is not supported.
- IterPayload iter_payload;
- iter_payload.cf_id = 0;
- if (trace_file_version_ < 2) {
- DecodeCFAndKey(trace.payload, &iter_payload.cf_id,
- &iter_payload.iter_key);
- } else {
- TracerHelper::DecodeIterPayload(&trace, &iter_payload);
- }
- if (iter_payload.cf_id > 0 &&
- cf_map_.find(iter_payload.cf_id) == cf_map_.end()) {
- return Status::Corruption("Invalid Column Family ID.");
- }
-
- if (iter_payload.cf_id == 0) {
- single_iter = db_->NewIterator(roptions);
- } else {
- single_iter = db_->NewIterator(roptions, cf_map_[iter_payload.cf_id]);
- }
- single_iter->SeekForPrev(iter_payload.iter_key);
- ops++;
- delete single_iter;
- } else if (trace.type == kTraceMultiGet) {
- MultiGetPayload multiget_payload;
- assert(trace_file_version_ >= 2);
- TracerHelper::DecodeMultiGetPayload(&trace, &multiget_payload);
- std::vector<ColumnFamilyHandle*> v_cfd;
- std::vector<Slice> keys;
- assert(multiget_payload.cf_ids.size() ==
- multiget_payload.multiget_keys.size());
- for (size_t i = 0; i < multiget_payload.cf_ids.size(); i++) {
- assert(i < multiget_payload.cf_ids.size() &&
- i < multiget_payload.multiget_keys.size());
- if (cf_map_.find(multiget_payload.cf_ids[i]) == cf_map_.end()) {
- return Status::Corruption("Invalid Column Family ID.");
- }
- v_cfd.push_back(cf_map_[multiget_payload.cf_ids[i]]);
- keys.push_back(Slice(multiget_payload.multiget_keys[i]));
- }
- std::vector<std::string> values;
- std::vector<Status> ss = db_->MultiGet(roptions, v_cfd, keys, &values);
- } else if (trace.type == kTraceEnd) {
- // Do nothing for now.
- // TODO: Add some validations later.
- break;
- }
- }
-
- if (s.IsIncomplete()) {
- // Reaching eof returns Incomplete status at the moment.
- // Could happen when killing a process without calling EndTrace() API.
- // TODO: Add better error handling.
- return Status::OK();
- }
- return s;
-}
-
-// The trace can be replayed with multithread by configurnge the number of
-// threads in the thread pool. Trace records are read from the trace file
-// sequentially and the corresponding queries are scheduled in the task
-// queue based on the timestamp. Currently, we support Write_batch (Put,
-// Delete, SingleDelete, DeleteRange), Get, Iterator (Seek and SeekForPrev).
-Status Replayer::MultiThreadReplay(uint32_t threads_num) {
- Status s;
- Trace header;
- int db_version;
- s = ReadHeader(&header);
- if (!s.ok()) {
- return s;
- }
- s = TracerHelper::ParseTraceHeader(header, &trace_file_version_, &db_version);
- if (!s.ok()) {
- return s;
- }
- ThreadPoolImpl thread_pool;
- thread_pool.SetHostEnv(env_);
-
- if (threads_num > 1) {
- thread_pool.SetBackgroundThreads(static_cast<int>(threads_num));
- } else {
- thread_pool.SetBackgroundThreads(1);
- }
-
- std::chrono::system_clock::time_point replay_epoch =
- std::chrono::system_clock::now();
- WriteOptions woptions;
- ReadOptions roptions;
- uint64_t ops = 0;
- while (s.ok()) {
- std::unique_ptr<ReplayerWorkerArg> ra(new ReplayerWorkerArg);
- ra->db = db_;
- s = ReadTrace(&(ra->trace_entry));
- if (!s.ok()) {
- break;
- }
- ra->cf_map = &cf_map_;
- ra->woptions = woptions;
- ra->roptions = roptions;
- ra->trace_file_version = trace_file_version_;
-
- std::this_thread::sleep_until(
- replay_epoch + std::chrono::microseconds(
- (ra->trace_entry.ts - header.ts) / fast_forward_));
- if (ra->trace_entry.type == kTraceWrite) {
- thread_pool.Schedule(&Replayer::BGWorkWriteBatch, ra.release(), nullptr,
- nullptr);
- ops++;
- } else if (ra->trace_entry.type == kTraceGet) {
- thread_pool.Schedule(&Replayer::BGWorkGet, ra.release(), nullptr,
- nullptr);
- ops++;
- } else if (ra->trace_entry.type == kTraceIteratorSeek) {
- thread_pool.Schedule(&Replayer::BGWorkIterSeek, ra.release(), nullptr,
- nullptr);
- ops++;
- } else if (ra->trace_entry.type == kTraceIteratorSeekForPrev) {
- thread_pool.Schedule(&Replayer::BGWorkIterSeekForPrev, ra.release(),
- nullptr, nullptr);
- ops++;
- } else if (ra->trace_entry.type == kTraceMultiGet) {
- thread_pool.Schedule(&Replayer::BGWorkMultiGet, ra.release(), nullptr,
- nullptr);
- ops++;
- } else if (ra->trace_entry.type == kTraceEnd) {
- // Do nothing for now.
- // TODO: Add some validations later.
- break;
- } else {
- // Other trace entry types that are not implemented for replay.
- // To finish the replay, we continue the process.
- continue;
- }
- }
-
- if (s.IsIncomplete()) {
- // Reaching eof returns Incomplete status at the moment.
- // Could happen when killing a process without calling EndTrace() API.
- // TODO: Add better error handling.
- s = Status::OK();
- }
- thread_pool.JoinAllThreads();
- return s;
-}
-
-Status Replayer::ReadHeader(Trace* header) {
- assert(header != nullptr);
- std::string encoded_trace;
- // Read the trace head
- Status s = trace_reader_->Read(&encoded_trace);
- if (!s.ok()) {
- return s;
- }
-
- s = TracerHelper::DecodeTrace(encoded_trace, header);
-
- if (header->type != kTraceBegin) {
- return Status::Corruption("Corrupted trace file. Incorrect header.");
- }
- if (header->payload.substr(0, kTraceMagic.length()) != kTraceMagic) {
- return Status::Corruption("Corrupted trace file. Incorrect magic.");
- }
-
- return s;
-}
-
-Status Replayer::ReadFooter(Trace* footer) {
- assert(footer != nullptr);
- Status s = ReadTrace(footer);
- if (!s.ok()) {
- return s;
- }
- if (footer->type != kTraceEnd) {
- return Status::Corruption("Corrupted trace file. Incorrect footer.");
- }
-
- // TODO: Add more validations later
- return s;
-}
-
-Status Replayer::ReadTrace(Trace* trace) {
- assert(trace != nullptr);
- std::string encoded_trace;
- Status s = trace_reader_->Read(&encoded_trace);
- if (!s.ok()) {
- return s;
- }
- return TracerHelper::DecodeTrace(encoded_trace, trace);
-}
-
-void Replayer::BGWorkGet(void* arg) {
- std::unique_ptr<ReplayerWorkerArg> ra(
- reinterpret_cast<ReplayerWorkerArg*>(arg));
- assert(ra != nullptr);
- auto cf_map = static_cast<std::unordered_map<uint32_t, ColumnFamilyHandle*>*>(
- ra->cf_map);
- GetPayload get_payload;
- get_payload.cf_id = 0;
- if (ra->trace_file_version < 2) {
- DecodeCFAndKey(ra->trace_entry.payload, &get_payload.cf_id,
- &get_payload.get_key);
- } else {
- TracerHelper::DecodeGetPayload(&(ra->trace_entry), &get_payload);
- }
- if (get_payload.cf_id > 0 &&
- cf_map->find(get_payload.cf_id) == cf_map->end()) {
- return;
- }
-
- std::string value;
- if (get_payload.cf_id == 0) {
- ra->db->Get(ra->roptions, get_payload.get_key, &value);
- } else {
- ra->db->Get(ra->roptions, (*cf_map)[get_payload.cf_id], get_payload.get_key,
- &value);
- }
- return;
-}
-
-void Replayer::BGWorkWriteBatch(void* arg) {
- std::unique_ptr<ReplayerWorkerArg> ra(
- reinterpret_cast<ReplayerWorkerArg*>(arg));
- assert(ra != nullptr);
-
- if (ra->trace_file_version < 2) {
- WriteBatch batch(ra->trace_entry.payload);
- ra->db->Write(ra->woptions, &batch);
- } else {
- WritePayload w_payload;
- TracerHelper::DecodeWritePayload(&(ra->trace_entry), &w_payload);
- WriteBatch batch(w_payload.write_batch_data.ToString());
- ra->db->Write(ra->woptions, &batch);
- }
- return;
-}
-
-void Replayer::BGWorkIterSeek(void* arg) {
- std::unique_ptr<ReplayerWorkerArg> ra(
- reinterpret_cast<ReplayerWorkerArg*>(arg));
- assert(ra != nullptr);
- auto cf_map = static_cast<std::unordered_map<uint32_t, ColumnFamilyHandle*>*>(
- ra->cf_map);
- IterPayload iter_payload;
- iter_payload.cf_id = 0;
-
- if (ra->trace_file_version < 2) {
- DecodeCFAndKey(ra->trace_entry.payload, &iter_payload.cf_id,
- &iter_payload.iter_key);
- } else {
- TracerHelper::DecodeIterPayload(&(ra->trace_entry), &iter_payload);
- }
- if (iter_payload.cf_id > 0 &&
- cf_map->find(iter_payload.cf_id) == cf_map->end()) {
- return;
- }
-
- Iterator* single_iter = nullptr;
- if (iter_payload.cf_id == 0) {
- single_iter = ra->db->NewIterator(ra->roptions);
- } else {
- single_iter =
- ra->db->NewIterator(ra->roptions, (*cf_map)[iter_payload.cf_id]);
- }
- single_iter->Seek(iter_payload.iter_key);
- delete single_iter;
- return;
-}
-
-void Replayer::BGWorkIterSeekForPrev(void* arg) {
- std::unique_ptr<ReplayerWorkerArg> ra(
- reinterpret_cast<ReplayerWorkerArg*>(arg));
- assert(ra != nullptr);
- auto cf_map = static_cast<std::unordered_map<uint32_t, ColumnFamilyHandle*>*>(
- ra->cf_map);
- IterPayload iter_payload;
- iter_payload.cf_id = 0;
-
- if (ra->trace_file_version < 2) {
- DecodeCFAndKey(ra->trace_entry.payload, &iter_payload.cf_id,
- &iter_payload.iter_key);
- } else {
- TracerHelper::DecodeIterPayload(&(ra->trace_entry), &iter_payload);
- }
- if (iter_payload.cf_id > 0 &&
- cf_map->find(iter_payload.cf_id) == cf_map->end()) {
- return;
- }
-
- Iterator* single_iter = nullptr;
- if (iter_payload.cf_id == 0) {
- single_iter = ra->db->NewIterator(ra->roptions);
- } else {
- single_iter =
- ra->db->NewIterator(ra->roptions, (*cf_map)[iter_payload.cf_id]);
- }
- single_iter->SeekForPrev(iter_payload.iter_key);
- delete single_iter;
- return;
-}
-
-void Replayer::BGWorkMultiGet(void* arg) {
- std::unique_ptr<ReplayerWorkerArg> ra(
- reinterpret_cast<ReplayerWorkerArg*>(arg));
- assert(ra != nullptr);
- auto cf_map = static_cast<std::unordered_map<uint32_t, ColumnFamilyHandle*>*>(
- ra->cf_map);
- MultiGetPayload multiget_payload;
- if (ra->trace_file_version < 2) {
- return;
- }
- TracerHelper::DecodeMultiGetPayload(&(ra->trace_entry), &multiget_payload);
- std::vector<ColumnFamilyHandle*> v_cfd;
- std::vector<Slice> keys;
- if (multiget_payload.cf_ids.size() != multiget_payload.multiget_keys.size()) {
- return;
- }
- for (size_t i = 0; i < multiget_payload.cf_ids.size(); i++) {
- if (cf_map->find(multiget_payload.cf_ids[i]) == cf_map->end()) {
- return;
- }
- v_cfd.push_back((*cf_map)[multiget_payload.cf_ids[i]]);
- keys.push_back(Slice(multiget_payload.multiget_keys[i]));
- }
- std::vector<std::string> values;
- std::vector<Status> ss = ra->db->MultiGet(ra->roptions, v_cfd, keys, &values);
- return;
-}
-
} // namespace ROCKSDB_NAMESPACE
#pragma once
+#include <atomic>
#include <memory>
+#include <mutex>
#include <unordered_map>
#include <utility>
#include "rocksdb/options.h"
#include "rocksdb/rocksdb_namespace.h"
#include "rocksdb/status.h"
+#include "rocksdb/trace_record.h"
+#include "rocksdb/utilities/replayer.h"
namespace ROCKSDB_NAMESPACE {
static const int kTraceFileMajorVersion = 0;
static const int kTraceFileMinorVersion = 2;
-// Supported Trace types.
-enum TraceType : char {
- kTraceBegin = 1,
- kTraceEnd = 2,
- kTraceWrite = 3,
- kTraceGet = 4,
- kTraceIteratorSeek = 5,
- kTraceIteratorSeekForPrev = 6,
- // Block cache related types.
- kBlockTraceIndexBlock = 7,
- kBlockTraceFilterBlock = 8,
- kBlockTraceDataBlock = 9,
- kBlockTraceUncompressionDictBlock = 10,
- kBlockTraceRangeDeletionBlock = 11,
- // For IOTracing.
- kIOTracer = 12,
- // For query tracing
- kTraceMultiGet = 13,
- // All trace types should be added before kTraceMax
- kTraceMax,
-};
-
-// TODO: This should also be made part of public interface to help users build
-// custom TracerReaders and TraceWriters.
-//
// The data structure that defines a single trace.
struct Trace {
uint64_t ts; // timestamp
kMultiGetKeys = 10,
};
-struct WritePayload {
- Slice write_batch_data;
-};
-
-struct GetPayload {
- uint32_t cf_id = 0;
- Slice get_key;
-};
-
-struct IterPayload {
- uint32_t cf_id = 0;
- Slice iter_key;
- Slice lower_bound;
- Slice upper_bound;
-};
-
-struct MultiGetPayload {
- uint32_t multiget_size;
- std::vector<uint32_t> cf_ids;
- std::vector<std::string> multiget_keys;
-};
-
class TracerHelper {
public:
// Parse the string with major and minor version only
// Decode a string into the given trace object.
static Status DecodeTrace(const std::string& encoded_trace, Trace* trace);
+ // Decode a string into the given trace header.
+ static Status DecodeHeader(const std::string& encoded_trace, Trace* header);
+
// Set the payload map based on the payload type
static bool SetPayloadMap(uint64_t& payload_map,
const TracePayloadType payload_type);
// Decode the write payload and store in WrteiPayload
- static void DecodeWritePayload(Trace* trace, WritePayload* write_payload);
+ static Status DecodeWriteRecord(Trace* trace, int trace_file_version,
+ std::unique_ptr<TraceRecord>* record);
// Decode the get payload and store in WrteiPayload
- static void DecodeGetPayload(Trace* trace, GetPayload* get_payload);
+ static Status DecodeGetRecord(Trace* trace, int trace_file_version,
+ std::unique_ptr<TraceRecord>* record);
// Decode the iter payload and store in WrteiPayload
- static void DecodeIterPayload(Trace* trace, IterPayload* iter_payload);
+ static Status DecodeIterRecord(Trace* trace, int trace_file_version,
+ std::unique_ptr<TraceRecord>* record);
// Decode the multiget payload and store in MultiGetPayload
- static void DecodeMultiGetPayload(Trace* trace,
- MultiGetPayload* multiget_payload);
+ static Status DecodeMultiGetRecord(Trace* trace, int trace_file_version,
+ std::unique_ptr<TraceRecord>* record);
};
// Tracer captures all RocksDB operations using a user-provided TraceWriter.
uint64_t trace_request_count_;
};
-// Replayer helps to replay the captured RocksDB operations, using a user
-// provided TraceReader.
-// The Replayer is instantiated via db_bench today, on using "replay" benchmark.
-class Replayer {
- public:
- Replayer(DB* db, const std::vector<ColumnFamilyHandle*>& handles,
- std::unique_ptr<TraceReader>&& reader);
- ~Replayer();
-
- // Replay all the traces from the provided trace stream, taking the delay
- // between the traces into consideration.
- Status Replay();
-
- // Replay the provide trace stream, which is the same as Replay(), with
- // multi-threads. Queries are scheduled in the thread pool job queue.
- // User can set the number of threads in the thread pool.
- Status MultiThreadReplay(uint32_t threads_num);
-
- // Enables fast forwarding a replay by reducing the delay between the ingested
- // traces.
- // fast_forward : Rate of replay speedup.
- // If 1, replay the operations at the same rate as in the trace stream.
- // If > 1, speed up the replay by this amount.
- Status SetFastForward(uint32_t fast_forward);
-
- private:
- Status ReadHeader(Trace* header);
- Status ReadFooter(Trace* footer);
- Status ReadTrace(Trace* trace);
-
- // The background function for MultiThreadReplay to execute Get query
- // based on the trace records.
- static void BGWorkGet(void* arg);
-
- // The background function for MultiThreadReplay to execute WriteBatch
- // (Put, Delete, SingleDelete, DeleteRange) based on the trace records.
- static void BGWorkWriteBatch(void* arg);
-
- // The background function for MultiThreadReplay to execute Iterator (Seek)
- // based on the trace records.
- static void BGWorkIterSeek(void* arg);
-
- // The background function for MultiThreadReplay to execute Iterator
- // (SeekForPrev) based on the trace records.
- static void BGWorkIterSeekForPrev(void* arg);
-
- // The background function for MultiThreadReplay to execute MultiGet based on
- // the trace records
- static void BGWorkMultiGet(void* arg);
-
- DBImpl* db_;
- Env* env_;
- std::unique_ptr<TraceReader> trace_reader_;
- std::unordered_map<uint32_t, ColumnFamilyHandle*> cf_map_;
- uint32_t fast_forward_;
- // When reading the trace header, the trace file version can be parsed.
- // Replayer will use different decode method to get the trace content based
- // on different trace file version.
- int trace_file_version_;
-};
-
-// The passin arg of MultiThreadRepkay for each trace record.
-struct ReplayerWorkerArg {
- DB* db;
- Trace trace_entry;
- std::unordered_map<uint32_t, ColumnFamilyHandle*>* cf_map;
- WriteOptions woptions;
- ReadOptions roptions;
- int trace_file_version;
-};
-
} // namespace ROCKSDB_NAMESPACE
// (found in the LICENSE.Apache file in the root directory).
#include "utilities/simulator_cache/cache_simulator.h"
+
#include <algorithm>
+
#include "db/dbformat.h"
+#include "rocksdb/trace_record.h"
namespace ROCKSDB_NAMESPACE {
#include "utilities/simulator_cache/cache_simulator.h"
#include <cstdlib>
+
#include "rocksdb/env.h"
+#include "rocksdb/trace_record.h"
#include "test_util/testharness.h"
#include "test_util/testutil.h"
return Status::OK();
}
+Status FileTraceReader::Reset() {
+ if (file_reader_ == nullptr) {
+ return Status::IOError("TraceReader is closed.");
+ }
+ offset_ = 0;
+ return Status::OK();
+}
+
Status FileTraceReader::Read(std::string* data) {
assert(file_reader_ != nullptr);
Status s = file_reader_->Read(IOOptions(), offset_, kTraceMetadataSize,
virtual Status Read(std::string* data) override;
virtual Status Close() override;
+ virtual Status Reset() override;
private:
std::unique_ptr<RandomAccessFileReader> file_reader_;
--- /dev/null
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#ifndef ROCKSDB_LITE
+
+#include "utilities/trace/replayer_impl.h"
+
+#include <cmath>
+#include <thread>
+
+#include "rocksdb/db.h"
+#include "rocksdb/env.h"
+#include "rocksdb/options.h"
+#include "rocksdb/slice.h"
+#include "rocksdb/status.h"
+#include "rocksdb/system_clock.h"
+#include "rocksdb/trace_reader_writer.h"
+#include "util/threadpool_imp.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+ReplayerImpl::ReplayerImpl(DB* db,
+ const std::vector<ColumnFamilyHandle*>& handles,
+ std::unique_ptr<TraceReader>&& reader)
+ : Replayer(),
+ env_(db->GetEnv()),
+ trace_reader_(std::move(reader)),
+ prepared_(false),
+ trace_end_(false),
+ header_ts_(0),
+ exec_handler_(TraceRecord::NewExecutionHandler(db, handles)) {}
+
+ReplayerImpl::~ReplayerImpl() {
+ exec_handler_.reset();
+ trace_reader_.reset();
+}
+
+Status ReplayerImpl::Prepare() {
+ Trace header;
+ int db_version;
+ Status s = ReadHeader(&header);
+ if (!s.ok()) {
+ return s;
+ }
+ s = TracerHelper::ParseTraceHeader(header, &trace_file_version_, &db_version);
+ if (!s.ok()) {
+ return s;
+ }
+ header_ts_ = header.ts;
+ prepared_ = true;
+ trace_end_ = false;
+ return Status::OK();
+}
+
+Status ReplayerImpl::Next(std::unique_ptr<TraceRecord>* record) {
+ if (!prepared_) {
+ return Status::Incomplete("Not prepared!");
+ }
+ if (trace_end_) {
+ return Status::Incomplete("Trace end.");
+ }
+
+ Trace trace;
+ Status s = ReadTrace(&trace); // ReadTrace is atomic
+ // Reached the trace end.
+ if (s.ok() && trace.type == kTraceEnd) {
+ trace_end_ = true;
+ return Status::Incomplete("Trace end.");
+ }
+ if (!s.ok() || record == nullptr) {
+ return s;
+ }
+
+ return DecodeTraceRecord(&trace, trace_file_version_, record);
+}
+
+Status ReplayerImpl::Execute(const std::unique_ptr<TraceRecord>& record) {
+ return record->Accept(exec_handler_.get());
+}
+
+Status ReplayerImpl::Execute(std::unique_ptr<TraceRecord>&& record) {
+ Status s = record->Accept(exec_handler_.get());
+ record.reset();
+ return s;
+}
+
+Status ReplayerImpl::Replay(const ReplayOptions& options) {
+ if (options.fast_forward <= 0.0) {
+ return Status::InvalidArgument("Wrong fast forward speed!");
+ }
+
+ if (!prepared_) {
+ return Status::Incomplete("Not prepared!");
+ }
+ if (trace_end_) {
+ return Status::Incomplete("Trace end.");
+ }
+
+ Status s = Status::OK();
+
+ if (options.num_threads <= 1) {
+ // num_threads == 0 or num_threads == 1 uses single thread.
+ std::chrono::system_clock::time_point replay_epoch =
+ std::chrono::system_clock::now();
+
+ while (s.ok()) {
+ Trace trace;
+ s = ReadTrace(&trace);
+ // If already at trace end, ReadTrace should return Status::Incomplete().
+ if (!s.ok()) {
+ break;
+ }
+
+ // No need to sleep before breaking the loop if at the trace end.
+ if (trace.type == kTraceEnd) {
+ trace_end_ = true;
+ s = Status::Incomplete("Trace end.");
+ break;
+ }
+
+ // In single-threaded replay, decode first then sleep.
+ std::unique_ptr<TraceRecord> record;
+ s = DecodeTraceRecord(&trace, trace_file_version_, &record);
+ // Skip unsupported traces, stop for other errors.
+ if (s.IsNotSupported()) {
+ continue;
+ } else if (!s.ok()) {
+ break;
+ }
+
+ std::this_thread::sleep_until(
+ replay_epoch +
+ std::chrono::microseconds(static_cast<uint64_t>(std::llround(
+ 1.0 * (trace.ts - header_ts_) / options.fast_forward))));
+
+ s = Execute(std::move(record));
+ }
+ } else {
+ // Multi-threaded replay.
+ ThreadPoolImpl thread_pool;
+ thread_pool.SetHostEnv(env_);
+ thread_pool.SetBackgroundThreads(static_cast<int>(options.num_threads));
+
+ std::mutex mtx;
+ // Background decoding and execution status.
+ Status bg_s = Status::OK();
+ uint64_t last_err_ts = static_cast<uint64_t>(-1);
+ // Callback function used in background work to update bg_s at the first
+ // execution error (with the smallest Trace timestamp).
+ auto error_cb = [&mtx, &bg_s, &last_err_ts](Status err, uint64_t err_ts) {
+ std::lock_guard<std::mutex> gd(mtx);
+ // Only record the first error.
+ if (!err.ok() && !err.IsNotSupported() && err_ts < last_err_ts) {
+ bg_s = err;
+ last_err_ts = err_ts;
+ }
+ };
+
+ std::chrono::system_clock::time_point replay_epoch =
+ std::chrono::system_clock::now();
+
+ while (bg_s.ok() && s.ok()) {
+ Trace trace;
+ s = ReadTrace(&trace);
+ // If already at trace end, ReadTrace should return Status::Incomplete().
+ if (!s.ok()) {
+ break;
+ }
+
+ TraceType trace_type = trace.type;
+
+ // No need to sleep before breaking the loop if at the trace end.
+ if (trace_type == kTraceEnd) {
+ trace_end_ = true;
+ s = Status::Incomplete("Trace end.");
+ break;
+ }
+
+ // In multi-threaded replay, sleep first thatn start decoding and
+ // execution in a thread.
+ std::this_thread::sleep_until(
+ replay_epoch +
+ std::chrono::microseconds(static_cast<uint64_t>(std::llround(
+ 1.0 * (trace.ts - header_ts_) / options.fast_forward))));
+
+ if (trace_type == kTraceWrite || trace_type == kTraceGet ||
+ trace_type == kTraceIteratorSeek ||
+ trace_type == kTraceIteratorSeekForPrev ||
+ trace_type == kTraceMultiGet) {
+ std::unique_ptr<ReplayerWorkerArg> ra(new ReplayerWorkerArg);
+ ra->trace_entry = std::move(trace);
+ ra->handler = exec_handler_.get();
+ ra->trace_file_version = trace_file_version_;
+ ra->error_cb = error_cb;
+ thread_pool.Schedule(&ReplayerImpl::BackgroundWork, ra.release(),
+ nullptr, nullptr);
+ }
+ // Skip unsupported traces.
+ }
+
+ thread_pool.WaitForJobsAndJoinAllThreads();
+ if (!bg_s.ok()) {
+ s = bg_s;
+ }
+ }
+
+ if (s.IsIncomplete()) {
+ // Reaching eof returns Incomplete status at the moment.
+ // Could happen when killing a process without calling EndTrace() API.
+ // TODO: Add better error handling.
+ trace_end_ = true;
+ return Status::OK();
+ }
+ return s;
+}
+
+uint64_t ReplayerImpl::GetHeaderTimestamp() const { return header_ts_; }
+
+Status ReplayerImpl::ReadHeader(Trace* header) {
+ assert(header != nullptr);
+ Status s = trace_reader_->Reset();
+ if (!s.ok()) {
+ return s;
+ }
+ std::string encoded_trace;
+ // Read the trace head
+ s = trace_reader_->Read(&encoded_trace);
+ if (!s.ok()) {
+ return s;
+ }
+
+ return TracerHelper::DecodeHeader(encoded_trace, header);
+}
+
+Status ReplayerImpl::ReadFooter(Trace* footer) {
+ assert(footer != nullptr);
+ Status s = ReadTrace(footer);
+ if (!s.ok()) {
+ return s;
+ }
+ if (footer->type != kTraceEnd) {
+ return Status::Corruption("Corrupted trace file. Incorrect footer.");
+ }
+
+ // TODO: Add more validations later
+ return s;
+}
+
+Status ReplayerImpl::ReadTrace(Trace* trace) {
+ assert(trace != nullptr);
+ std::string encoded_trace;
+ // We don't know if TraceReader is implemented thread-safe, so we protect the
+ // reading trace part with a mutex. The decoding part does not need to be
+ // protected since it's local.
+ {
+ std::lock_guard<std::mutex> guard(mutex_);
+ Status s = trace_reader_->Read(&encoded_trace);
+ if (!s.ok()) {
+ return s;
+ }
+ }
+ return TracerHelper::DecodeTrace(encoded_trace, trace);
+}
+
+Status ReplayerImpl::DecodeTraceRecord(Trace* trace, int trace_file_version,
+ std::unique_ptr<TraceRecord>* record) {
+ switch (trace->type) {
+ case kTraceWrite:
+ return TracerHelper::DecodeWriteRecord(trace, trace_file_version, record);
+ case kTraceGet:
+ return TracerHelper::DecodeGetRecord(trace, trace_file_version, record);
+ case kTraceIteratorSeek:
+ case kTraceIteratorSeekForPrev:
+ return TracerHelper::DecodeIterRecord(trace, trace_file_version, record);
+ case kTraceMultiGet:
+ return TracerHelper::DecodeMultiGetRecord(trace, trace_file_version,
+ record);
+ case kTraceEnd:
+ return Status::Incomplete("Trace end.");
+ default:
+ return Status::NotSupported("Unsupported trace type.");
+ }
+}
+
+void ReplayerImpl::BackgroundWork(void* arg) {
+ std::unique_ptr<ReplayerWorkerArg> ra(
+ reinterpret_cast<ReplayerWorkerArg*>(arg));
+ assert(ra != nullptr);
+
+ std::unique_ptr<TraceRecord> record;
+ Status s =
+ DecodeTraceRecord(&(ra->trace_entry), ra->trace_file_version, &record);
+ if (s.ok()) {
+ s = record->Accept(ra->handler);
+ record.reset();
+ }
+ if (!s.ok() && ra->error_cb) {
+ ra->error_cb(s, ra->trace_entry.ts);
+ }
+}
+
+} // namespace ROCKSDB_NAMESPACE
+#endif // ROCKSDB_LITE
--- /dev/null
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#pragma once
+#ifndef ROCKSDB_LITE
+
+#include <atomic>
+#include <functional>
+#include <memory>
+#include <mutex>
+#include <unordered_map>
+
+#include "rocksdb/rocksdb_namespace.h"
+#include "rocksdb/trace_record.h"
+#include "rocksdb/utilities/replayer.h"
+#include "trace_replay/trace_replay.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+class ColumnFamilyHandle;
+class DB;
+class Env;
+class TraceReader;
+class TraceRecord;
+class Status;
+
+struct ReplayOptions;
+
+class ReplayerImpl : public Replayer {
+ public:
+ ReplayerImpl(DB* db, const std::vector<ColumnFamilyHandle*>& handles,
+ std::unique_ptr<TraceReader>&& reader);
+ ~ReplayerImpl() override;
+
+ using Replayer::Prepare;
+ Status Prepare() override;
+
+ using Replayer::Next;
+ Status Next(std::unique_ptr<TraceRecord>* record) override;
+
+ using Replayer::Execute;
+ Status Execute(const std::unique_ptr<TraceRecord>& record) override;
+ Status Execute(std::unique_ptr<TraceRecord>&& record) override;
+
+ using Replayer::Replay;
+ Status Replay(const ReplayOptions& options) override;
+
+ using Replayer::GetHeaderTimestamp;
+ uint64_t GetHeaderTimestamp() const override;
+
+ private:
+ Status ReadHeader(Trace* header);
+ Status ReadFooter(Trace* footer);
+ Status ReadTrace(Trace* trace);
+
+ // Generic function to convert a Trace to TraceRecord.
+ static Status DecodeTraceRecord(Trace* trace, int trace_file_version,
+ std::unique_ptr<TraceRecord>* record);
+
+ // Generic function to execute a Trace in a thread pool.
+ static void BackgroundWork(void* arg);
+
+ Env* env_;
+ std::unique_ptr<TraceReader> trace_reader_;
+ // When reading the trace header, the trace file version can be parsed.
+ // Replayer will use different decode method to get the trace content based
+ // on different trace file version.
+ int trace_file_version_;
+ std::mutex mutex_;
+ std::atomic<bool> prepared_;
+ std::atomic<bool> trace_end_;
+ uint64_t header_ts_;
+ std::unique_ptr<TraceRecord::Handler> exec_handler_;
+};
+
+// The passin arg of MultiThreadRepkay for each trace record.
+struct ReplayerWorkerArg {
+ Trace trace_entry;
+ int trace_file_version;
+ // Handler to execute TraceRecord.
+ TraceRecord::Handler* handler;
+ // Callback function to report the error status and the timestamp of the
+ // TraceRecord.
+ std::function<void(Status, uint64_t)> error_cb;
+};
+
+} // namespace ROCKSDB_NAMESPACE
+#endif // ROCKSDB_LITE