]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/monc: take care of connection lost when sending messages
authorXuehan Xu <xxhdx1985126@163.com>
Tue, 2 Jun 2020 07:45:41 +0000 (15:45 +0800)
committerXuehan Xu <xxhdx1985126@163.com>
Thu, 18 Jun 2020 01:50:50 +0000 (09:50 +0800)
Signed-off-by: Xuehan Xu <xxhdx1985126@163.com>
src/crimson/mon/MonClient.cc
src/crimson/mon/MonClient.h

index c8cfac8aa099aee5f721cfd2f4b11af54067ad46..ca631ed9a0b977e03a68ea972a91832e94c5ed24 100644 (file)
@@ -557,7 +557,10 @@ void Client::ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace)
     } else if (active_con && active_con->is_my_peer(conn->get_peer_addr())) {
       logger().warn("active conn reset {}", conn->get_peer_addr());
       active_con.reset();
-      return reopen_session(-1);
+      return reopen_session(-1).then([this] {
+       send_pendings();
+       return seastar::now();
+      });
     } else {
       return seastar::now();
     }
@@ -801,7 +804,10 @@ seastar::future<> Client::handle_monmap(crimson::net::Connection* conn,
     }
   } else {
     logger().warn("mon.{} went away", cur_mon);
-    return reopen_session(-1);
+    return reopen_session(-1).then([this] {
+      send_pendings();
+      return seastar::now();
+    });
   }
 }
 
@@ -839,7 +845,7 @@ Client::get_version_t Client::get_version(const std::string& map)
   m->handle = tid;
   m->what = map;
   auto& req = version_reqs[tid];
-  return active_con->get_conn()->send(m).then([&req] {
+  return send_message(m).then([&req] {
     return req.get_future();
   });
 }
@@ -917,7 +923,10 @@ std::vector<unsigned> Client::get_random_mons(unsigned n) const
 
 seastar::future<> Client::authenticate()
 {
-  return reopen_session(-1);
+  return reopen_session(-1).then([this] {
+    send_pendings();
+    return seastar::now();
+  });
 }
 
 seastar::future<> Client::stop()
@@ -1024,14 +1033,32 @@ Client::run_command(const std::vector<std::string>& cmd,
   m->cmd = cmd;
   m->set_data(bl);
   auto& req = mon_commands[tid];
-  return active_con->get_conn()->send(m).then([&req] {
+  return send_message(m).then([&req] {
     return req.get_future();
   });
 }
 
 seastar::future<> Client::send_message(MessageRef m)
 {
-  return active_con->get_conn()->send(m);
+  if (active_con) {
+    if (!pending_messages.empty()) {
+      send_pendings();
+    }
+    return active_con->get_conn()->send(m);
+  }
+  auto& delayed = pending_messages.emplace_back(m);
+  return delayed.pr.get_future();
+}
+
+void Client::send_pendings()
+{
+  if (active_con) {
+    for (auto& m : pending_messages) {
+      (void) active_con->get_conn()->send(m.msg);
+      m.pr.set_value();
+    }
+    pending_messages.clear();
+  }
 }
 
 bool Client::sub_want(const std::string& what, version_t start, unsigned flags)
@@ -1067,7 +1094,7 @@ seastar::future<> Client::renew_subs()
   auto m = make_message<MMonSubscribe>();
   m->what = sub.get_subs();
   m->hostname = ceph_get_short_hostname();
-  return active_con->get_conn()->send(m).then([this] {
+  return send_message(m).then([this] {
     sub.renewed();
   });
 }
index c33ac992698318b0938fc5efbf6f3a29a85b1dc1..9a05e7ab78d42f1fb2b2a6527c7e221073b405ae 100644 (file)
@@ -154,6 +154,7 @@ private:
   seastar::future<> handle_log_ack(Ref<MLogAck> m);
   seastar::future<> handle_config(Ref<MConfig> m);
 
+  void send_pendings();
 private:
   seastar::future<> load_keyring();
   seastar::future<> authenticate();
@@ -163,6 +164,14 @@ private:
   std::vector<unsigned> get_random_mons(unsigned n) const;
   seastar::future<> _add_conn(unsigned rank, uint64_t global_id);
   crimson::common::Gated gate;
+
+  // messages that are waiting for the active_con to be available
+  struct pending_msg_t {
+    pending_msg_t(MessageRef& m) : msg(m) {}
+    MessageRef msg;
+    seastar::promise<> pr;
+  };
+  std::deque<pending_msg_t> pending_messages;
 };
 
 inline std::ostream& operator<<(std::ostream& out, const Client& client) {