return conn;
}
+Client::mon_command_t::mon_command_t(ceph::ref_t<MMonCommand> req)
+ : req(req)
+{}
+
Client::Client(crimson::net::Messenger& messenger,
crimson::common::AuthHandler& auth_handler)
// currently, crimson is OSD-only
const auto tid = m->get_tid();
if (auto found = mon_commands.find(tid);
found != mon_commands.end()) {
- auto& result = found->second;
+ auto& command = found->second;
logger().trace("{} {}", __func__, tid);
- result.set_value(std::make_tuple(m->r, m->rs, std::move(m->get_data())));
+ command.result.set_value(std::make_tuple(m->r, m->rs, std::move(m->get_data())));
mon_commands.erase(found);
} else {
logger().warn("{} {} not found", __func__, tid);
m->set_tid(tid);
m->cmd = {std::move(cmd)};
m->set_data(std::move(bl));
- auto& req = mon_commands[tid];
- return send_message(m).then([&req] {
- return req.get_future();
+ [[maybe_unused]] auto [command, added] =
+ mon_commands.try_emplace(tid, m);
+ assert(added);
+ return send_message(m).then([&result=command->second.result] {
+ return result.get_future();
});
}
}
pending_messages.clear();
return seastar::now();
+ }).then([this] {
+ return seastar::parallel_for_each(mon_commands,
+ [this](auto &tid_command) {
+ return send_message(tid_command.second.req);
+ });
});
}
struct MMonMap;
struct MMonSubscribeAck;
struct MMonGetVersionReply;
+struct MMonCommand;
struct MMonCommandAck;
struct MLogAck;
struct MConfig;
ceph_tid_t last_mon_command_id = 0;
using command_result_t =
seastar::future<std::tuple<std::int32_t, string, ceph::bufferlist>>;
- std::map<ceph_tid_t, typename command_result_t::promise_type> mon_commands;
+ struct mon_command_t {
+ ceph::ref_t<MMonCommand> req;
+ typename command_result_t::promise_type result;
+ mon_command_t(ceph::ref_t<MMonCommand> req);
+ };
+ std::map<ceph_tid_t, mon_command_t> mon_commands;
MonSub sub;