DEFINE_bool(analyze_range_delete, false, "Analyze the DeleteRange query.");
DEFINE_bool(analyze_merge, false, "Analyze the Merge query.");
DEFINE_bool(analyze_iterator, false,
- " Analyze the iterate query like seek() and seekForPrev().");
+ " Analyze the iterate query like Seek() and SeekForPrev().");
DEFINE_bool(analyze_multiget, false,
" Analyze the MultiGet query. NOTE: for"
" MultiGet, we analyze each KV-pair read in one MultiGet query. "
total_access_keys_ = 0;
total_gets_ = 0;
total_writes_ = 0;
+ total_seeks_ = 0;
+ total_seek_prevs_ = 0;
+ total_multigets_ = 0;
trace_create_time_ = 0;
begin_time_ = 0;
end_time_ = 0;
time_series_start_ = 0;
cur_time_sec_ = 0;
- // Set the default trace file version as version 0.2
- trace_file_version_ = 2;
if (FLAGS_sample_ratio > 1.0 || FLAGS_sample_ratio <= 0) {
sample_max_ = 1;
} else {
Status TraceAnalyzer::PrepareProcessing() {
Status s;
// Prepare the trace reader
- s = NewFileTraceReader(env_, env_options_, trace_name_, &trace_reader_);
+ if (trace_reader_ == nullptr) {
+ s = NewFileTraceReader(env_, env_options_, trace_name_, &trace_reader_);
+ } else {
+ s = trace_reader_->Reset();
+ }
if (!s.ok()) {
return s;
}
fprintf(stderr, "Cannot read the header\n");
return s;
}
- s = TracerHelper::ParseTraceHeader(header, &trace_file_version_,
- &db_version_);
+ // Set the default trace file version as version 0.2
+ int trace_file_version = 2;
+ s = TracerHelper::ParseTraceHeader(header, &trace_file_version, &db_version_);
if (!s.ok()) {
return s;
}
break;
}
- total_requests_++;
end_time_ = trace.ts;
if (trace.type == kTraceEnd) {
break;
}
+ // Do not count TraceEnd (if there is one)
+ total_requests_++;
std::unique_ptr<TraceRecord> record;
- switch (trace.type) {
- case kTraceWrite: {
- s = TracerHelper::DecodeWriteRecord(&trace, trace_file_version_,
- &record);
- if (!s.ok()) {
- return s;
- }
- total_writes_++;
- c_time_ = trace.ts;
- std::unique_ptr<WriteQueryTraceRecord> r(
- reinterpret_cast<WriteQueryTraceRecord*>(record.release()));
- // Note that, if the write happens in a transaction,
- // 'Write' will be called twice, one for Prepare, one for
- // Commit. Thus, in the trace, for the same WriteBatch, there
- // will be two reords if it is in a transaction. Here, we only
- // process the reord that is committed. If write is non-transaction,
- // HasBeginPrepare()==false, so we process it normally.
- WriteBatch batch(r->GetWriteBatchRep().ToString());
- if (batch.HasBeginPrepare() && !batch.HasCommit()) {
- continue;
- }
- TraceWriteHandler write_handler(this);
- s = batch.Iterate(&write_handler);
- if (!s.ok()) {
- fprintf(stderr, "Cannot process the write batch in the trace\n");
- return s;
- }
- break;
- }
- case kTraceGet: {
- s = TracerHelper::DecodeGetRecord(&trace, trace_file_version_, &record);
- if (!s.ok()) {
- return s;
- }
- total_gets_++;
- std::unique_ptr<GetQueryTraceRecord> r(
- reinterpret_cast<GetQueryTraceRecord*>(record.release()));
- s = HandleGet(r->GetColumnFamilyID(), r->GetKey(), r->GetTimestamp(),
- 1);
- if (!s.ok()) {
- fprintf(stderr, "Cannot process the get in the trace\n");
- return s;
- }
- break;
- }
- case kTraceIteratorSeek:
- case kTraceIteratorSeekForPrev: {
- s = TracerHelper::DecodeIterRecord(&trace, trace_file_version_,
- &record);
- if (!s.ok()) {
- return s;
- }
- std::unique_ptr<IteratorSeekQueryTraceRecord> r(
- reinterpret_cast<IteratorSeekQueryTraceRecord*>(record.release()));
- s = HandleIter(r->GetColumnFamilyID(), r->GetKey(), r->GetTimestamp(),
- r->GetTraceType());
- if (!s.ok()) {
- fprintf(stderr, "Cannot process the iterator in the trace\n");
- return s;
- }
- break;
- }
- case kTraceMultiGet: {
- s = TracerHelper::DecodeMultiGetRecord(&trace, trace_file_version_,
- &record);
- if (!s.ok()) {
- return s;
- }
- std::unique_ptr<MultiGetQueryTraceRecord> r(
- reinterpret_cast<MultiGetQueryTraceRecord*>(record.release()));
- s = HandleMultiGet(r->GetColumnFamilyIDs(), r->GetKeys(),
- r->GetTimestamp());
- break;
- }
- default: {
- // Skip unsupported types
- break;
- }
+ s = TracerHelper::DecodeTraceRecord(&trace, trace_file_version, &record);
+ if (s.IsNotSupported()) {
+ continue;
+ }
+ if (!s.ok()) {
+ return s;
+ }
+ s = record->Accept(this, nullptr);
+ if (!s.ok()) {
+ fprintf(stderr, "Cannot process the TraceRecord\n");
+ return s;
}
}
if (s.IsIncomplete()) {
// Fix it: Reaching eof returns Incomplete status at the moment.
- //
return Status::OK();
}
return s;
return s;
}
-// Handle the Get request in the trace
-Status TraceAnalyzer::HandleGet(uint32_t column_family_id, const Slice& key,
- const uint64_t& ts, const uint32_t& get_ret) {
+Status TraceAnalyzer::Handle(const WriteQueryTraceRecord& record,
+ std::unique_ptr<TraceRecordResult>* /*result*/) {
+ total_writes_++;
+ // Note that, if the write happens in a transaction,
+ // 'Write' will be called twice, one for Prepare, one for
+ // Commit. Thus, in the trace, for the same WriteBatch, there
+ // will be two records if it is in a transaction. Here, we only
+ // process the reord that is committed. If write is non-transaction,
+ // HasBeginPrepare()==false, so we process it normally.
+ WriteBatch batch(record.GetWriteBatchRep().ToString());
+ if (batch.HasBeginPrepare() && !batch.HasCommit()) {
+ return Status::OK();
+ }
+ c_time_ = record.GetTimestamp();
+ Status s = batch.Iterate(this);
+ if (!s.ok()) {
+ fprintf(stderr, "Cannot process the write batch in the trace\n");
+ return s;
+ }
+ return Status::OK();
+}
+
+Status TraceAnalyzer::Handle(const GetQueryTraceRecord& record,
+ std::unique_ptr<TraceRecordResult>* /*result*/) {
+ total_gets_++;
+
+ uint32_t cf_id = record.GetColumnFamilyID();
+ Slice key = record.GetKey();
+ uint64_t ts = record.GetTimestamp();
+
Status s;
size_t value_size = 0;
if (FLAGS_convert_to_human_readable_trace && trace_sequence_f_) {
- s = WriteTraceSequence(TraceOperationType::kGet, column_family_id, key,
- value_size, ts);
+ s = WriteTraceSequence(TraceOperationType::kGet, cf_id, key, value_size,
+ ts);
if (!s.ok()) {
return Status::Corruption("Failed to write the trace sequence to file");
}
if (!ta_[TraceOperationType::kGet].enabled) {
return Status::OK();
}
- if (get_ret == 1) {
- value_size = 10;
+ value_size = 10;
+ s = KeyStatsInsertion(TraceOperationType::kGet, cf_id, key.ToString(),
+ value_size, ts);
+ if (!s.ok()) {
+ return Status::Corruption("Failed to insert key statistics");
+ }
+ return s;
+}
+
+Status TraceAnalyzer::Handle(const IteratorSeekQueryTraceRecord& record,
+ std::unique_ptr<TraceRecordResult>* /*result*/) {
+ uint32_t cf_id = record.GetColumnFamilyID();
+ Slice key = record.GetKey();
+ uint64_t ts = record.GetTimestamp();
+
+ // To do: add lower/upper bounds
+
+ Status s;
+ size_t value_size = 0;
+ int type = -1;
+ if (record.GetTraceType() == kTraceIteratorSeek) {
+ type = TraceOperationType::kIteratorSeek;
+ total_seeks_++;
+ } else if (record.GetTraceType() == kTraceIteratorSeekForPrev) {
+ type = TraceOperationType::kIteratorSeekForPrev;
+ total_seek_prevs_++;
+ } else {
+ return s;
+ }
+ if (type == -1) {
+ return s;
+ }
+
+ if (FLAGS_convert_to_human_readable_trace && trace_sequence_f_) {
+ s = WriteTraceSequence(type, cf_id, key, value_size, ts);
+ if (!s.ok()) {
+ return Status::Corruption("Failed to write the trace sequence to file");
+ }
+ }
+
+ if (ta_[type].sample_count >= sample_max_) {
+ ta_[type].sample_count = 0;
+ }
+ if (ta_[type].sample_count > 0) {
+ ta_[type].sample_count++;
+ return Status::OK();
+ }
+ ta_[type].sample_count++;
+
+ if (!ta_[type].enabled) {
+ return Status::OK();
+ }
+ s = KeyStatsInsertion(type, cf_id, key.ToString(), value_size, ts);
+ if (!s.ok()) {
+ return Status::Corruption("Failed to insert key statistics");
+ }
+ return s;
+}
+
+Status TraceAnalyzer::Handle(const MultiGetQueryTraceRecord& record,
+ std::unique_ptr<TraceRecordResult>* /*result*/) {
+ total_multigets_++;
+
+ std::vector<uint32_t> cf_ids = record.GetColumnFamilyIDs();
+ std::vector<Slice> keys = record.GetKeys();
+ uint64_t ts = record.GetTimestamp();
+
+ Status s;
+ size_t value_size = 0;
+ if (cf_ids.size() != keys.size()) {
+ // The size does not match is not the error of tracing and anayzing, we just
+ // report it to the user. The analyzing continues.
+ printf("The CF ID vector size does not match the keys vector size!\n");
+ }
+ size_t vector_size = std::min(cf_ids.size(), keys.size());
+ if (FLAGS_convert_to_human_readable_trace && trace_sequence_f_) {
+ for (size_t i = 0; i < vector_size; i++) {
+ assert(i < cf_ids.size() && i < keys.size());
+ s = WriteTraceSequence(TraceOperationType::kMultiGet, cf_ids[i], keys[i],
+ value_size, ts);
+ }
+ if (!s.ok()) {
+ return Status::Corruption("Failed to write the trace sequence to file");
+ }
+ }
+
+ if (ta_[TraceOperationType::kMultiGet].sample_count >= sample_max_) {
+ ta_[TraceOperationType::kMultiGet].sample_count = 0;
+ }
+ if (ta_[TraceOperationType::kMultiGet].sample_count > 0) {
+ ta_[TraceOperationType::kMultiGet].sample_count++;
+ return Status::OK();
+ }
+ ta_[TraceOperationType::kMultiGet].sample_count++;
+
+ if (!ta_[TraceOperationType::kMultiGet].enabled) {
+ return Status::OK();
+ }
+ for (size_t i = 0; i < vector_size; i++) {
+ assert(i < cf_ids.size() && i < keys.size());
+ s = KeyStatsInsertion(TraceOperationType::kMultiGet, cf_ids[i],
+ keys[i].ToString(), value_size, ts);
}
- s = KeyStatsInsertion(TraceOperationType::kGet, column_family_id,
- key.ToString(), value_size, ts);
if (!s.ok()) {
return Status::Corruption("Failed to insert key statistics");
}
}
// Handle the Put request in the write batch of the trace
-Status TraceAnalyzer::HandlePut(uint32_t column_family_id, const Slice& key,
- const Slice& value) {
+Status TraceAnalyzer::PutCF(uint32_t column_family_id, const Slice& key,
+ const Slice& value) {
Status s;
size_t value_size = value.ToString().size();
if (FLAGS_convert_to_human_readable_trace && trace_sequence_f_) {
}
// Handle the Delete request in the write batch of the trace
-Status TraceAnalyzer::HandleDelete(uint32_t column_family_id,
- const Slice& key) {
+Status TraceAnalyzer::DeleteCF(uint32_t column_family_id, const Slice& key) {
Status s;
size_t value_size = 0;
if (FLAGS_convert_to_human_readable_trace && trace_sequence_f_) {
}
// Handle the SingleDelete request in the write batch of the trace
-Status TraceAnalyzer::HandleSingleDelete(uint32_t column_family_id,
- const Slice& key) {
+Status TraceAnalyzer::SingleDeleteCF(uint32_t column_family_id,
+ const Slice& key) {
Status s;
size_t value_size = 0;
if (FLAGS_convert_to_human_readable_trace && trace_sequence_f_) {
}
// Handle the DeleteRange request in the write batch of the trace
-Status TraceAnalyzer::HandleDeleteRange(uint32_t column_family_id,
- const Slice& begin_key,
- const Slice& end_key) {
+Status TraceAnalyzer::DeleteRangeCF(uint32_t column_family_id,
+ const Slice& begin_key,
+ const Slice& end_key) {
Status s;
size_t value_size = 0;
if (FLAGS_convert_to_human_readable_trace && trace_sequence_f_) {
}
// Handle the Merge request in the write batch of the trace
-Status TraceAnalyzer::HandleMerge(uint32_t column_family_id, const Slice& key,
- const Slice& value) {
+Status TraceAnalyzer::MergeCF(uint32_t column_family_id, const Slice& key,
+ const Slice& value) {
Status s;
size_t value_size = value.ToString().size();
if (FLAGS_convert_to_human_readable_trace && trace_sequence_f_) {
return s;
}
-// Handle the Iterator request in the trace
-Status TraceAnalyzer::HandleIter(uint32_t column_family_id, const Slice& key,
- const uint64_t& ts, TraceType trace_type) {
- Status s;
- size_t value_size = 0;
- int type = -1;
- if (trace_type == kTraceIteratorSeek) {
- type = TraceOperationType::kIteratorSeek;
- } else if (trace_type == kTraceIteratorSeekForPrev) {
- type = TraceOperationType::kIteratorSeekForPrev;
- } else {
- return s;
- }
- if (type == -1) {
- return s;
- }
-
- if (FLAGS_convert_to_human_readable_trace && trace_sequence_f_) {
- s = WriteTraceSequence(type, column_family_id, key, value_size, ts);
- if (!s.ok()) {
- return Status::Corruption("Failed to write the trace sequence to file");
- }
- }
-
- if (ta_[type].sample_count >= sample_max_) {
- ta_[type].sample_count = 0;
- }
- if (ta_[type].sample_count > 0) {
- ta_[type].sample_count++;
- return Status::OK();
- }
- ta_[type].sample_count++;
-
- if (!ta_[type].enabled) {
- return Status::OK();
- }
- s = KeyStatsInsertion(type, column_family_id, key.ToString(), value_size, ts);
- if (!s.ok()) {
- return Status::Corruption("Failed to insert key statistics");
- }
- return s;
-}
-
-// Handle MultiGet queries in the trace
-Status TraceAnalyzer::HandleMultiGet(
- const std::vector<uint32_t>& column_family_ids,
- const std::vector<Slice>& keys, const uint64_t& ts) {
- Status s;
- size_t value_size = 0;
- if (column_family_ids.size() != keys.size()) {
- // The size does not match is not the error of tracing and anayzing, we just
- // report it to the user. The analyzing continues.
- printf("The CF ID vector size does not match the keys vector size!\n");
- }
- size_t vector_size = std::min(column_family_ids.size(), keys.size());
- if (FLAGS_convert_to_human_readable_trace && trace_sequence_f_) {
- for (size_t i = 0; i < vector_size; i++) {
- assert(i < column_family_ids.size() && i < keys.size());
- s = WriteTraceSequence(TraceOperationType::kMultiGet,
- column_family_ids[i], keys[i], value_size, ts);
- }
- if (!s.ok()) {
- return Status::Corruption("Failed to write the trace sequence to file");
- }
- }
-
- if (ta_[TraceOperationType::kMultiGet].sample_count >= sample_max_) {
- ta_[TraceOperationType::kMultiGet].sample_count = 0;
- }
- if (ta_[TraceOperationType::kMultiGet].sample_count > 0) {
- ta_[TraceOperationType::kMultiGet].sample_count++;
- return Status::OK();
- }
- ta_[TraceOperationType::kMultiGet].sample_count++;
-
- if (!ta_[TraceOperationType::kMultiGet].enabled) {
- return Status::OK();
- }
- for (size_t i = 0; i < vector_size; i++) {
- assert(i < column_family_ids.size() && i < keys.size());
- s = KeyStatsInsertion(TraceOperationType::kMultiGet, column_family_ids[i],
- keys[i].ToString(), value_size, ts);
- }
- if (!s.ok()) {
- return Status::Corruption("Failed to insert key statistics");
- }
- return s;
-}
-
// Before the analyzer is closed, the requested general statistic results are
// printed out here. In current stage, these information are not output to
// the files.
printf("The statistics related to query number need to times: %u\n",
sample_max_);
printf("Total_requests: %" PRIu64 " Total_accessed_keys: %" PRIu64
- " Total_gets: %" PRIu64 " Total_write_batch: %" PRIu64 "\n",
- total_requests_, total_access_keys_, total_gets_, total_writes_);
+ " Total_gets: %" PRIu64 " Total_write_batches: %" PRIu64
+ " Total_seeks: %" PRIu64 " Total_seek_for_prevs: %" PRIu64
+ " Total_multigets: %" PRIu64 "\n",
+ total_requests_, total_access_keys_, total_gets_, total_writes_,
+ total_seeks_, total_seek_prevs_, total_multigets_);
for (int type = 0; type < kTaTypeNum; type++) {
if (!ta_[type].enabled) {
continue;
std::map<uint32_t, uint32_t> cf_qps;
};
-class TraceAnalyzer {
+class TraceAnalyzer : private TraceRecord::Handler,
+ private WriteBatch::Handler {
public:
TraceAnalyzer(std::string& trace_path, std::string& output_path,
AnalyzerOptions _analyzer_opts);
Status WriteTraceUnit(TraceUnit& unit);
- // The trace processing functions for different type
- Status HandleGet(uint32_t column_family_id, const Slice& key,
- const uint64_t& ts, const uint32_t& get_ret);
- Status HandlePut(uint32_t column_family_id, const Slice& key,
- const Slice& value);
- Status HandleDelete(uint32_t column_family_id, const Slice& key);
- Status HandleSingleDelete(uint32_t column_family_id, const Slice& key);
- Status HandleDeleteRange(uint32_t column_family_id, const Slice& begin_key,
- const Slice& end_key);
- Status HandleMerge(uint32_t column_family_id, const Slice& key,
- const Slice& value);
- Status HandleIter(uint32_t column_family_id, const Slice& key,
- const uint64_t& ts, TraceType trace_type);
- Status HandleMultiGet(const std::vector<uint32_t>& column_family_ids,
- const std::vector<Slice>& keys, const uint64_t& ts);
std::vector<TypeUnit>& GetTaVector() { return ta_; }
private:
+ using TraceRecord::Handler::Handle;
+ Status Handle(const WriteQueryTraceRecord& record,
+ std::unique_ptr<TraceRecordResult>* result) override;
+ Status Handle(const GetQueryTraceRecord& record,
+ std::unique_ptr<TraceRecordResult>* result) override;
+ Status Handle(const IteratorSeekQueryTraceRecord& record,
+ std::unique_ptr<TraceRecordResult>* result) override;
+ Status Handle(const MultiGetQueryTraceRecord& record,
+ std::unique_ptr<TraceRecordResult>* result) override;
+
+ using WriteBatch::Handler::PutCF;
+ Status PutCF(uint32_t column_family_id, const Slice& key,
+ const Slice& value) override;
+
+ using WriteBatch::Handler::DeleteCF;
+ Status DeleteCF(uint32_t column_family_id, const Slice& key) override;
+
+ using WriteBatch::Handler::SingleDeleteCF;
+ Status SingleDeleteCF(uint32_t column_family_id, const Slice& key) override;
+
+ using WriteBatch::Handler::DeleteRangeCF;
+ Status DeleteRangeCF(uint32_t column_family_id, const Slice& begin_key,
+ const Slice& end_key) override;
+
+ using WriteBatch::Handler::MergeCF;
+ Status MergeCF(uint32_t column_family_id, const Slice& key,
+ const Slice& value) override;
+
+ // The following hanlders are not implemented, return Status::OK() to avoid
+ // the running time assertion and other irrelevant falures.
+ using WriteBatch::Handler::PutBlobIndexCF;
+ Status PutBlobIndexCF(uint32_t /*column_family_id*/, const Slice& /*key*/,
+ const Slice& /*value*/) override {
+ return Status::OK();
+ }
+
+ // The default implementation of LogData does nothing.
+ using WriteBatch::Handler::LogData;
+ void LogData(const Slice& /*blob*/) override {}
+
+ using WriteBatch::Handler::MarkBeginPrepare;
+ Status MarkBeginPrepare(bool = false) override { return Status::OK(); }
+
+ using WriteBatch::Handler::MarkEndPrepare;
+ Status MarkEndPrepare(const Slice& /*xid*/) override { return Status::OK(); }
+
+ using WriteBatch::Handler::MarkNoop;
+ Status MarkNoop(bool /*empty_batch*/) override { return Status::OK(); }
+
+ using WriteBatch::Handler::MarkRollback;
+ Status MarkRollback(const Slice& /*xid*/) override { return Status::OK(); }
+
+ using WriteBatch::Handler::MarkCommit;
+ Status MarkCommit(const Slice& /*xid*/) override { return Status::OK(); }
+
ROCKSDB_NAMESPACE::Env* env_;
EnvOptions env_options_;
std::unique_ptr<TraceReader> trace_reader_;
uint64_t total_access_keys_;
uint64_t total_gets_;
uint64_t total_writes_;
+ uint64_t total_seeks_;
+ uint64_t total_seek_prevs_;
+ uint64_t total_multigets_;
uint64_t trace_create_time_;
uint64_t begin_time_;
uint64_t end_time_;
Status MakeStatisticKeyStatsOrPrefix(TraceStats& stats);
Status MakeStatisticCorrelation(TraceStats& stats, StatsUnit& unit);
Status MakeStatisticQPS();
- // Set the default trace file version as version 0.2
- int trace_file_version_;
int db_version_;
};
-// write bach handler to be used for WriteBache iterator
-// when processing the write trace
-class TraceWriteHandler : public WriteBatch::Handler {
- public:
- TraceWriteHandler() { ta_ptr = nullptr; }
- explicit TraceWriteHandler(TraceAnalyzer* _ta_ptr) { ta_ptr = _ta_ptr; }
- ~TraceWriteHandler() {}
-
- virtual Status PutCF(uint32_t column_family_id, const Slice& key,
- const Slice& value) override {
- return ta_ptr->HandlePut(column_family_id, key, value);
- }
- virtual Status DeleteCF(uint32_t column_family_id,
- const Slice& key) override {
- return ta_ptr->HandleDelete(column_family_id, key);
- }
- virtual Status SingleDeleteCF(uint32_t column_family_id,
- const Slice& key) override {
- return ta_ptr->HandleSingleDelete(column_family_id, key);
- }
- virtual Status DeleteRangeCF(uint32_t column_family_id,
- const Slice& begin_key,
- const Slice& end_key) override {
- return ta_ptr->HandleDeleteRange(column_family_id, begin_key, end_key);
- }
- virtual Status MergeCF(uint32_t column_family_id, const Slice& key,
- const Slice& value) override {
- return ta_ptr->HandleMerge(column_family_id, key, value);
- }
-
- // The following hanlders are not implemented, return Status::OK() to avoid
- // the running time assertion and other irrelevant falures.
- virtual Status PutBlobIndexCF(uint32_t /*column_family_id*/,
- const Slice& /*key*/,
- const Slice& /*value*/) override {
- return Status::OK();
- }
-
- // The default implementation of LogData does nothing.
- virtual void LogData(const Slice& /*blob*/) override {}
-
- virtual Status MarkBeginPrepare(bool = false) override {
- return Status::OK();
- }
-
- virtual Status MarkEndPrepare(const Slice& /*xid*/) override {
- return Status::OK();
- }
-
- virtual Status MarkNoop(bool /*empty_batch*/) override {
- return Status::OK();
- }
-
- virtual Status MarkRollback(const Slice& /*xid*/) override {
- return Status::OK();
- }
-
- virtual Status MarkCommit(const Slice& /*xid*/) override {
- return Status::OK();
- }
-
- private:
- TraceAnalyzer* ta_ptr;
-};
-
int trace_analyzer_tool(int argc, char** argv);
} // namespace ROCKSDB_NAMESPACE
return old_state != payload_map;
}
-Status TracerHelper::DecodeWriteRecord(Trace* trace, int trace_file_version,
+Status TracerHelper::DecodeTraceRecord(Trace* trace, int trace_file_version,
std::unique_ptr<TraceRecord>* record) {
assert(trace != nullptr);
- assert(trace->type == kTraceWrite);
if (record != nullptr) {
record->reset(nullptr);
}
- PinnableSlice rep;
- if (trace_file_version < 2) {
- rep.PinSelf(trace->payload);
- } else {
- Slice buf(trace->payload);
- GetFixed64(&buf, &trace->payload_map);
- int64_t payload_map = static_cast<int64_t>(trace->payload_map);
- Slice write_batch_data;
- while (payload_map) {
- // Find the rightmost set bit.
- uint32_t set_pos =
- static_cast<uint32_t>(log2(payload_map & -payload_map));
- switch (set_pos) {
- case TracePayloadType::kWriteBatchData:
- GetLengthPrefixedSlice(&buf, &write_batch_data);
- break;
- default:
- assert(false);
+ switch (trace->type) {
+ // Write
+ case kTraceWrite: {
+ PinnableSlice rep;
+ if (trace_file_version < 2) {
+ rep.PinSelf(trace->payload);
+ } else {
+ Slice buf(trace->payload);
+ GetFixed64(&buf, &trace->payload_map);
+ int64_t payload_map = static_cast<int64_t>(trace->payload_map);
+ Slice write_batch_data;
+ while (payload_map) {
+ // Find the rightmost set bit.
+ uint32_t set_pos =
+ static_cast<uint32_t>(log2(payload_map & -payload_map));
+ switch (set_pos) {
+ case TracePayloadType::kWriteBatchData: {
+ GetLengthPrefixedSlice(&buf, &write_batch_data);
+ break;
+ }
+ default: {
+ assert(false);
+ }
+ }
+ // unset the rightmost bit.
+ payload_map &= (payload_map - 1);
+ }
+ rep.PinSelf(write_batch_data);
}
- // unset the rightmost bit.
- payload_map &= (payload_map - 1);
- }
- rep.PinSelf(write_batch_data);
- }
-
- if (record != nullptr) {
- record->reset(new WriteQueryTraceRecord(std::move(rep), trace->ts));
- }
-
- return Status::OK();
-}
-
-Status TracerHelper::DecodeGetRecord(Trace* trace, int trace_file_version,
- std::unique_ptr<TraceRecord>* record) {
- assert(trace != nullptr);
- assert(trace->type == kTraceGet);
-
- if (record != nullptr) {
- record->reset(nullptr);
- }
- uint32_t cf_id = 0;
- Slice get_key;
-
- if (trace_file_version < 2) {
- DecodeCFAndKey(trace->payload, &cf_id, &get_key);
- } else {
- Slice buf(trace->payload);
- GetFixed64(&buf, &trace->payload_map);
- int64_t payload_map = static_cast<int64_t>(trace->payload_map);
- while (payload_map) {
- // Find the rightmost set bit.
- uint32_t set_pos =
- static_cast<uint32_t>(log2(payload_map & -payload_map));
- switch (set_pos) {
- case TracePayloadType::kGetCFID:
- GetFixed32(&buf, &cf_id);
- break;
- case TracePayloadType::kGetKey:
- GetLengthPrefixedSlice(&buf, &get_key);
- break;
- default:
- assert(false);
+ if (record != nullptr) {
+ record->reset(new WriteQueryTraceRecord(std::move(rep), trace->ts));
}
- // unset the rightmost bit.
- payload_map &= (payload_map - 1);
- }
- }
- if (record != nullptr) {
- PinnableSlice ps;
- ps.PinSelf(get_key);
- record->reset(new GetQueryTraceRecord(cf_id, std::move(ps), trace->ts));
- }
-
- return Status::OK();
-}
+ return Status::OK();
+ }
+ // Get
+ case kTraceGet: {
+ uint32_t cf_id = 0;
+ Slice get_key;
+
+ if (trace_file_version < 2) {
+ DecodeCFAndKey(trace->payload, &cf_id, &get_key);
+ } else {
+ Slice buf(trace->payload);
+ GetFixed64(&buf, &trace->payload_map);
+ int64_t payload_map = static_cast<int64_t>(trace->payload_map);
+ while (payload_map) {
+ // Find the rightmost set bit.
+ uint32_t set_pos =
+ static_cast<uint32_t>(log2(payload_map & -payload_map));
+ switch (set_pos) {
+ case TracePayloadType::kGetCFID: {
+ GetFixed32(&buf, &cf_id);
+ break;
+ }
+ case TracePayloadType::kGetKey: {
+ GetLengthPrefixedSlice(&buf, &get_key);
+ break;
+ }
+ default: {
+ assert(false);
+ }
+ }
+ // unset the rightmost bit.
+ payload_map &= (payload_map - 1);
+ }
+ }
-Status TracerHelper::DecodeIterRecord(Trace* trace, int trace_file_version,
- std::unique_ptr<TraceRecord>* record) {
- assert(trace != nullptr);
- assert(trace->type == kTraceIteratorSeek ||
- trace->type == kTraceIteratorSeekForPrev);
+ if (record != nullptr) {
+ PinnableSlice ps;
+ ps.PinSelf(get_key);
+ record->reset(new GetQueryTraceRecord(cf_id, std::move(ps), trace->ts));
+ }
- if (record != nullptr) {
- record->reset(nullptr);
- }
+ return Status::OK();
+ }
+ // Iterator Seek and SeekForPrev
+ case kTraceIteratorSeek:
+ case kTraceIteratorSeekForPrev: {
+ uint32_t cf_id = 0;
+ Slice iter_key;
+ Slice lower_bound;
+ Slice upper_bound;
+
+ if (trace_file_version < 2) {
+ DecodeCFAndKey(trace->payload, &cf_id, &iter_key);
+ } else {
+ Slice buf(trace->payload);
+ GetFixed64(&buf, &trace->payload_map);
+ int64_t payload_map = static_cast<int64_t>(trace->payload_map);
+ while (payload_map) {
+ // Find the rightmost set bit.
+ uint32_t set_pos =
+ static_cast<uint32_t>(log2(payload_map & -payload_map));
+ switch (set_pos) {
+ case TracePayloadType::kIterCFID: {
+ GetFixed32(&buf, &cf_id);
+ break;
+ }
+ case TracePayloadType::kIterKey: {
+ GetLengthPrefixedSlice(&buf, &iter_key);
+ break;
+ }
+ case TracePayloadType::kIterLowerBound: {
+ GetLengthPrefixedSlice(&buf, &lower_bound);
+ break;
+ }
+ case TracePayloadType::kIterUpperBound: {
+ GetLengthPrefixedSlice(&buf, &upper_bound);
+ break;
+ }
+ default: {
+ assert(false);
+ }
+ }
+ // unset the rightmost bit.
+ payload_map &= (payload_map - 1);
+ }
+ }
- uint32_t cf_id = 0;
- Slice iter_key;
- Slice lower_bound;
- Slice upper_bound;
-
- if (trace_file_version < 2) {
- DecodeCFAndKey(trace->payload, &cf_id, &iter_key);
- } else {
- Slice buf(trace->payload);
- GetFixed64(&buf, &trace->payload_map);
- int64_t payload_map = static_cast<int64_t>(trace->payload_map);
- while (payload_map) {
- // Find the rightmost set bit.
- uint32_t set_pos =
- static_cast<uint32_t>(log2(payload_map & -payload_map));
- switch (set_pos) {
- case TracePayloadType::kIterCFID:
- GetFixed32(&buf, &cf_id);
- break;
- case TracePayloadType::kIterKey:
- GetLengthPrefixedSlice(&buf, &iter_key);
- break;
- case TracePayloadType::kIterLowerBound:
- GetLengthPrefixedSlice(&buf, &lower_bound);
- break;
- case TracePayloadType::kIterUpperBound:
- GetLengthPrefixedSlice(&buf, &upper_bound);
- break;
- default:
- assert(false);
+ if (record != nullptr) {
+ PinnableSlice ps_key;
+ ps_key.PinSelf(iter_key);
+ PinnableSlice ps_lower;
+ ps_lower.PinSelf(lower_bound);
+ PinnableSlice ps_upper;
+ ps_upper.PinSelf(upper_bound);
+ record->reset(new IteratorSeekQueryTraceRecord(
+ static_cast<IteratorSeekQueryTraceRecord::SeekType>(trace->type),
+ cf_id, std::move(ps_key), std::move(ps_lower), std::move(ps_upper),
+ trace->ts));
}
- // unset the rightmost bit.
- payload_map &= (payload_map - 1);
- }
- }
- if (record != nullptr) {
- PinnableSlice ps_key;
- ps_key.PinSelf(iter_key);
- PinnableSlice ps_lower;
- ps_lower.PinSelf(lower_bound);
- PinnableSlice ps_upper;
- ps_upper.PinSelf(upper_bound);
- record->reset(new IteratorSeekQueryTraceRecord(
- static_cast<IteratorSeekQueryTraceRecord::SeekType>(trace->type), cf_id,
- std::move(ps_key), std::move(ps_lower), std::move(ps_upper),
- trace->ts));
- }
+ return Status::OK();
+ }
+ // MultiGet
+ case kTraceMultiGet: {
+ if (trace_file_version < 2) {
+ return Status::Corruption("MultiGet is not supported.");
+ }
- return Status::OK();
-}
+ uint32_t multiget_size = 0;
+ std::vector<uint32_t> cf_ids;
+ std::vector<PinnableSlice> multiget_keys;
+
+ Slice cfids_payload;
+ Slice keys_payload;
+ Slice buf(trace->payload);
+ GetFixed64(&buf, &trace->payload_map);
+ int64_t payload_map = static_cast<int64_t>(trace->payload_map);
+ while (payload_map) {
+ // Find the rightmost set bit.
+ uint32_t set_pos =
+ static_cast<uint32_t>(log2(payload_map & -payload_map));
+ switch (set_pos) {
+ case TracePayloadType::kMultiGetSize: {
+ GetFixed32(&buf, &multiget_size);
+ break;
+ }
+ case TracePayloadType::kMultiGetCFIDs: {
+ GetLengthPrefixedSlice(&buf, &cfids_payload);
+ break;
+ }
+ case TracePayloadType::kMultiGetKeys: {
+ GetLengthPrefixedSlice(&buf, &keys_payload);
+ break;
+ }
+ default: {
+ assert(false);
+ }
+ }
+ // unset the rightmost bit.
+ payload_map &= (payload_map - 1);
+ }
+ if (multiget_size == 0) {
+ return Status::InvalidArgument("Empty MultiGet cf_ids or keys.");
+ }
-Status TracerHelper::DecodeMultiGetRecord(
- Trace* trace, int trace_file_version,
- std::unique_ptr<TraceRecord>* record) {
- assert(trace != nullptr);
- assert(trace->type == kTraceMultiGet);
+ // Decode the cfids_payload and keys_payload
+ cf_ids.reserve(multiget_size);
+ multiget_keys.reserve(multiget_size);
+ for (uint32_t i = 0; i < multiget_size; i++) {
+ uint32_t tmp_cfid;
+ Slice tmp_key;
+ GetFixed32(&cfids_payload, &tmp_cfid);
+ GetLengthPrefixedSlice(&keys_payload, &tmp_key);
+ cf_ids.push_back(tmp_cfid);
+ Slice s(tmp_key);
+ PinnableSlice ps;
+ ps.PinSelf(s);
+ multiget_keys.push_back(std::move(ps));
+ }
- if (record != nullptr) {
- record->reset(nullptr);
- }
+ if (record != nullptr) {
+ record->reset(new MultiGetQueryTraceRecord(
+ std::move(cf_ids), std::move(multiget_keys), trace->ts));
+ }
- if (trace_file_version < 2) {
- return Status::Corruption("MultiGet is not supported.");
- }
-
- uint32_t multiget_size = 0;
- std::vector<uint32_t> cf_ids;
- std::vector<PinnableSlice> multiget_keys;
-
- Slice cfids_payload;
- Slice keys_payload;
- Slice buf(trace->payload);
- GetFixed64(&buf, &trace->payload_map);
- int64_t payload_map = static_cast<int64_t>(trace->payload_map);
- while (payload_map) {
- // Find the rightmost set bit.
- uint32_t set_pos = static_cast<uint32_t>(log2(payload_map & -payload_map));
- switch (set_pos) {
- case TracePayloadType::kMultiGetSize:
- GetFixed32(&buf, &multiget_size);
- break;
- case TracePayloadType::kMultiGetCFIDs:
- GetLengthPrefixedSlice(&buf, &cfids_payload);
- break;
- case TracePayloadType::kMultiGetKeys:
- GetLengthPrefixedSlice(&buf, &keys_payload);
- break;
- default:
- assert(false);
+ return Status::OK();
}
- // unset the rightmost bit.
- payload_map &= (payload_map - 1);
+ default:
+ return Status::NotSupported("Unsupported trace type.");
}
- if (multiget_size == 0) {
- return Status::InvalidArgument("Empty MultiGet cf_ids or keys.");
- }
-
- // Decode the cfids_payload and keys_payload
- cf_ids.reserve(multiget_size);
- multiget_keys.reserve(multiget_size);
- for (uint32_t i = 0; i < multiget_size; i++) {
- uint32_t tmp_cfid;
- Slice tmp_key;
- GetFixed32(&cfids_payload, &tmp_cfid);
- GetLengthPrefixedSlice(&keys_payload, &tmp_key);
- cf_ids.push_back(tmp_cfid);
- Slice s(tmp_key);
- PinnableSlice ps;
- ps.PinSelf(s);
- multiget_keys.push_back(std::move(ps));
- }
-
- if (record != nullptr) {
- record->reset(new MultiGetQueryTraceRecord(
- std::move(cf_ids), std::move(multiget_keys), trace->ts));
- }
-
- return Status::OK();
}
Tracer::Tracer(SystemClock* clock, const TraceOptions& trace_options,
static bool SetPayloadMap(uint64_t& payload_map,
const TracePayloadType payload_type);
- // Decode the write payload and store in WrteiPayload
- static Status DecodeWriteRecord(Trace* trace, int trace_file_version,
+ // Decode a Trace object into the corresponding TraceRecord.
+ // Return Status::OK() if nothing is wrong, record will be set accordingly.
+ // Return Status::NotSupported() if the trace type is not support, or the
+ // corresponding error status, record will be set to nullptr.
+ static Status DecodeTraceRecord(Trace* trace, int trace_file_version,
std::unique_ptr<TraceRecord>* record);
-
- // Decode the get payload and store in WrteiPayload
- static Status DecodeGetRecord(Trace* trace, int trace_file_version,
- std::unique_ptr<TraceRecord>* record);
-
- // Decode the iter payload and store in WrteiPayload
- static Status DecodeIterRecord(Trace* trace, int trace_file_version,
- std::unique_ptr<TraceRecord>* record);
-
- // Decode the multiget payload and store in MultiGetPayload
- static Status DecodeMultiGetRecord(Trace* trace, int trace_file_version,
- std::unique_ptr<TraceRecord>* record);
};
// Tracer captures all RocksDB operations using a user-provided TraceWriter.
return s;
}
- return DecodeTraceRecord(&trace, trace_file_version_, record);
+ return TracerHelper::DecodeTraceRecord(&trace, trace_file_version_, record);
}
Status ReplayerImpl::Execute(const std::unique_ptr<TraceRecord>& record,
// In single-threaded replay, decode first then sleep.
std::unique_ptr<TraceRecord> record;
- s = DecodeTraceRecord(&trace, trace_file_version_, &record);
+ s = TracerHelper::DecodeTraceRecord(&trace, trace_file_version_, &record);
if (!s.ok() && !s.IsNotSupported()) {
break;
}
return TracerHelper::DecodeTrace(encoded_trace, trace);
}
-Status ReplayerImpl::DecodeTraceRecord(Trace* trace, int trace_file_version,
- std::unique_ptr<TraceRecord>* record) {
- switch (trace->type) {
- case kTraceWrite:
- return TracerHelper::DecodeWriteRecord(trace, trace_file_version, record);
- case kTraceGet:
- return TracerHelper::DecodeGetRecord(trace, trace_file_version, record);
- case kTraceIteratorSeek:
- case kTraceIteratorSeekForPrev:
- return TracerHelper::DecodeIterRecord(trace, trace_file_version, record);
- case kTraceMultiGet:
- return TracerHelper::DecodeMultiGetRecord(trace, trace_file_version,
- record);
- case kTraceEnd:
- return Status::Incomplete("Trace end.");
- default:
- return Status::NotSupported("Unsupported trace type.");
- }
-}
-
void ReplayerImpl::BackgroundWork(void* arg) {
std::unique_ptr<ReplayerWorkerArg> ra(
reinterpret_cast<ReplayerWorkerArg*>(arg));
assert(ra != nullptr);
std::unique_ptr<TraceRecord> record;
- Status s =
- DecodeTraceRecord(&(ra->trace_entry), ra->trace_file_version, &record);
+ Status s = TracerHelper::DecodeTraceRecord(&(ra->trace_entry),
+ ra->trace_file_version, &record);
if (!s.ok()) {
// Stop the replay
if (ra->error_cb != nullptr) {
Status ReadFooter(Trace* footer);
Status ReadTrace(Trace* trace);
- // Generic function to convert a Trace to TraceRecord.
- static Status DecodeTraceRecord(Trace* trace, int trace_file_version,
- std::unique_ptr<TraceRecord>* record);
-
// Generic function to execute a Trace in a thread pool.
static void BackgroundWork(void* arg);