single_iter->SeekForPrev(iter_payload.iter_key);
ops++;
delete single_iter;
+ } else if (trace.type == kTraceMultiGet) {
+ MultiGetPayload multiget_payload;
+ assert(trace_file_version_ >= 2);
+ TracerHelper::DecodeMultiGetPayload(&trace, &multiget_payload);
+ std::vector<ColumnFamilyHandle*> v_cfd;
+ std::vector<Slice> keys;
+ assert(multiget_payload.cf_ids.size() ==
+ multiget_payload.multiget_keys.size());
+ for (size_t i = 0; i < multiget_payload.cf_ids.size(); i++) {
+ assert(i < multiget_payload.cf_ids.size() &&
+ i < multiget_payload.multiget_keys.size());
+ if (cf_map_.find(multiget_payload.cf_ids[i]) == cf_map_.end()) {
+ return Status::Corruption("Invalid Column Family ID.");
+ }
+ v_cfd.push_back(cf_map_[multiget_payload.cf_ids[i]]);
+ keys.push_back(Slice(multiget_payload.multiget_keys[i]));
+ }
+ std::vector<std::string> values;
+ std::vector<Status> ss = db_->MultiGet(roptions, v_cfd, keys, &values);
} else if (trace.type == kTraceEnd) {
// Do nothing for now.
// TODO: Add some validations later.
thread_pool.Schedule(&Replayer::BGWorkIterSeekForPrev, ra.release(),
nullptr, nullptr);
ops++;
+ } else if (ra->trace_entry.type == kTraceMultiGet) {
+ thread_pool.Schedule(&Replayer::BGWorkMultiGet, ra.release(), nullptr,
+ nullptr);
+ ops++;
} else if (ra->trace_entry.type == kTraceEnd) {
// Do nothing for now.
// TODO: Add some validations later.
return;
}
+void Replayer::BGWorkMultiGet(void* arg) {
+ std::unique_ptr<ReplayerWorkerArg> ra(
+ reinterpret_cast<ReplayerWorkerArg*>(arg));
+ assert(ra != nullptr);
+ auto cf_map = static_cast<std::unordered_map<uint32_t, ColumnFamilyHandle*>*>(
+ ra->cf_map);
+ MultiGetPayload multiget_payload;
+ if (ra->trace_file_version < 2) {
+ return;
+ }
+ TracerHelper::DecodeMultiGetPayload(&(ra->trace_entry), &multiget_payload);
+ std::vector<ColumnFamilyHandle*> v_cfd;
+ std::vector<Slice> keys;
+ if (multiget_payload.cf_ids.size() != multiget_payload.multiget_keys.size()) {
+ return;
+ }
+ for (size_t i = 0; i < multiget_payload.cf_ids.size(); i++) {
+ if (cf_map->find(multiget_payload.cf_ids[i]) == cf_map->end()) {
+ return;
+ }
+ v_cfd.push_back((*cf_map)[multiget_payload.cf_ids[i]]);
+ keys.push_back(Slice(multiget_payload.multiget_keys[i]));
+ }
+ std::vector<std::string> values;
+ std::vector<Status> ss = ra->db->MultiGet(ra->roptions, v_cfd, keys, &values);
+ return;
+}
+
} // namespace ROCKSDB_NAMESPACE