git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/net: change close() to mark_down()
author: Yingxin Cheng <yingxin.cheng@intel.com>
Fri, 13 Mar 2020 06:22:40 +0000 (14:22 +0800)
committer: Yingxin Cheng <yingxin.cheng@intel.com>
Fri, 20 Mar 2020 08:07:49 +0000 (16:07 +0800)
* be explicit that mark_down() won't trigger reset event;
* return void so no deadlock is possible and memory is still safe
guarded by Messenger::shutdown();
* related changes in crimson/osd;

Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
13 files changed:
src/crimson/mgr/client.cc
src/crimson/mon/MonClient.cc
src/crimson/net/Connection.h
src/crimson/net/ProtocolV1.cc
src/crimson/net/ProtocolV2.cc
src/crimson/net/SocketConnection.cc
src/crimson/net/SocketConnection.h
src/crimson/net/SocketMessenger.cc
src/crimson/osd/heartbeat.cc
src/crimson/osd/heartbeat.h
src/crimson/osd/osd.cc
src/crimson/osd/osd.h
src/test/crimson/test_messenger.cc

index bd40d16e709bd6b9d09925537e474a1adc757f3e..9134c2602132ba557d88351ef8c2151eb7241614 100644 (file)
@@ -40,9 +40,7 @@ seastar::future<> Client::stop()
 {
   return gate.close().then([this] {
     if (conn) {
-      return conn->close();
-    } else {
-      return seastar::now();
+      conn->mark_down();
     }
   });
 }
@@ -85,8 +83,7 @@ seastar::future<> Client::ms_handle_reset(crimson::net::ConnectionRef c)
 seastar::future<> Client::reconnect()
 {
   if (conn) {
-    // crimson::net::Protocol::close() is able to close() in background
-    (void)conn->close();
+    conn->mark_down();
     conn = {};
   }
   if (!mgrmap.get_available()) {
index 03d57bb71aab6ef953a3a23ebba137fa7295d225..0018037013706a4e8c9c49f57dd0ce58a551956b 100644 (file)
@@ -79,7 +79,7 @@ public:
                              const std::vector<uint32_t>& allowed_modes);
 
   // v1 and v2
-  seastar::future<> close();
+  void close();
   bool is_my_peer(const entity_addr_t& addr) const;
   AuthAuthorizer* get_authorizer(entity_type_t peer) const;
   KeyStore& get_keys();
@@ -427,16 +427,14 @@ int Connection::handle_auth_bad_method(uint32_t old_auth_method,
   return 0;
 }
 
-seastar::future<> Connection::close()
+void Connection::close()
 {
   reply.set_value(Ref<MAuthReply>(nullptr));
   reply = {};
   auth_done.set_value(AuthResult::canceled);
   auth_done = {};
   if (conn && !std::exchange(closed, true)) {
-    return conn->close();
-  } else {
-    return seastar::now();
+    conn->mark_down();
   }
 }
 
@@ -551,7 +549,8 @@ seastar::future<> Client::ms_handle_reset(crimson::net::ConnectionRef conn)
                             });
   if (found != pending_conns.end()) {
     logger().warn("pending conn reset by {}", conn->get_peer_addr());
-    return (*found)->close();
+    (*found)->close();
+    return seastar::now();
   } else if (active_con && active_con->is_my_peer(conn->get_peer_addr())) {
     logger().warn("active conn reset {}", conn->get_peer_addr());
     active_con.reset();
@@ -920,9 +919,7 @@ seastar::future<> Client::stop()
   return tick_gate.close().then([this] {
     timer.cancel();
     if (active_con) {
-      return active_con->close();
-    } else {
-      return seastar::now();
+      active_con->close();
     }
   });
 }
@@ -953,9 +950,8 @@ seastar::future<> Client::reopen_session(int rank)
       } else {
         return mc->authenticate_v1(monmap.get_epoch(), entity_name, want_keys)
           .handle_exception([conn](auto ep) {
-            return conn->close().then([ep=std::move(ep)](){
-             return seastar::make_exception_future<Connection::AuthResult>(ep);
-            });
+            conn->mark_down();
+            return seastar::make_exception_future<Connection::AuthResult>(ep);
           });
       }
     }).then([peer, this](auto result) {
@@ -986,21 +982,13 @@ seastar::future<> Client::reopen_session(int rank)
       ceph_assert(!active_con && !pending_conns.empty());
       active_con = std::move(*found);
       found->reset();
-      auto ret = seastar::do_with(
-       std::move(pending_conns),
-       [](auto &pending_conns) {
-         return seastar::parallel_for_each(
-           pending_conns,
-           [] (auto &conn) {
-             if (!conn) {
-               return seastar::now();
-             } else {
-               return conn->close();
-             }
-           });
-       });
+      for (auto& conn : pending_conns) {
+        if (conn) {
+          conn->close();
+        }
+      }
       pending_conns.clear();
-      return ret;
+      return seastar::now();
     }).then([]() {
       logger().debug("reopen_session mon connection attempts complete");
     }).handle_exception([](auto ep) {
index 3beb8f42bb5698c491aa8eba5ee9ba961b692fd4..04c57cb120fc4e920a811b0f2c5652911879476e 100644 (file)
@@ -107,10 +107,9 @@ class Connection : public seastar::enable_shared_from_this<Connection> {
   /// handshake
   virtual seastar::future<> keepalive() = 0;
 
-  // close the connection and cancel any any pending futures from read/send
-  // Note it's OK to discard the returned future because Messenger::shutdown()
-  // will wait for all connections closed
-  virtual seastar::future<> close() = 0;
+  // close the connection and cancel any pending futures from read/send,
+  // without dispatching any reset event
+  virtual void mark_down() = 0;
 
   virtual void print(ostream& out) const = 0;
 
index 26344b0fb4fc9c17e822961ff8562fd6d28784a4..47a79f0575c2988a1f57a3529f00e136f58b4fb2 100644 (file)
@@ -584,7 +584,7 @@ seastar::future<stop_t> ProtocolV1::repeat_handle_connect()
                         conn, *existing,
                         static_cast<int>(existing->protocol->proto_type));
           // NOTE: this is following async messenger logic, but we may miss the reset event.
-          (void) existing->close();
+          existing->mark_down();
         } else {
           return handle_connect_with_existing(existing, std::move(authorizer_reply));
         }
index cc752f43a1c3a118ef09065c692ff66a85854098..76e525b510ed22497cbf65c5fa2decfab9645e39 100644 (file)
@@ -1390,7 +1390,7 @@ ProtocolV2::server_reconnect()
                     conn, *existing_conn,
                     static_cast<int>(existing_conn->protocol->proto_type));
       // NOTE: this is following async messenger logic, but we may miss the reset event.
-      (void) existing_conn->close();
+      existing_conn->mark_down();
       return send_reset(true);
     }
 
index 5bf2c30c4a8614a11eaaf157175e21af4af947c9..49b6702929285d02ad6a021aa640b19314395f94 100644 (file)
@@ -87,10 +87,10 @@ seastar::future<> SocketConnection::keepalive()
   return protocol->keepalive();
 }
 
-seastar::future<> SocketConnection::close()
+void SocketConnection::mark_down()
 {
   assert(seastar::engine().cpu_id() == shard_id());
-  return protocol->close_clean(false);
+  protocol->close(false);
 }
 
 bool SocketConnection::update_rx_seq(seq_num_t seq)
@@ -126,6 +126,12 @@ SocketConnection::start_accept(SocketRef&& sock,
   protocol->start_accept(std::move(sock), _peer_addr);
 }
 
+seastar::future<>
+SocketConnection::close_clean(bool dispatch_reset)
+{
+  return protocol->close_clean(dispatch_reset);
+}
+
 seastar::shard_id SocketConnection::shard_id() const {
   return messenger.shard_id();
 }
index de814c9418b413953854068d89086f45523c1e06..a5b63473a3336c2a273c60c44af2272760ecdecf 100644 (file)
@@ -90,7 +90,7 @@ class SocketConnection : public Connection {
 
   seastar::future<> keepalive() override;
 
-  seastar::future<> close() override;
+  void mark_down() override;
 
   void print(ostream& out) const override;
 
@@ -103,6 +103,8 @@ class SocketConnection : public Connection {
   void start_accept(SocketRef&& socket,
                     const entity_addr_t& peer_addr);
 
+  seastar::future<> close_clean(bool dispatch_reset);
+
   bool is_server_side() const {
     return policy.server;
   }
index 423e7d4edc987303681b0acb98945b958bb69a09..2817602cac7d26a62642dd20ed38b143ca5b2f0c 100644 (file)
@@ -163,12 +163,12 @@ seastar::future<> SocketMessenger::shutdown()
   // close all connections
   }).then([this] {
     return seastar::parallel_for_each(accepting_conns, [] (auto conn) {
-      return conn->close();
+      return conn->close_clean(false);
     });
   }).then([this] {
     ceph_assert(accepting_conns.empty());
     return seastar::parallel_for_each(connections, [] (auto conn) {
-      return conn.second->close();
+      return conn.second->close_clean(false);
     });
   }).then([this] {
     ceph_assert(connections.empty());
index 4318d29f88033a31f2a219fbccfd609169599f83..ab08dfa8efbaa6255a8ec4b382bf6e57dee85864 100644 (file)
@@ -113,31 +113,19 @@ void Heartbeat::add_peer(osd_id_t peer, epoch_t epoch)
   }
 }
 
-seastar::future<Heartbeat::osds_t> Heartbeat::remove_down_peers()
+Heartbeat::osds_t Heartbeat::remove_down_peers()
 {
   osds_t osds;
   for (auto& peer : peers) {
-    osds.push_back(peer.first);
+    auto osd = peer.first;
+    auto osdmap = service.get_osdmap_service().get_map();
+    if (!osdmap->is_up(osd)) {
+      remove_peer(osd);
+    } else if (peers[osd].epoch < osdmap->get_epoch()) {
+      osds.push_back(osd);
+    }
   }
-  return seastar::map_reduce(std::move(osds),
-    [this](auto& osd) {
-      auto osdmap = service.get_osdmap_service().get_map();
-      if (!osdmap->is_up(osd)) {
-        return remove_peer(osd).then([] {
-          return seastar::make_ready_future<osd_id_t>(-1);
-        });
-      } else if (peers[osd].epoch < osdmap->get_epoch()) {
-        return seastar::make_ready_future<osd_id_t>(osd);
-      } else {
-        return seastar::make_ready_future<osd_id_t>(-1);
-      }
-    }, osds_t{},
-    [](osds_t&& extras, osd_id_t extra) {
-      if (extra >= 0) {
-        extras.push_back(extra);
-      }
-      return std::move(extras);
-    });
+  return osds;
 }
 
 void Heartbeat::add_reporter_peers(int whoami)
@@ -163,49 +151,37 @@ void Heartbeat::add_reporter_peers(int whoami)
   };
 }
 
-seastar::future<> Heartbeat::update_peers(int whoami)
+void Heartbeat::update_peers(int whoami)
 {
   const auto min_peers = static_cast<size_t>(
     local_conf().get_val<int64_t>("osd_heartbeat_min_peers"));
   add_reporter_peers(whoami);
-  return remove_down_peers().then([=](osds_t&& extra) {
-    // too many?
-    struct iteration_state {
-      osds_t::const_iterator where;
-      osds_t::const_iterator end;
-    };
-    return seastar::do_with(iteration_state{extra.begin(),extra.end()},
-      [=](iteration_state& s) {
-        return seastar::do_until(
-          [min_peers, &s, this] {
-            return peers.size() <= min_peers || s.where == s.end; },
-          [&s, this] {
-            return remove_peer(*s.where); }
-        );
-    });
-  }).then([=] {
-    // or too few?
-    auto osdmap = service.get_osdmap_service().get_map();
-    auto epoch = osdmap->get_epoch();
-    for (auto next = osdmap->get_next_up_osd_after(whoami);
-      peers.size() < min_peers && next >= 0 && next != whoami;
-      next = osdmap->get_next_up_osd_after(next)) {
-      add_peer(next, epoch);
+  auto extra = remove_down_peers();
+  // too many?
+  for (auto& osd : extra) {
+    if (peers.size() <= min_peers) {
+      break;
     }
-  });
+    remove_peer(osd);
+  }
+  // or too few?
+  auto osdmap = service.get_osdmap_service().get_map();
+  auto epoch = osdmap->get_epoch();
+  for (auto next = osdmap->get_next_up_osd_after(whoami);
+    peers.size() < min_peers && next >= 0 && next != whoami;
+    next = osdmap->get_next_up_osd_after(next)) {
+    add_peer(next, epoch);
+  }
 }
 
-seastar::future<> Heartbeat::remove_peer(osd_id_t peer)
+void Heartbeat::remove_peer(osd_id_t peer)
 {
   auto found = peers.find(peer);
   assert(found != peers.end());
   logger().info("remove_peer({})", peer);
-  return seastar::when_all_succeed(found->second.con_front->close(),
-                                   found->second.con_back->close()).then(
-    [this, peer] {
-      peers.erase(peer);
-      return seastar::now();
-    });
+  found->second.con_front->mark_down();
+  found->second.con_back->mark_down();
+  peers.erase(peer);
 }
 
 seastar::future<> Heartbeat::ms_dispatch(crimson::net::Connection* conn,
@@ -231,9 +207,9 @@ seastar::future<> Heartbeat::ms_handle_reset(crimson::net::ConnectionRef conn)
   }
   const auto peer = found->first;
   const auto epoch = found->second.epoch;
-  return remove_peer(peer).then([peer, epoch, this] {
-    add_peer(peer, epoch);
-  });
+  remove_peer(peer);
+  add_peer(peer, epoch);
+  return seastar::now();
 }
 
 seastar::future<> Heartbeat::handle_osd_ping(crimson::net::Connection* conn,
index c51e81de67b073c6438ca8d7ff5d111e7475f5a2..a0e6146cd47bbc5cbe5c0dbfb9f79946cb6a1d85 100644 (file)
@@ -35,8 +35,8 @@ public:
   seastar::future<> stop();
 
   void add_peer(osd_id_t peer, epoch_t epoch);
-  seastar::future<> update_peers(int whoami);
-  seastar::future<> remove_peer(osd_id_t peer);
+  void update_peers(int whoami);
+  void remove_peer(osd_id_t peer);
 
   const entity_addrvec_t& get_front_addrs() const;
   const entity_addrvec_t& get_back_addrs() const;
@@ -62,7 +62,7 @@ private:
   using osds_t = std::vector<osd_id_t>;
   /// remove down OSDs
   /// @return peers not needed in this epoch
-  seastar::future<osds_t> remove_down_peers();
+  osds_t remove_down_peers();
   /// add enough reporters for fast failure detection
   void add_reporter_peers(int whoami);
 
index 9ce9010053587e33ee26cb4598c3ea0d9fbca635..01f938535774100ca66fd3c5de07515796c2d2c2 100644 (file)
@@ -78,7 +78,7 @@ OSD::OSD(int id, uint32_t nonce,
     shard_services{*this, *cluster_msgr, *public_msgr, *monc, *mgrc, *store},
     heartbeat{new Heartbeat{shard_services, *monc, hb_front_msgr, hb_back_msgr}},
     // do this in background
-    heartbeat_timer{[this] { (void)update_heartbeat_peers(); }},
+    heartbeat_timer{[this] { update_heartbeat_peers(); }},
     asok{seastar::make_lw_shared<crimson::admin::AdminSocket>()},
     osdmap_gate("OSD::osdmap_gate", std::make_optional(std::ref(shard_services)))
 {
@@ -1049,10 +1049,10 @@ seastar::future<> OSD::send_beacon()
   return monc->send_message(m);
 }
 
-seastar::future<> OSD::update_heartbeat_peers()
+void OSD::update_heartbeat_peers()
 {
   if (!state.is_active()) {
-    return seastar::now();
+    return;
   }
   for (auto& pg : pg_map.get_pgs()) {
     vector<int> up, acting;
@@ -1067,7 +1067,7 @@ seastar::future<> OSD::update_heartbeat_peers()
       }
     }
   }
-  return heartbeat->update_peers(whoami);
+  heartbeat->update_peers(whoami);
 }
 
 seastar::future<> OSD::handle_peering_op(
index baf90c8d37f158824d806c9f39ec3871375ead18..be090fb90e1cd708eab94578a4eb5302509f49f7 100644 (file)
@@ -216,7 +216,7 @@ public:
   seastar::future<> shutdown();
 
   seastar::future<> send_beacon();
-  seastar::future<> update_heartbeat_peers();
+  void update_heartbeat_peers();
 
   friend class PGAdvanceMap;
 };
index 1f30f947e336dbf34dee88c0b9fd38827b48892f..4a5b3f745d88ba47b5da854d7cc77fd485b455f1 100644 (file)
@@ -1122,7 +1122,8 @@ class FailoverSuite : public Dispatcher {
   seastar::future<> markdown() {
     logger().info("[Test] markdown()");
     ceph_assert(tracked_conn);
-    return tracked_conn->close();
+    tracked_conn->mark_down();
+    return seastar::now();
   }
 
   seastar::future<> wait_blocked() {
@@ -1470,7 +1471,8 @@ class FailoverSuitePeer : public Dispatcher {
   seastar::future<> markdown() {
     logger().info("[TestPeer] markdown()");
     ceph_assert(tracked_conn);
-    return tracked_conn->close();
+    tracked_conn->mark_down();
+    return seastar::now();
   }
 
   static seastar::future<std::unique_ptr<FailoverSuitePeer>>