From: Xuehan Xu Date: Tue, 2 Jun 2020 07:45:41 +0000 (+0800) Subject: crimson/monc: take care of connection lost when sending messages X-Git-Tag: v16.1.0~2024^2~3 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=2a1d6d777fdfdd66bb1e7613c0b1af2ab138a0c0;p=ceph.git crimson/monc: take care of connection lost when sending messages Signed-off-by: Xuehan Xu --- diff --git a/src/crimson/mon/MonClient.cc b/src/crimson/mon/MonClient.cc index c8cfac8aa09..ca631ed9a0b 100644 --- a/src/crimson/mon/MonClient.cc +++ b/src/crimson/mon/MonClient.cc @@ -557,7 +557,10 @@ void Client::ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace) } else if (active_con && active_con->is_my_peer(conn->get_peer_addr())) { logger().warn("active conn reset {}", conn->get_peer_addr()); active_con.reset(); - return reopen_session(-1); + return reopen_session(-1).then([this] { + send_pendings(); + return seastar::now(); + }); } else { return seastar::now(); } @@ -801,7 +804,10 @@ seastar::future<> Client::handle_monmap(crimson::net::Connection* conn, } } else { logger().warn("mon.{} went away", cur_mon); - return reopen_session(-1); + return reopen_session(-1).then([this] { + send_pendings(); + return seastar::now(); + }); } } @@ -839,7 +845,7 @@ Client::get_version_t Client::get_version(const std::string& map) m->handle = tid; m->what = map; auto& req = version_reqs[tid]; - return active_con->get_conn()->send(m).then([&req] { + return send_message(m).then([&req] { return req.get_future(); }); } @@ -917,7 +923,10 @@ std::vector Client::get_random_mons(unsigned n) const seastar::future<> Client::authenticate() { - return reopen_session(-1); + return reopen_session(-1).then([this] { + send_pendings(); + return seastar::now(); + }); } seastar::future<> Client::stop() @@ -1024,14 +1033,32 @@ Client::run_command(const std::vector& cmd, m->cmd = cmd; m->set_data(bl); auto& req = mon_commands[tid]; - return active_con->get_conn()->send(m).then([&req] { + return send_message(m).then([&req] { return req.get_future(); }); } seastar::future<> Client::send_message(MessageRef m) { - return active_con->get_conn()->send(m); + if (active_con) { + if (!pending_messages.empty()) { + send_pendings(); + } + return active_con->get_conn()->send(m); + } + auto& delayed = pending_messages.emplace_back(m); + return delayed.pr.get_future(); +} + +void Client::send_pendings() +{ + if (active_con) { + for (auto& m : pending_messages) { + (void) active_con->get_conn()->send(m.msg); + m.pr.set_value(); + } + pending_messages.clear(); + } } bool Client::sub_want(const std::string& what, version_t start, unsigned flags) @@ -1067,7 +1094,7 @@ seastar::future<> Client::renew_subs() auto m = make_message(); m->what = sub.get_subs(); m->hostname = ceph_get_short_hostname(); - return active_con->get_conn()->send(m).then([this] { + return send_message(m).then([this] { sub.renewed(); }); } diff --git a/src/crimson/mon/MonClient.h b/src/crimson/mon/MonClient.h index c33ac992698..9a05e7ab78d 100644 --- a/src/crimson/mon/MonClient.h +++ b/src/crimson/mon/MonClient.h @@ -154,6 +154,7 @@ private: seastar::future<> handle_log_ack(Ref m); seastar::future<> handle_config(Ref m); + void send_pendings(); private: seastar::future<> load_keyring(); seastar::future<> authenticate(); @@ -163,6 +164,14 @@ private: std::vector get_random_mons(unsigned n) const; seastar::future<> _add_conn(unsigned rank, uint64_t global_id); crimson::common::Gated gate; + + // messages that are waiting for the active_con to be available + struct pending_msg_t { + pending_msg_t(MessageRef& m) : msg(m) {} + MessageRef msg; + seastar::promise<> pr; + }; + std::deque pending_messages; }; inline std::ostream& operator<<(std::ostream& out, const Client& client) {