} else if (active_con && active_con->is_my_peer(conn->get_peer_addr())) {
logger().warn("active conn reset {}", conn->get_peer_addr());
active_con.reset();
- return reopen_session(-1);
+ return reopen_session(-1).then([this] {
+ send_pendings();
+ return seastar::now();
+ });
} else {
return seastar::now();
}
}
} else {
logger().warn("mon.{} went away", cur_mon);
- return reopen_session(-1);
+ return reopen_session(-1).then([this] {
+ send_pendings();
+ return seastar::now();
+ });
}
}
m->handle = tid;
m->what = map;
auto& req = version_reqs[tid];
- return active_con->get_conn()->send(m).then([&req] {
+ return send_message(m).then([&req] {
return req.get_future();
});
}
seastar::future<> Client::authenticate()
{
- return reopen_session(-1);
+ return reopen_session(-1).then([this] {
+ send_pendings();
+ return seastar::now();
+ });
}
seastar::future<> Client::stop()
m->cmd = cmd;
m->set_data(bl);
auto& req = mon_commands[tid];
- return active_con->get_conn()->send(m).then([&req] {
+ return send_message(m).then([&req] {
return req.get_future();
});
}
seastar::future<> Client::send_message(MessageRef m)
{
- return active_con->get_conn()->send(m);
+ if (active_con) {
+ if (!pending_messages.empty()) {
+ send_pendings();
+ }
+ return active_con->get_conn()->send(m);
+ }
+ auto& delayed = pending_messages.emplace_back(m);
+ return delayed.pr.get_future();
+}
+
+void Client::send_pendings()
+{
+ if (active_con) {
+ for (auto& m : pending_messages) {
+ (void) active_con->get_conn()->send(m.msg);
+ m.pr.set_value();
+ }
+ pending_messages.clear();
+ }
}
bool Client::sub_want(const std::string& what, version_t start, unsigned flags)
auto m = make_message<MMonSubscribe>();
m->what = sub.get_subs();
m->hostname = ceph_get_short_hostname();
- return active_con->get_conn()->send(m).then([this] {
+ return send_message(m).then([this] {
sub.renewed();
});
}
seastar::future<> handle_log_ack(Ref<MLogAck> m);
seastar::future<> handle_config(Ref<MConfig> m);
+ void send_pendings();
private:
seastar::future<> load_keyring();
seastar::future<> authenticate();
std::vector<unsigned> get_random_mons(unsigned n) const;
seastar::future<> _add_conn(unsigned rank, uint64_t global_id);
crimson::common::Gated gate;
+
+ // messages that are waiting for the active_con to be available
+ struct pending_msg_t {
+ pending_msg_t(MessageRef& m) : msg(m) {}
+ MessageRef msg;
+ seastar::promise<> pr;
+ };
+ std::deque<pending_msg_t> pending_messages;
};
inline std::ostream& operator<<(std::ostream& out, const Client& client) {