trace_replay/block_cache_tracer.cc
trace_replay/io_tracer.cc
trace_replay/trace_record_handler.cc
+ trace_replay/trace_record_result.cc
trace_replay/trace_record.cc
trace_replay/trace_replay.cc
util/coding.cc
* The integrated BlobDB implementation now supports the tickers `BLOB_DB_BLOB_FILE_BYTES_READ`, `BLOB_DB_GC_NUM_KEYS_RELOCATED`, and `BLOB_DB_GC_BYTES_RELOCATED`, as well as the histograms `BLOB_DB_COMPRESSION_MICROS` and `BLOB_DB_DECOMPRESSION_MICROS`.
## Public API change
-* Added APIs to decode and replay trace file via Replayer class. Added `DB::NewDefaultReplayer()` to create a default Replayer instance. Added `TraceReader::Reset()` to restart reading a trace file. Created trace_record.h and utilities/replayer.h files to access decoded Trace records and replay them.
+* Added APIs to decode and replay trace file via Replayer class. Added `DB::NewDefaultReplayer()` to create a default Replayer instance. Added `TraceReader::Reset()` to restart reading a trace file. Created trace_record.h, trace_record_result.h and utilities/replayer.h files to access the decoded Trace records, replay them, and query the actual operation results.
### Performance Improvements
* Try to avoid updating DBOptions if `SetDBOptions()` does not change any option value.
"trace_replay/io_tracer.cc",
"trace_replay/trace_record.cc",
"trace_replay/trace_record_handler.cc",
+ "trace_replay/trace_record_result.cc",
"trace_replay/trace_replay.cc",
"util/build_version.cc",
"util/coding.cc",
"trace_replay/io_tracer.cc",
"trace_replay/trace_record.cc",
"trace_replay/trace_record_handler.cc",
+ "trace_replay/trace_record_result.cc",
"trace_replay/trace_replay.cc",
"util/build_version.cc",
"util/coding.cc",
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
#include <atomic>
#include <cstdlib>
#include <functional>
#include "port/port.h"
#include "port/stack_trace.h"
#include "rocksdb/persistent_cache.h"
+#include "rocksdb/trace_record.h"
+#include "rocksdb/trace_record_result.h"
#include "rocksdb/utilities/replayer.h"
#include "rocksdb/wal_filter.h"
#include "util/random.h"
ASSERT_EQ(0, env_->random_file_open_counter_.load());
}
+class TraceExecutionResultHandler : public TraceRecordResult::Handler {
+ public:
+ TraceExecutionResultHandler() {}
+ ~TraceExecutionResultHandler() override {}
+
+ virtual Status Handle(const StatusOnlyTraceExecutionResult& result) override {
+ if (result.GetStartTimestamp() > result.GetEndTimestamp()) {
+ return Status::InvalidArgument("Invalid timestamps.");
+ }
+ result.GetStatus().PermitUncheckedError();
+ switch (result.GetTraceType()) {
+ case kTraceWrite: {
+ total_latency_ += result.GetLatency();
+ cnt_++;
+ writes_++;
+ break;
+ }
+ case kTraceIteratorSeek:
+ case kTraceIteratorSeekForPrev: {
+ total_latency_ += result.GetLatency();
+ cnt_++;
+ seeks_++;
+ break;
+ }
+ default:
+ return Status::Corruption("Type mismatch.");
+ }
+ return Status::OK();
+ }
+
+ virtual Status Handle(
+ const SingleValueTraceExecutionResult& result) override {
+ if (result.GetStartTimestamp() > result.GetEndTimestamp()) {
+ return Status::InvalidArgument("Invalid timestamps.");
+ }
+ result.GetStatus().PermitUncheckedError();
+ switch (result.GetTraceType()) {
+ case kTraceGet: {
+ total_latency_ += result.GetLatency();
+ cnt_++;
+ gets_++;
+ break;
+ }
+ default:
+ return Status::Corruption("Type mismatch.");
+ }
+ return Status::OK();
+ }
+
+ virtual Status Handle(
+ const MultiValuesTraceExecutionResult& result) override {
+ if (result.GetStartTimestamp() > result.GetEndTimestamp()) {
+ return Status::InvalidArgument("Invalid timestamps.");
+ }
+ for (const Status& s : result.GetMultiStatus()) {
+ s.PermitUncheckedError();
+ }
+ switch (result.GetTraceType()) {
+ case kTraceMultiGet: {
+ total_latency_ += result.GetLatency();
+ cnt_++;
+ multigets_++;
+ break;
+ }
+ default:
+ return Status::Corruption("Type mismatch.");
+ }
+ return Status::OK();
+ }
+
+ void Reset() {
+ total_latency_ = 0;
+ cnt_ = 0;
+ writes_ = 0;
+ gets_ = 0;
+ seeks_ = 0;
+ multigets_ = 0;
+ }
+
+ double GetAvgLatency() const {
+ return cnt_ == 0 ? 0.0 : 1.0 * total_latency_ / cnt_;
+ }
+
+ int GetNumWrites() const { return writes_; }
+
+ int GetNumGets() const { return gets_; }
+
+ int GetNumIterSeeks() const { return seeks_; }
+
+ int GetNumMultiGets() const { return multigets_; }
+
+ private:
+ std::atomic<uint64_t> total_latency_{0};
+ std::atomic<uint32_t> cnt_{0};
+ std::atomic<int> writes_{0};
+ std::atomic<int> gets_{0};
+ std::atomic<int> seeks_{0};
+ std::atomic<int> multigets_{0};
+};
+
TEST_F(DBTest2, TraceAndReplay) {
Options options = CurrentOptions();
options.merge_operator = MergeOperators::CreatePutOperator();
ASSERT_OK(NewFileTraceWriter(env_, env_opts, trace_filename, &trace_writer));
ASSERT_OK(db_->StartTrace(trace_opts, std::move(trace_writer)));
+ // 5 Writes
ASSERT_OK(Put(0, "a", "1"));
ASSERT_OK(Merge(0, "b", "2"));
ASSERT_OK(Delete(0, "c"));
ASSERT_OK(SingleDelete(0, "d"));
ASSERT_OK(db_->DeleteRange(wo, dbfull()->DefaultColumnFamily(), "e", "f"));
+ // 6th Write
WriteBatch batch;
ASSERT_OK(batch.Put("f", "11"));
ASSERT_OK(batch.Merge("g", "12"));
ASSERT_OK(batch.DeleteRange("j", "k"));
ASSERT_OK(db_->Write(wo, &batch));
+ // 2 Seek(ForPrev)s
single_iter = db_->NewIterator(ro);
- single_iter->Seek("f");
+ single_iter->Seek("f"); // Seek 1
single_iter->SeekForPrev("g");
ASSERT_OK(single_iter->status());
delete single_iter;
+ // 2 Gets
ASSERT_EQ("1", Get(0, "a"));
ASSERT_EQ("12", Get(0, "g"));
+ // 7th and 8th Write, 3rd Get
ASSERT_OK(Put(1, "foo", "bar"));
ASSERT_OK(Put(1, "rocksdb", "rocks"));
ASSERT_EQ("NOT_FOUND", Get(1, "leveldb"));
+ // Total Write x 8, Get x 3, Seek x 2.
ASSERT_OK(db_->EndTrace());
// These should not get into the trace file as it is after EndTrace.
ASSERT_OK(Put("hello", "world"));
std::unique_ptr<Replayer> replayer;
ASSERT_OK(
db2->NewDefaultReplayer(handles, std::move(trace_reader), &replayer));
+
+ TraceExecutionResultHandler res_handler;
+ std::function<void(Status, std::unique_ptr<TraceRecordResult> &&)> res_cb =
+ [&res_handler](Status exec_s, std::unique_ptr<TraceRecordResult>&& res) {
+ ASSERT_TRUE(exec_s.ok() || exec_s.IsNotSupported());
+ if (res != nullptr) {
+ ASSERT_OK(res->Accept(&res_handler));
+ res.reset();
+ }
+ };
+
// Unprepared replay should fail with Status::Incomplete()
- ASSERT_TRUE(replayer->Replay().IsIncomplete());
+ ASSERT_TRUE(replayer->Replay(ReplayOptions(), nullptr).IsIncomplete());
ASSERT_OK(replayer->Prepare());
// Ok to repeatedly Prepare().
ASSERT_OK(replayer->Prepare());
// Replay using 1 thread, 1x speed.
- ASSERT_OK(replayer->Replay());
+ ASSERT_OK(replayer->Replay(ReplayOptions(1, 1.0), res_cb));
+ ASSERT_GT(res_handler.GetAvgLatency(), 0.0);
+ ASSERT_EQ(res_handler.GetNumWrites(), 8);
+ ASSERT_EQ(res_handler.GetNumGets(), 3);
+ ASSERT_EQ(res_handler.GetNumIterSeeks(), 2);
+ ASSERT_EQ(res_handler.GetNumMultiGets(), 0);
+ res_handler.Reset();
ASSERT_OK(db2->Get(ro, handles[0], "a", &value));
ASSERT_EQ("1", value);
// Re-replay should fail with Status::Incomplete() if Prepare() was not
// called. Currently we don't distinguish between unprepared and trace end.
- ASSERT_TRUE(replayer->Replay().IsIncomplete());
+ ASSERT_TRUE(replayer->Replay(ReplayOptions(), nullptr).IsIncomplete());
// Re-replay using 2 threads, 2x speed.
ASSERT_OK(replayer->Prepare());
- ASSERT_OK(replayer->Replay(ReplayOptions(2, 2.0)));
+ ASSERT_OK(replayer->Replay(ReplayOptions(2, 2.0), res_cb));
+ ASSERT_GT(res_handler.GetAvgLatency(), 0.0);
+ ASSERT_EQ(res_handler.GetNumWrites(), 8);
+ ASSERT_EQ(res_handler.GetNumGets(), 3);
+ ASSERT_EQ(res_handler.GetNumIterSeeks(), 2);
+ ASSERT_EQ(res_handler.GetNumMultiGets(), 0);
+ res_handler.Reset();
// Re-replay using 2 threads, 1/2 speed.
ASSERT_OK(replayer->Prepare());
- ASSERT_OK(replayer->Replay(ReplayOptions(2, 0.5)));
+ ASSERT_OK(replayer->Replay(ReplayOptions(2, 0.5), res_cb));
+ ASSERT_GT(res_handler.GetAvgLatency(), 0.0);
+ ASSERT_EQ(res_handler.GetNumWrites(), 8);
+ ASSERT_EQ(res_handler.GetNumGets(), 3);
+ ASSERT_EQ(res_handler.GetNumIterSeeks(), 2);
+ ASSERT_EQ(res_handler.GetNumMultiGets(), 0);
+ res_handler.Reset();
+
replayer.reset();
for (auto handle : handles) {
ASSERT_OK(Put(1, "rocksdb", "rocks"));
ASSERT_EQ("NOT_FOUND", Get(1, "leveldb"));
+ // Same as TraceAndReplay, Write x 8, Get x 3, Seek x 2.
ASSERT_OK(db_->EndTrace());
// These should not get into the trace file as it is after EndTrace.
ASSERT_OK(Put("hello", "world"));
ASSERT_OK(
db2->NewDefaultReplayer(handles, std::move(trace_reader), &replayer));
+ TraceExecutionResultHandler res_handler;
+
// Manual replay for 2 times. The 2nd checks if the replay can restart.
std::unique_ptr<TraceRecord> record;
+ std::unique_ptr<TraceRecordResult> result;
for (int i = 0; i < 2; i++) {
// Next should fail if unprepared.
ASSERT_TRUE(replayer->Next(nullptr).IsIncomplete());
continue;
}
if (s.ok()) {
- ASSERT_OK(replayer->Execute(std::move(record)));
+ ASSERT_OK(replayer->Execute(record, &result));
+ if (result != nullptr) {
+ ASSERT_OK(result->Accept(&res_handler));
+ result.reset();
+ }
}
}
// Status::Incomplete() will be returned when manually reading the trace
// end, or Prepare() was not called.
ASSERT_TRUE(s.IsIncomplete());
ASSERT_TRUE(replayer->Next(nullptr).IsIncomplete());
+ ASSERT_GT(res_handler.GetAvgLatency(), 0.0);
+ ASSERT_EQ(res_handler.GetNumWrites(), 8);
+ ASSERT_EQ(res_handler.GetNumGets(), 3);
+ ASSERT_EQ(res_handler.GetNumIterSeeks(), 2);
+ ASSERT_EQ(res_handler.GetNumMultiGets(), 0);
+ res_handler.Reset();
}
ASSERT_OK(db2->Get(ro, handles[0], "a", &value));
ASSERT_OK(batch.Put("trace-record-write1", "write1"));
ASSERT_OK(batch.Put("trace-record-write2", "write2"));
record.reset(new WriteQueryTraceRecord(batch.Data(), fake_ts++));
- ASSERT_OK(replayer->Execute(std::move(record)));
+ ASSERT_OK(replayer->Execute(record, &result));
+ ASSERT_TRUE(result != nullptr);
+ ASSERT_OK(result->Accept(&res_handler)); // Write x 1
ASSERT_OK(db2->Get(ro, handles[0], "trace-record-write1", &value));
ASSERT_EQ("write1", value);
ASSERT_OK(db2->Get(ro, handles[0], "trace-record-write2", &value));
ASSERT_EQ("write2", value);
+ ASSERT_GT(res_handler.GetAvgLatency(), 0.0);
+ ASSERT_EQ(res_handler.GetNumWrites(), 1);
+ ASSERT_EQ(res_handler.GetNumGets(), 0);
+ ASSERT_EQ(res_handler.GetNumIterSeeks(), 0);
+ ASSERT_EQ(res_handler.GetNumMultiGets(), 0);
+ res_handler.Reset();
// Get related
// Get an existing key.
record.reset(new GetQueryTraceRecord(handles[0]->GetID(),
"trace-record-write1", fake_ts++));
- ASSERT_OK(replayer->Execute(std::move(record)));
+ ASSERT_OK(replayer->Execute(record, &result));
+ ASSERT_TRUE(result != nullptr);
+ ASSERT_OK(result->Accept(&res_handler)); // Get x 1
// Get an non-existing key, should still return Status::OK().
record.reset(new GetQueryTraceRecord(handles[0]->GetID(), "trace-record-get",
fake_ts++));
- ASSERT_OK(replayer->Execute(std::move(record)));
+ ASSERT_OK(replayer->Execute(record, &result));
+ ASSERT_TRUE(result != nullptr);
+ ASSERT_OK(result->Accept(&res_handler)); // Get x 2
// Get from an invalid (non-existing) cf_id.
uint32_t invalid_cf_id = handles[1]->GetID() + 1;
record.reset(new GetQueryTraceRecord(invalid_cf_id, "whatever", fake_ts++));
- ASSERT_TRUE(replayer->Execute(std::move(record)).IsCorruption());
+ ASSERT_TRUE(replayer->Execute(record, &result).IsCorruption());
+ ASSERT_TRUE(result == nullptr);
+ ASSERT_GT(res_handler.GetAvgLatency(), 0.0);
+ ASSERT_EQ(res_handler.GetNumWrites(), 0);
+ ASSERT_EQ(res_handler.GetNumGets(), 2);
+ ASSERT_EQ(res_handler.GetNumIterSeeks(), 0);
+ ASSERT_EQ(res_handler.GetNumMultiGets(), 0);
+ res_handler.Reset();
// Iteration related
for (IteratorSeekQueryTraceRecord::SeekType seekType :
// Seek to an existing key.
record.reset(new IteratorSeekQueryTraceRecord(
seekType, handles[0]->GetID(), "trace-record-write1", fake_ts++));
- ASSERT_OK(replayer->Execute(std::move(record)));
+ ASSERT_OK(replayer->Execute(record, &result));
+ ASSERT_TRUE(result != nullptr);
+ ASSERT_OK(result->Accept(&res_handler)); // Seek x 1 in one iteration
// Seek to an non-existing key, should still return Status::OK().
record.reset(new IteratorSeekQueryTraceRecord(
seekType, handles[0]->GetID(), "trace-record-get", fake_ts++));
- ASSERT_OK(replayer->Execute(std::move(record)));
+ ASSERT_OK(replayer->Execute(record, &result));
+ ASSERT_TRUE(result != nullptr);
+ ASSERT_OK(result->Accept(&res_handler)); // Seek x 2 in one iteration
// Seek from an invalid cf_id.
record.reset(new IteratorSeekQueryTraceRecord(seekType, invalid_cf_id,
"whatever", fake_ts++));
- ASSERT_TRUE(replayer->Execute(std::move(record)).IsCorruption());
+ ASSERT_TRUE(replayer->Execute(record, &result).IsCorruption());
+ ASSERT_TRUE(result == nullptr);
}
+ ASSERT_GT(res_handler.GetAvgLatency(), 0.0);
+ ASSERT_EQ(res_handler.GetNumWrites(), 0);
+ ASSERT_EQ(res_handler.GetNumGets(), 0);
+ ASSERT_EQ(res_handler.GetNumIterSeeks(), 4); // Seek x 2 in two iterations
+ ASSERT_EQ(res_handler.GetNumMultiGets(), 0);
+ res_handler.Reset();
// MultiGet related
// Get existing keys.
record.reset(new MultiGetQueryTraceRecord(
std::vector<uint32_t>({handles[0]->GetID(), handles[1]->GetID()}),
std::vector<std::string>({"a", "foo"}), fake_ts++));
- ASSERT_OK(replayer->Execute(std::move(record)));
+ ASSERT_OK(replayer->Execute(record, &result));
+ ASSERT_TRUE(result != nullptr);
+ ASSERT_OK(result->Accept(&res_handler)); // MultiGet x 1
// Get all non-existing keys, should still return Status::OK().
record.reset(new MultiGetQueryTraceRecord(
std::vector<uint32_t>({handles[0]->GetID(), handles[1]->GetID()}),
std::vector<std::string>({"no1", "no2"}), fake_ts++));
+ ASSERT_OK(replayer->Execute(record, &result));
+ ASSERT_TRUE(result != nullptr);
+ ASSERT_OK(result->Accept(&res_handler)); // MultiGet x 2
// Get mixed of existing and non-existing keys, should still return
// Status::OK().
record.reset(new MultiGetQueryTraceRecord(
std::vector<uint32_t>({handles[0]->GetID(), handles[1]->GetID()}),
std::vector<std::string>({"a", "no2"}), fake_ts++));
- ASSERT_OK(replayer->Execute(std::move(record)));
+ ASSERT_OK(replayer->Execute(record, &result));
+ ASSERT_TRUE(result != nullptr);
+ MultiValuesTraceExecutionResult* mvr =
+ dynamic_cast<MultiValuesTraceExecutionResult*>(result.get());
+ ASSERT_TRUE(mvr != nullptr);
+ ASSERT_OK(mvr->GetMultiStatus()[0]);
+ ASSERT_TRUE(mvr->GetMultiStatus()[1].IsNotFound());
+ ASSERT_EQ(mvr->GetValues()[0], "1");
+ ASSERT_EQ(mvr->GetValues()[1], "");
+ ASSERT_OK(result->Accept(&res_handler)); // MultiGet x 3
// Get from an invalid (non-existing) cf_id.
record.reset(new MultiGetQueryTraceRecord(
std::vector<uint32_t>(
{handles[0]->GetID(), handles[1]->GetID(), invalid_cf_id}),
std::vector<std::string>({"a", "foo", "whatever"}), fake_ts++));
- ASSERT_TRUE(replayer->Execute(std::move(record)).IsCorruption());
+ ASSERT_TRUE(replayer->Execute(record, &result).IsCorruption());
+ ASSERT_TRUE(result == nullptr);
// Empty MultiGet
record.reset(new MultiGetQueryTraceRecord(
std::vector<uint32_t>(), std::vector<std::string>(), fake_ts++));
- ASSERT_TRUE(replayer->Execute(std::move(record)).IsInvalidArgument());
+ ASSERT_TRUE(replayer->Execute(record, &result).IsInvalidArgument());
+ ASSERT_TRUE(result == nullptr);
// MultiGet size mismatch
record.reset(new MultiGetQueryTraceRecord(
std::vector<uint32_t>({handles[0]->GetID(), handles[1]->GetID()}),
std::vector<std::string>({"a"}), fake_ts++));
- ASSERT_TRUE(replayer->Execute(std::move(record)).IsInvalidArgument());
+ ASSERT_TRUE(replayer->Execute(record, &result).IsInvalidArgument());
+ ASSERT_TRUE(result == nullptr);
+ ASSERT_GT(res_handler.GetAvgLatency(), 0.0);
+ ASSERT_EQ(res_handler.GetNumWrites(), 0);
+ ASSERT_EQ(res_handler.GetNumGets(), 0);
+ ASSERT_EQ(res_handler.GetNumIterSeeks(), 0);
+ ASSERT_EQ(res_handler.GetNumMultiGets(), 3);
+ res_handler.Reset();
replayer.reset();
ASSERT_OK(
db2->NewDefaultReplayer(handles, std::move(trace_reader), &replayer));
ASSERT_OK(replayer->Prepare());
- ASSERT_OK(replayer->Replay());
+ ASSERT_OK(replayer->Replay(ReplayOptions(), nullptr));
replayer.reset();
ASSERT_TRUE(db2->Get(ro, handles[0], "a", &value).IsNotFound());
ASSERT_OK(
db2->NewDefaultReplayer(handles, std::move(trace_reader), &replayer));
ASSERT_OK(replayer->Prepare());
- ASSERT_OK(replayer->Replay());
+ ASSERT_OK(replayer->Replay(ReplayOptions(), nullptr));
replayer.reset();
ASSERT_TRUE(db2->Get(ro, handles[0], "a", &value).IsNotFound());
ASSERT_OK(
db2->NewDefaultReplayer(handles, std::move(trace_reader), &replayer));
ASSERT_OK(replayer->Prepare());
- ASSERT_OK(replayer->Replay());
+ ASSERT_OK(replayer->Replay(ReplayOptions(), nullptr));
replayer.reset();
// All the key-values should not present since we filter out the WRITE ops.
#pragma once
+#include <memory>
#include <string>
#include <vector>
#include "rocksdb/rocksdb_namespace.h"
#include "rocksdb/slice.h"
+#include "rocksdb/status.h"
namespace ROCKSDB_NAMESPACE {
class ColumnFamilyHandle;
class DB;
-class Status;
// Supported trace record types.
enum TraceType : char {
kTraceMax,
};
-class WriteQueryTraceRecord;
class GetQueryTraceRecord;
class IteratorSeekQueryTraceRecord;
class MultiGetQueryTraceRecord;
+class TraceRecordResult;
+class WriteQueryTraceRecord;
// Base class for all types of trace records.
class TraceRecord {
public:
- TraceRecord();
explicit TraceRecord(uint64_t timestamp);
- virtual ~TraceRecord();
+ virtual ~TraceRecord() = default;
+
+ // Type of the trace record.
virtual TraceType GetTraceType() const = 0;
+ // Timestamp (in microseconds) of this trace.
virtual uint64_t GetTimestamp() const;
class Handler {
public:
- virtual ~Handler() {}
+ virtual ~Handler() = default;
+
+ // Handle WriteQueryTraceRecord
+ virtual Status Handle(const WriteQueryTraceRecord& record,
+ std::unique_ptr<TraceRecordResult>* result) = 0;
+
+ // Handle GetQueryTraceRecord
+ virtual Status Handle(const GetQueryTraceRecord& record,
+ std::unique_ptr<TraceRecordResult>* result) = 0;
- virtual Status Handle(const WriteQueryTraceRecord& record) = 0;
- virtual Status Handle(const GetQueryTraceRecord& record) = 0;
- virtual Status Handle(const IteratorSeekQueryTraceRecord& record) = 0;
- virtual Status Handle(const MultiGetQueryTraceRecord& record) = 0;
+ // Handle IteratorSeekQueryTraceRecord
+ virtual Status Handle(const IteratorSeekQueryTraceRecord& record,
+ std::unique_ptr<TraceRecordResult>* result) = 0;
+
+ // Handle MultiGetQueryTraceRecord
+ virtual Status Handle(const MultiGetQueryTraceRecord& record,
+ std::unique_ptr<TraceRecordResult>* result) = 0;
};
- virtual Status Accept(Handler* handler) = 0;
+ // Accept the handler and report the corresponding result in `result`.
+ virtual Status Accept(Handler* handler,
+ std::unique_ptr<TraceRecordResult>* result) = 0;
// Create a handler for the exeution of TraceRecord.
static Handler* NewExecutionHandler(
DB* db, const std::vector<ColumnFamilyHandle*>& handles);
private:
- // Timestamp (in microseconds) of this trace.
uint64_t timestamp_;
};
class QueryTraceRecord : public TraceRecord {
public:
explicit QueryTraceRecord(uint64_t timestamp);
-
- virtual ~QueryTraceRecord() override;
};
// Trace record for DB::Write() operation.
TraceType GetTraceType() const override { return kTraceWrite; }
+ // rep string for the WriteBatch.
virtual Slice GetWriteBatchRep() const;
- virtual Status Accept(Handler* handler) override;
+ Status Accept(Handler* handler,
+ std::unique_ptr<TraceRecordResult>* result) override;
private:
PinnableSlice rep_;
TraceType GetTraceType() const override { return kTraceGet; }
+ // Column family ID.
virtual uint32_t GetColumnFamilyID() const;
+ // Key to get.
virtual Slice GetKey() const;
- virtual Status Accept(Handler* handler) override;
+ Status Accept(Handler* handler,
+ std::unique_ptr<TraceRecordResult>* result) override;
private:
- // Column family ID.
uint32_t cf_id_;
- // Key to get.
PinnableSlice key_;
};
class IteratorQueryTraceRecord : public QueryTraceRecord {
public:
explicit IteratorQueryTraceRecord(uint64_t timestamp);
-
- virtual ~IteratorQueryTraceRecord() override;
};
// Trace record for Iterator::Seek() and Iterator::SeekForPrev() operation.
virtual ~IteratorSeekQueryTraceRecord() override;
+ // Trace type matches the seek type.
TraceType GetTraceType() const override;
+ // Type of seek, Seek or SeekForPrev.
virtual SeekType GetSeekType() const;
+ // Column family ID.
virtual uint32_t GetColumnFamilyID() const;
+ // Key to seek to.
virtual Slice GetKey() const;
- virtual Status Accept(Handler* handler) override;
+ Status Accept(Handler* handler,
+ std::unique_ptr<TraceRecordResult>* result) override;
private:
SeekType type_;
- // Column family ID.
uint32_t cf_id_;
- // Key to seek to.
PinnableSlice key_;
};
TraceType GetTraceType() const override { return kTraceMultiGet; }
+ // Column familiy IDs.
virtual std::vector<uint32_t> GetColumnFamilyIDs() const;
+ // Keys to get.
virtual std::vector<Slice> GetKeys() const;
- virtual Status Accept(Handler* handler) override;
+ Status Accept(Handler* handler,
+ std::unique_ptr<TraceRecordResult>* result) override;
private:
- // Column familiy IDs.
std::vector<uint32_t> cf_ids_;
- // Keys to get.
std::vector<PinnableSlice> keys_;
};
--- /dev/null
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#pragma once
+
+#include <string>
+#include <vector>
+
+#include "rocksdb/rocksdb_namespace.h"
+#include "rocksdb/status.h"
+#include "rocksdb/trace_record.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+class MultiValuesTraceExecutionResult;
+class SingleValueTraceExecutionResult;
+class StatusOnlyTraceExecutionResult;
+
+// Base class for the results of all types of trace records.
+// Theses classes can be used to report the execution result of
+// TraceRecord::Handler::Handle() or TraceRecord::Accept().
+class TraceRecordResult {
+ public:
+ explicit TraceRecordResult(TraceType trace_type);
+
+ virtual ~TraceRecordResult() = default;
+
+ // Trace type of the corresponding TraceRecord.
+ virtual TraceType GetTraceType() const;
+
+ class Handler {
+ public:
+ virtual ~Handler() = default;
+
+ // Handle StatusOnlyTraceExecutionResult
+ virtual Status Handle(const StatusOnlyTraceExecutionResult& result) = 0;
+
+ // Handle SingleValueTraceExecutionResult
+ virtual Status Handle(const SingleValueTraceExecutionResult& result) = 0;
+
+ // Handle MultiValuesTraceExecutionResult
+ virtual Status Handle(const MultiValuesTraceExecutionResult& result) = 0;
+ };
+
+ /*
+ * Example handler to just print the trace record execution results.
+ *
+ * class ResultPrintHandler : public TraceRecordResult::Handler {
+ * public:
+ * ResultPrintHandler();
+ * ~ResultPrintHandler() override {}
+ *
+ * Status Handle(const StatusOnlyTraceExecutionResult& result) override {
+ * std::cout << "Status: " << result.GetStatus().ToString() << std::endl;
+ * }
+ *
+ * Status Handle(const SingleValueTraceExecutionResult& result) override {
+ * std::cout << "Status: " << result.GetStatus().ToString()
+ * << ", value: " << result.GetValue() << std::endl;
+ * }
+ *
+ * Status Handle(const MultiValuesTraceExecutionResult& result) override {
+ * size_t size = result.GetMultiStatus().size();
+ * for (size_t i = 0; i < size; i++) {
+ * std::cout << "Status: " << result.GetMultiStatus()[i].ToString()
+ * << ", value: " << result.GetValues()[i] << std::endl;
+ * }
+ * }
+ * };
+ * */
+
+ // Accept the handler.
+ virtual Status Accept(Handler* handler) = 0;
+
+ private:
+ TraceType trace_type_;
+};
+
+// Base class for the results from the trace record execution handler (created
+// by TraceRecord::NewExecutionHandler()).
+//
+// The actual execution status or returned values may be hidden from
+// TraceRecord::Handler::Handle and TraceRecord::Accept. For example, a
+// GetQueryTraceRecord's execution calls DB::Get() internally. DB::Get() may
+// return Status::NotFound() but TraceRecord::Handler::Handle() or
+// TraceRecord::Accept() will still return Status::OK(). The actual status from
+// DB::Get() and the returned value string may be saved in a
+// SingleValueTraceExecutionResult.
+class TraceExecutionResult : public TraceRecordResult {
+ public:
+ TraceExecutionResult(uint64_t start_timestamp, uint64_t end_timestamp,
+ TraceType trace_type);
+
+ // Execution start/end timestamps and request latency in microseconds.
+ virtual uint64_t GetStartTimestamp() const;
+ virtual uint64_t GetEndTimestamp() const;
+ inline uint64_t GetLatency() const {
+ return GetEndTimestamp() - GetStartTimestamp();
+ }
+
+ private:
+ uint64_t ts_start_;
+ uint64_t ts_end_;
+};
+
+// Result for operations that only return a single Status.
+// Example operations: DB::Write(), Iterator::Seek() and
+// Iterator::SeekForPrev().
+class StatusOnlyTraceExecutionResult : public TraceExecutionResult {
+ public:
+ StatusOnlyTraceExecutionResult(Status status, uint64_t start_timestamp,
+ uint64_t end_timestamp, TraceType trace_type);
+
+ virtual ~StatusOnlyTraceExecutionResult() override = default;
+
+ // Return value of DB::Write(), etc.
+ virtual const Status& GetStatus() const;
+
+ virtual Status Accept(Handler* handler) override;
+
+ private:
+ Status status_;
+};
+
+// Result for operations that return a Status and a value.
+// Example operation: DB::Get()
+class SingleValueTraceExecutionResult : public TraceExecutionResult {
+ public:
+ SingleValueTraceExecutionResult(Status status, const std::string& value,
+ uint64_t start_timestamp,
+ uint64_t end_timestamp, TraceType trace_type);
+
+ SingleValueTraceExecutionResult(Status status, std::string&& value,
+ uint64_t start_timestamp,
+ uint64_t end_timestamp, TraceType trace_type);
+
+ virtual ~SingleValueTraceExecutionResult() override;
+
+ // Return status of DB::Get(), etc.
+ virtual const Status& GetStatus() const;
+
+ // Value for the searched key.
+ virtual const std::string& GetValue() const;
+
+ virtual Status Accept(Handler* handler) override;
+
+ private:
+ Status status_;
+ std::string value_;
+};
+
+// Result for operations that return multiple Status(es) and values.
+// Example operation: DB::MultiGet()
+class MultiValuesTraceExecutionResult : public TraceExecutionResult {
+ public:
+ MultiValuesTraceExecutionResult(std::vector<Status> multi_status,
+ std::vector<std::string> values,
+ uint64_t start_timestamp,
+ uint64_t end_timestamp, TraceType trace_type);
+
+ virtual ~MultiValuesTraceExecutionResult() override;
+
+ // Returned Status(es) of DB::MultiGet(), etc.
+ virtual const std::vector<Status>& GetMultiStatus() const;
+
+ // Returned values for the searched keys.
+ virtual const std::vector<std::string>& GetValues() const;
+
+ virtual Status Accept(Handler* handler) override;
+
+ private:
+ std::vector<Status> multi_status_;
+ std::vector<std::string> values_;
+};
+
+} // namespace ROCKSDB_NAMESPACE
#pragma once
#ifndef ROCKSDB_LITE
+#include <functional>
#include <memory>
#include "rocksdb/rocksdb_namespace.h"
#include "rocksdb/status.h"
-#include "rocksdb/trace_record.h"
namespace ROCKSDB_NAMESPACE {
+class TraceRecord;
+class TraceRecordResult;
+
struct ReplayOptions {
// Number of threads used for replaying. If 0 or 1, replay using
// single thread.
double fast_forward;
ReplayOptions() : num_threads(1), fast_forward(1.0) {}
+
ReplayOptions(uint32_t num_of_threads, double fast_forward_ratio)
: num_threads(num_of_threads), fast_forward(fast_forward_ratio) {}
};
// instantiated via db_bench today, on using "replay" benchmark.
class Replayer {
public:
- virtual ~Replayer() {}
+ virtual ~Replayer() = default;
// Make some preparation before replaying the trace. This will also reset the
// replayer in order to restart replaying.
// trace;
// Status::NotSupported() if the operation is not supported;
// Otherwise, return the corresponding error status.
- virtual Status Execute(const std::unique_ptr<TraceRecord>& record) = 0;
- virtual Status Execute(std::unique_ptr<TraceRecord>&& record) = 0;
+ //
+ // The actual operation execution status and result(s) will be saved in
+ // result. For example, a GetQueryTraceRecord will have its DB::Get() status
+ // and the returned value saved in a SingleValueTraceExecutionResult.
+ virtual Status Execute(const std::unique_ptr<TraceRecord>& record,
+ std::unique_ptr<TraceRecordResult>* result) = 0;
// Replay all the traces from the provided trace stream, taking the delay
// between the traces into consideration.
- virtual Status Replay(const ReplayOptions& options) = 0;
- virtual Status Replay() { return Replay(ReplayOptions()); }
+ //
+ // result_callback reports the status of executing a trace record, and the
+ // actual operation execution result (See the description for Execute()).
+ virtual Status Replay(
+ const ReplayOptions& options,
+ const std::function<void(Status, std::unique_ptr<TraceRecordResult>&&)>&
+ result_callback) = 0;
};
} // namespace ROCKSDB_NAMESPACE
test_util/transaction_test_util.cc \
tools/dump/db_dump_tool.cc \
trace_replay/trace_record_handler.cc \
+ trace_replay/trace_record_result.cc \
trace_replay/trace_record.cc \
trace_replay/trace_replay.cc \
trace_replay/block_cache_tracer.cc \
}
s = replayer->Replay(
ReplayOptions(static_cast<uint32_t>(FLAGS_trace_replay_threads),
- FLAGS_trace_replay_fast_forward));
+ FLAGS_trace_replay_fast_forward),
+ nullptr);
replayer.reset();
if (s.ok()) {
fprintf(stdout, "Replay completed from trace_file: %s\n",
#include "rocksdb/iterator.h"
#include "rocksdb/options.h"
#include "rocksdb/status.h"
+#include "rocksdb/trace_record_result.h"
#include "trace_replay/trace_record_handler.h"
namespace ROCKSDB_NAMESPACE {
// TraceRecord
TraceRecord::TraceRecord(uint64_t timestamp) : timestamp_(timestamp) {}
-TraceRecord::~TraceRecord() {}
-
uint64_t TraceRecord::GetTimestamp() const { return timestamp_; }
TraceRecord::Handler* TraceRecord::NewExecutionHandler(
QueryTraceRecord::QueryTraceRecord(uint64_t timestamp)
: TraceRecord(timestamp) {}
-QueryTraceRecord::~QueryTraceRecord() {}
-
// WriteQueryTraceRecord
WriteQueryTraceRecord::WriteQueryTraceRecord(PinnableSlice&& write_batch_rep,
uint64_t timestamp)
rep_.PinSelf(write_batch_rep);
}
-WriteQueryTraceRecord::~WriteQueryTraceRecord() {}
+WriteQueryTraceRecord::~WriteQueryTraceRecord() { rep_.clear(); }
Slice WriteQueryTraceRecord::GetWriteBatchRep() const { return Slice(rep_); }
-Status WriteQueryTraceRecord::Accept(Handler* handler) {
+Status WriteQueryTraceRecord::Accept(
+ Handler* handler, std::unique_ptr<TraceRecordResult>* result) {
assert(handler != nullptr);
- return handler->Handle(*this);
+ return handler->Handle(*this, result);
}
// GetQueryTraceRecord
key_.PinSelf(key);
}
-GetQueryTraceRecord::~GetQueryTraceRecord() {}
+GetQueryTraceRecord::~GetQueryTraceRecord() { key_.clear(); }
uint32_t GetQueryTraceRecord::GetColumnFamilyID() const { return cf_id_; }
Slice GetQueryTraceRecord::GetKey() const { return Slice(key_); }
-Status GetQueryTraceRecord::Accept(Handler* handler) {
+Status GetQueryTraceRecord::Accept(Handler* handler,
+ std::unique_ptr<TraceRecordResult>* result) {
assert(handler != nullptr);
- return handler->Handle(*this);
+ return handler->Handle(*this, result);
}
// IteratorQueryTraceRecord
IteratorQueryTraceRecord::IteratorQueryTraceRecord(uint64_t timestamp)
: QueryTraceRecord(timestamp) {}
-IteratorQueryTraceRecord::~IteratorQueryTraceRecord() {}
-
// IteratorSeekQueryTraceRecord
IteratorSeekQueryTraceRecord::IteratorSeekQueryTraceRecord(
SeekType seek_type, uint32_t column_family_id, PinnableSlice&& key,
key_.PinSelf(key);
}
-IteratorSeekQueryTraceRecord::~IteratorSeekQueryTraceRecord() {}
+IteratorSeekQueryTraceRecord::~IteratorSeekQueryTraceRecord() { key_.clear(); }
TraceType IteratorSeekQueryTraceRecord::GetTraceType() const {
return static_cast<TraceType>(type_);
Slice IteratorSeekQueryTraceRecord::GetKey() const { return Slice(key_); }
-Status IteratorSeekQueryTraceRecord::Accept(Handler* handler) {
+Status IteratorSeekQueryTraceRecord::Accept(
+ Handler* handler, std::unique_ptr<TraceRecordResult>* result) {
assert(handler != nullptr);
- return handler->Handle(*this);
+ return handler->Handle(*this, result);
}
// MultiGetQueryTraceRecord
}
}
-MultiGetQueryTraceRecord::~MultiGetQueryTraceRecord() {}
+MultiGetQueryTraceRecord::~MultiGetQueryTraceRecord() {
+ cf_ids_.clear();
+ keys_.clear();
+}
std::vector<uint32_t> MultiGetQueryTraceRecord::GetColumnFamilyIDs() const {
return cf_ids_;
return std::vector<Slice>(keys_.begin(), keys_.end());
}
-Status MultiGetQueryTraceRecord::Accept(Handler* handler) {
+Status MultiGetQueryTraceRecord::Accept(
+ Handler* handler, std::unique_ptr<TraceRecordResult>* result) {
assert(handler != nullptr);
- return handler->Handle(*this);
+ return handler->Handle(*this, result);
}
} // namespace ROCKSDB_NAMESPACE
#include "trace_replay/trace_record_handler.h"
#include "rocksdb/iterator.h"
+#include "rocksdb/trace_record_result.h"
#include "rocksdb/write_batch.h"
namespace ROCKSDB_NAMESPACE {
: TraceRecord::Handler(),
db_(db),
write_opts_(WriteOptions()),
- read_opts_(ReadOptions()) {
+ read_opts_(ReadOptions()),
+ clock_(SystemClock::Default()) {
assert(db != nullptr);
assert(!handles.empty());
cf_map_.reserve(handles.size());
TraceExecutionHandler::~TraceExecutionHandler() { cf_map_.clear(); }
-Status TraceExecutionHandler::Handle(const WriteQueryTraceRecord& record) {
+Status TraceExecutionHandler::Handle(
+ const WriteQueryTraceRecord& record,
+ std::unique_ptr<TraceRecordResult>* result) {
+ if (result != nullptr) {
+ result->reset(nullptr);
+ }
+ uint64_t start = clock_->NowMicros();
+
WriteBatch batch(record.GetWriteBatchRep().ToString());
- return db_->Write(write_opts_, &batch);
+ Status s = db_->Write(write_opts_, &batch);
+
+ uint64_t end = clock_->NowMicros();
+
+ if (s.ok() && result != nullptr) {
+ result->reset(new StatusOnlyTraceExecutionResult(s, start, end,
+ record.GetTraceType()));
+ }
+
+ return s;
}
-Status TraceExecutionHandler::Handle(const GetQueryTraceRecord& record) {
+Status TraceExecutionHandler::Handle(
+ const GetQueryTraceRecord& record,
+ std::unique_ptr<TraceRecordResult>* result) {
+ if (result != nullptr) {
+ result->reset(nullptr);
+ }
auto it = cf_map_.find(record.GetColumnFamilyID());
if (it == cf_map_.end()) {
return Status::Corruption("Invalid Column Family ID.");
}
+ uint64_t start = clock_->NowMicros();
+
std::string value;
Status s = db_->Get(read_opts_, it->second, record.GetKey(), &value);
- // Treat not found as ok and return other errors.
- return s.IsNotFound() ? Status::OK() : s;
+ uint64_t end = clock_->NowMicros();
+
+ // Treat not found as ok, return other errors.
+ if (!s.ok() && !s.IsNotFound()) {
+ return s;
+ }
+
+ if (result != nullptr) {
+ // Report the actual opetation status in TraceExecutionResult
+ result->reset(new SingleValueTraceExecutionResult(
+ std::move(s), std::move(value), start, end, record.GetTraceType()));
+ }
+ return Status::OK();
}
Status TraceExecutionHandler::Handle(
- const IteratorSeekQueryTraceRecord& record) {
+ const IteratorSeekQueryTraceRecord& record,
+ std::unique_ptr<TraceRecordResult>* result) {
+ if (result != nullptr) {
+ result->reset(nullptr);
+ }
auto it = cf_map_.find(record.GetColumnFamilyID());
if (it == cf_map_.end()) {
return Status::Corruption("Invalid Column Family ID.");
Iterator* single_iter = db_->NewIterator(read_opts_, it->second);
+ uint64_t start = clock_->NowMicros();
+
switch (record.GetSeekType()) {
case IteratorSeekQueryTraceRecord::kSeekForPrev: {
single_iter->SeekForPrev(record.GetKey());
break;
}
}
+
+ uint64_t end = clock_->NowMicros();
+
Status s = single_iter->status();
delete single_iter;
+
+ if (s.ok() && result != nullptr) {
+ result->reset(new StatusOnlyTraceExecutionResult(s, start, end,
+ record.GetTraceType()));
+ }
+
return s;
}
-Status TraceExecutionHandler::Handle(const MultiGetQueryTraceRecord& record) {
+Status TraceExecutionHandler::Handle(
+ const MultiGetQueryTraceRecord& record,
+ std::unique_ptr<TraceRecordResult>* result) {
+ if (result != nullptr) {
+ result->reset(nullptr);
+ }
std::vector<ColumnFamilyHandle*> handles;
handles.reserve(record.GetColumnFamilyIDs().size());
for (uint32_t cf_id : record.GetColumnFamilyIDs()) {
return Status::InvalidArgument("MultiGet cf_ids and keys size mismatch.");
}
+ uint64_t start = clock_->NowMicros();
+
std::vector<std::string> values;
std::vector<Status> ss = db_->MultiGet(read_opts_, handles, keys, &values);
+ uint64_t end = clock_->NowMicros();
+
// Treat not found as ok, return other errors.
- for (Status s : ss) {
+ for (const Status& s : ss) {
if (!s.ok() && !s.IsNotFound()) {
return s;
}
}
+
+ if (result != nullptr) {
+ // Report the actual opetation status in TraceExecutionResult
+ result->reset(new MultiValuesTraceExecutionResult(
+ std::move(ss), std::move(values), start, end, record.GetTraceType()));
+ }
+
return Status::OK();
}
#pragma once
+#include <memory>
#include <unordered_map>
#include <vector>
#include "rocksdb/db.h"
#include "rocksdb/options.h"
#include "rocksdb/status.h"
+#include "rocksdb/system_clock.h"
#include "rocksdb/trace_record.h"
namespace ROCKSDB_NAMESPACE {
const std::vector<ColumnFamilyHandle*>& handles);
virtual ~TraceExecutionHandler() override;
- virtual Status Handle(const WriteQueryTraceRecord& record) override;
- virtual Status Handle(const GetQueryTraceRecord& record) override;
- virtual Status Handle(const IteratorSeekQueryTraceRecord& record) override;
- virtual Status Handle(const MultiGetQueryTraceRecord& record) override;
+ virtual Status Handle(const WriteQueryTraceRecord& record,
+ std::unique_ptr<TraceRecordResult>* result) override;
+ virtual Status Handle(const GetQueryTraceRecord& record,
+ std::unique_ptr<TraceRecordResult>* result) override;
+ virtual Status Handle(const IteratorSeekQueryTraceRecord& record,
+ std::unique_ptr<TraceRecordResult>* result) override;
+ virtual Status Handle(const MultiGetQueryTraceRecord& record,
+ std::unique_ptr<TraceRecordResult>* result) override;
private:
DB* db_;
std::unordered_map<uint32_t, ColumnFamilyHandle*> cf_map_;
WriteOptions write_opts_;
ReadOptions read_opts_;
+ std::shared_ptr<SystemClock> clock_;
};
// To do: Handler for trace_analyzer.
--- /dev/null
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#include "rocksdb/trace_record_result.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+// TraceRecordResult
+TraceRecordResult::TraceRecordResult(TraceType trace_type)
+ : trace_type_(trace_type) {}
+
+TraceType TraceRecordResult::GetTraceType() const { return trace_type_; }
+
+// TraceExecutionResult
+TraceExecutionResult::TraceExecutionResult(uint64_t start_timestamp,
+ uint64_t end_timestamp,
+ TraceType trace_type)
+ : TraceRecordResult(trace_type),
+ ts_start_(start_timestamp),
+ ts_end_(end_timestamp) {
+ assert(ts_start_ <= ts_end_);
+}
+
+uint64_t TraceExecutionResult::GetStartTimestamp() const { return ts_start_; }
+
+uint64_t TraceExecutionResult::GetEndTimestamp() const { return ts_end_; }
+
+// StatusOnlyTraceExecutionResult
+StatusOnlyTraceExecutionResult::StatusOnlyTraceExecutionResult(
+ Status status, uint64_t start_timestamp, uint64_t end_timestamp,
+ TraceType trace_type)
+ : TraceExecutionResult(start_timestamp, end_timestamp, trace_type),
+ status_(std::move(status)) {}
+
+const Status& StatusOnlyTraceExecutionResult::GetStatus() const {
+ return status_;
+}
+
+Status StatusOnlyTraceExecutionResult::Accept(Handler* handler) {
+ assert(handler != nullptr);
+ return handler->Handle(*this);
+}
+
+// SingleValueTraceExecutionResult
+SingleValueTraceExecutionResult::SingleValueTraceExecutionResult(
+ Status status, const std::string& value, uint64_t start_timestamp,
+ uint64_t end_timestamp, TraceType trace_type)
+ : TraceExecutionResult(start_timestamp, end_timestamp, trace_type),
+ status_(std::move(status)),
+ value_(value) {}
+
+SingleValueTraceExecutionResult::SingleValueTraceExecutionResult(
+ Status status, std::string&& value, uint64_t start_timestamp,
+ uint64_t end_timestamp, TraceType trace_type)
+ : TraceExecutionResult(start_timestamp, end_timestamp, trace_type),
+ status_(std::move(status)),
+ value_(std::move(value)) {}
+
+SingleValueTraceExecutionResult::~SingleValueTraceExecutionResult() {
+ value_.clear();
+}
+
+const Status& SingleValueTraceExecutionResult::GetStatus() const {
+ return status_;
+}
+
+const std::string& SingleValueTraceExecutionResult::GetValue() const {
+ return value_;
+}
+
+Status SingleValueTraceExecutionResult::Accept(Handler* handler) {
+ assert(handler != nullptr);
+ return handler->Handle(*this);
+}
+
+// MultiValuesTraceExecutionResult
+MultiValuesTraceExecutionResult::MultiValuesTraceExecutionResult(
+ std::vector<Status> multi_status, std::vector<std::string> values,
+ uint64_t start_timestamp, uint64_t end_timestamp, TraceType trace_type)
+ : TraceExecutionResult(start_timestamp, end_timestamp, trace_type),
+ multi_status_(std::move(multi_status)),
+ values_(std::move(values)) {}
+
+MultiValuesTraceExecutionResult::~MultiValuesTraceExecutionResult() {
+ multi_status_.clear();
+ values_.clear();
+}
+
+const std::vector<Status>& MultiValuesTraceExecutionResult::GetMultiStatus()
+ const {
+ return multi_status_;
+}
+
+const std::vector<std::string>& MultiValuesTraceExecutionResult::GetValues()
+ const {
+ return values_;
+}
+
+Status MultiValuesTraceExecutionResult::Accept(Handler* handler) {
+ assert(handler != nullptr);
+ return handler->Handle(*this);
+}
+
+} // namespace ROCKSDB_NAMESPACE
assert(trace != nullptr);
assert(trace->type == kTraceWrite);
+ if (record != nullptr) {
+ record->reset(nullptr);
+ }
+
PinnableSlice rep;
if (trace_file_version < 2) {
rep.PinSelf(trace->payload);
assert(trace != nullptr);
assert(trace->type == kTraceGet);
+ if (record != nullptr) {
+ record->reset(nullptr);
+ }
+
uint32_t cf_id = 0;
Slice get_key;
assert(trace->type == kTraceIteratorSeek ||
trace->type == kTraceIteratorSeekForPrev);
+ if (record != nullptr) {
+ record->reset(nullptr);
+ }
+
uint32_t cf_id = 0;
Slice iter_key;
std::unique_ptr<TraceRecord>* record) {
assert(trace != nullptr);
assert(trace->type == kTraceMultiGet);
+
+ if (record != nullptr) {
+ record->reset(nullptr);
+ }
+
if (trace_file_version < 2) {
return Status::Corruption("MultiGet is not supported.");
}
#include <cmath>
#include <thread>
-#include "rocksdb/db.h"
-#include "rocksdb/env.h"
#include "rocksdb/options.h"
#include "rocksdb/slice.h"
-#include "rocksdb/status.h"
#include "rocksdb/system_clock.h"
-#include "rocksdb/trace_reader_writer.h"
#include "util/threadpool_imp.h"
namespace ROCKSDB_NAMESPACE {
return DecodeTraceRecord(&trace, trace_file_version_, record);
}
-Status ReplayerImpl::Execute(const std::unique_ptr<TraceRecord>& record) {
- return record->Accept(exec_handler_.get());
+Status ReplayerImpl::Execute(const std::unique_ptr<TraceRecord>& record,
+ std::unique_ptr<TraceRecordResult>* result) {
+ return record->Accept(exec_handler_.get(), result);
}
-Status ReplayerImpl::Execute(std::unique_ptr<TraceRecord>&& record) {
- Status s = record->Accept(exec_handler_.get());
- record.reset();
- return s;
-}
-
-Status ReplayerImpl::Replay(const ReplayOptions& options) {
+Status ReplayerImpl::Replay(
+ const ReplayOptions& options,
+ const std::function<void(Status, std::unique_ptr<TraceRecordResult>&&)>&
+ result_callback) {
if (options.fast_forward <= 0.0) {
return Status::InvalidArgument("Wrong fast forward speed!");
}
// In single-threaded replay, decode first then sleep.
std::unique_ptr<TraceRecord> record;
s = DecodeTraceRecord(&trace, trace_file_version_, &record);
- // Skip unsupported traces, stop for other errors.
- if (s.IsNotSupported()) {
- continue;
- } else if (!s.ok()) {
+ if (!s.ok() && !s.IsNotSupported()) {
break;
}
- std::this_thread::sleep_until(
+ std::chrono::system_clock::time_point sleep_to =
replay_epoch +
std::chrono::microseconds(static_cast<uint64_t>(std::llround(
- 1.0 * (trace.ts - header_ts_) / options.fast_forward))));
+ 1.0 * (trace.ts - header_ts_) / options.fast_forward)));
+ if (sleep_to > std::chrono::system_clock::now()) {
+ std::this_thread::sleep_until(sleep_to);
+ }
- s = Execute(std::move(record));
+ // Skip unsupported traces, stop for other errors.
+ if (s.IsNotSupported()) {
+ if (result_callback != nullptr) {
+ result_callback(s, nullptr);
+ }
+ s = Status::OK();
+ continue;
+ }
+
+ if (result_callback == nullptr) {
+ s = Execute(record, nullptr);
+ } else {
+ std::unique_ptr<TraceRecordResult> res;
+ s = Execute(record, &res);
+ result_callback(s, std::move(res));
+ }
}
} else {
// Multi-threaded replay.
// In multi-threaded replay, sleep first thatn start decoding and
// execution in a thread.
- std::this_thread::sleep_until(
+ std::chrono::system_clock::time_point sleep_to =
replay_epoch +
std::chrono::microseconds(static_cast<uint64_t>(std::llround(
- 1.0 * (trace.ts - header_ts_) / options.fast_forward))));
+ 1.0 * (trace.ts - header_ts_) / options.fast_forward)));
+ if (sleep_to > std::chrono::system_clock::now()) {
+ std::this_thread::sleep_until(sleep_to);
+ }
if (trace_type == kTraceWrite || trace_type == kTraceGet ||
trace_type == kTraceIteratorSeek ||
ra->handler = exec_handler_.get();
ra->trace_file_version = trace_file_version_;
ra->error_cb = error_cb;
+ ra->result_cb = result_callback;
thread_pool.Schedule(&ReplayerImpl::BackgroundWork, ra.release(),
nullptr, nullptr);
+ } else {
+ // Skip unsupported traces.
+ if (result_callback != nullptr) {
+ result_callback(Status::NotSupported("Unsupported trace type."),
+ nullptr);
+ }
}
- // Skip unsupported traces.
}
thread_pool.WaitForJobsAndJoinAllThreads();
std::unique_ptr<TraceRecord> record;
Status s =
DecodeTraceRecord(&(ra->trace_entry), ra->trace_file_version, &record);
- if (s.ok()) {
- s = record->Accept(ra->handler);
- record.reset();
+ if (!s.ok()) {
+ // Stop the replay
+ if (ra->error_cb != nullptr) {
+ ra->error_cb(s, ra->trace_entry.ts);
+ }
+ // Report the result
+ if (ra->result_cb != nullptr) {
+ ra->result_cb(s, nullptr);
+ }
+ return;
}
- if (!s.ok() && ra->error_cb) {
- ra->error_cb(s, ra->trace_entry.ts);
+
+ if (ra->result_cb == nullptr) {
+ s = record->Accept(ra->handler, nullptr);
+ } else {
+ std::unique_ptr<TraceRecordResult> res;
+ s = record->Accept(ra->handler, &res);
+ ra->result_cb(s, std::move(res));
}
+ record.reset();
}
} // namespace ROCKSDB_NAMESPACE
#include <mutex>
#include <unordered_map>
-#include "rocksdb/rocksdb_namespace.h"
+#include "rocksdb/db.h"
+#include "rocksdb/env.h"
+#include "rocksdb/status.h"
+#include "rocksdb/trace_reader_writer.h"
#include "rocksdb/trace_record.h"
+#include "rocksdb/trace_record_result.h"
#include "rocksdb/utilities/replayer.h"
#include "trace_replay/trace_replay.h"
namespace ROCKSDB_NAMESPACE {
-class ColumnFamilyHandle;
-class DB;
-class Env;
-class TraceReader;
-class TraceRecord;
-class Status;
-
-struct ReplayOptions;
-
class ReplayerImpl : public Replayer {
public:
ReplayerImpl(DB* db, const std::vector<ColumnFamilyHandle*>& handles,
Status Next(std::unique_ptr<TraceRecord>* record) override;
using Replayer::Execute;
- Status Execute(const std::unique_ptr<TraceRecord>& record) override;
- Status Execute(std::unique_ptr<TraceRecord>&& record) override;
+ Status Execute(const std::unique_ptr<TraceRecord>& record,
+ std::unique_ptr<TraceRecordResult>* result) override;
using Replayer::Replay;
- Status Replay(const ReplayOptions& options) override;
+ Status Replay(
+ const ReplayOptions& options,
+ const std::function<void(Status, std::unique_ptr<TraceRecordResult>&&)>&
+ result_callback) override;
using Replayer::GetHeaderTimestamp;
uint64_t GetHeaderTimestamp() const override;
// Callback function to report the error status and the timestamp of the
// TraceRecord.
std::function<void(Status, uint64_t)> error_cb;
+ // Callback function to report the trace execution status and operation
+ // execution status/result(s).
+ std::function<void(Status, std::unique_ptr<TraceRecordResult>&&)> result_cb;
};
} // namespace ROCKSDB_NAMESPACE