From: Yingxin Cheng Date: Fri, 13 Mar 2020 06:22:40 +0000 (+0800) Subject: crimson/net: change close() to mark_down() X-Git-Tag: v16.0.0~8^2~5 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=434cdd46661e39543227fa25657d069aa4170233;p=ceph.git crimson/net: change close() to mark_down() * be explicit that mark_down() won't trigger reset event; * return void so no deadlock is possible and memory is still safe guarded by Messenger::shutdown(); * related changes in crimson/osd; Signed-off-by: Yingxin Cheng --- diff --git a/src/crimson/mgr/client.cc b/src/crimson/mgr/client.cc index bd40d16e709bd..9134c2602132b 100644 --- a/src/crimson/mgr/client.cc +++ b/src/crimson/mgr/client.cc @@ -40,9 +40,7 @@ seastar::future<> Client::stop() { return gate.close().then([this] { if (conn) { - return conn->close(); - } else { - return seastar::now(); + conn->mark_down(); } }); } @@ -85,8 +83,7 @@ seastar::future<> Client::ms_handle_reset(crimson::net::ConnectionRef c) seastar::future<> Client::reconnect() { if (conn) { - // crimson::net::Protocol::close() is able to close() in background - (void)conn->close(); + conn->mark_down(); conn = {}; } if (!mgrmap.get_available()) { diff --git a/src/crimson/mon/MonClient.cc b/src/crimson/mon/MonClient.cc index 03d57bb71aab6..0018037013706 100644 --- a/src/crimson/mon/MonClient.cc +++ b/src/crimson/mon/MonClient.cc @@ -79,7 +79,7 @@ public: const std::vector& allowed_modes); // v1 and v2 - seastar::future<> close(); + void close(); bool is_my_peer(const entity_addr_t& addr) const; AuthAuthorizer* get_authorizer(entity_type_t peer) const; KeyStore& get_keys(); @@ -427,16 +427,14 @@ int Connection::handle_auth_bad_method(uint32_t old_auth_method, return 0; } -seastar::future<> Connection::close() +void Connection::close() { reply.set_value(Ref(nullptr)); reply = {}; auth_done.set_value(AuthResult::canceled); auth_done = {}; if (conn && !std::exchange(closed, true)) { - return conn->close(); - } else { - return seastar::now(); + conn->mark_down(); } } @@ -551,7 +549,8 @@ seastar::future<> Client::ms_handle_reset(crimson::net::ConnectionRef conn) }); if (found != pending_conns.end()) { logger().warn("pending conn reset by {}", conn->get_peer_addr()); - return (*found)->close(); + (*found)->close(); + return seastar::now(); } else if (active_con && active_con->is_my_peer(conn->get_peer_addr())) { logger().warn("active conn reset {}", conn->get_peer_addr()); active_con.reset(); @@ -920,9 +919,7 @@ seastar::future<> Client::stop() return tick_gate.close().then([this] { timer.cancel(); if (active_con) { - return active_con->close(); - } else { - return seastar::now(); + active_con->close(); } }); } @@ -953,9 +950,8 @@ seastar::future<> Client::reopen_session(int rank) } else { return mc->authenticate_v1(monmap.get_epoch(), entity_name, want_keys) .handle_exception([conn](auto ep) { - return conn->close().then([ep=std::move(ep)](){ - return seastar::make_exception_future(ep); - }); + conn->mark_down(); + return seastar::make_exception_future(ep); }); } }).then([peer, this](auto result) { @@ -986,21 +982,13 @@ seastar::future<> Client::reopen_session(int rank) ceph_assert(!active_con && !pending_conns.empty()); active_con = std::move(*found); found->reset(); - auto ret = seastar::do_with( - std::move(pending_conns), - [](auto &pending_conns) { - return seastar::parallel_for_each( - pending_conns, - [] (auto &conn) { - if (!conn) { - return seastar::now(); - } else { - return conn->close(); - } - }); - }); + for (auto& conn : pending_conns) { + if (conn) { + conn->close(); + } + } pending_conns.clear(); - return ret; + return seastar::now(); }).then([]() { logger().debug("reopen_session mon connection attempts complete"); }).handle_exception([](auto ep) { diff --git a/src/crimson/net/Connection.h b/src/crimson/net/Connection.h index 3beb8f42bb569..04c57cb120fc4 100644 --- a/src/crimson/net/Connection.h +++ b/src/crimson/net/Connection.h @@ -107,10 +107,9 @@ class Connection : public seastar::enable_shared_from_this { /// handshake virtual seastar::future<> keepalive() = 0; - // close the connection and cancel any any pending futures from read/send - // Note it's OK to discard the returned future because Messenger::shutdown() - // will wait for all connections closed - virtual seastar::future<> close() = 0; + // close the connection and cancel any any pending futures from read/send, + // without dispatching any reset event + virtual void mark_down() = 0; virtual void print(ostream& out) const = 0; diff --git a/src/crimson/net/ProtocolV1.cc b/src/crimson/net/ProtocolV1.cc index 26344b0fb4fc9..47a79f0575c29 100644 --- a/src/crimson/net/ProtocolV1.cc +++ b/src/crimson/net/ProtocolV1.cc @@ -584,7 +584,7 @@ seastar::future ProtocolV1::repeat_handle_connect() conn, *existing, static_cast(existing->protocol->proto_type)); // NOTE: this is following async messenger logic, but we may miss the reset event. - (void) existing->close(); + existing->mark_down(); } else { return handle_connect_with_existing(existing, std::move(authorizer_reply)); } diff --git a/src/crimson/net/ProtocolV2.cc b/src/crimson/net/ProtocolV2.cc index cc752f43a1c3a..76e525b510ed2 100644 --- a/src/crimson/net/ProtocolV2.cc +++ b/src/crimson/net/ProtocolV2.cc @@ -1390,7 +1390,7 @@ ProtocolV2::server_reconnect() conn, *existing_conn, static_cast(existing_conn->protocol->proto_type)); // NOTE: this is following async messenger logic, but we may miss the reset event. - (void) existing_conn->close(); + existing_conn->mark_down(); return send_reset(true); } diff --git a/src/crimson/net/SocketConnection.cc b/src/crimson/net/SocketConnection.cc index 5bf2c30c4a861..49b6702929285 100644 --- a/src/crimson/net/SocketConnection.cc +++ b/src/crimson/net/SocketConnection.cc @@ -87,10 +87,10 @@ seastar::future<> SocketConnection::keepalive() return protocol->keepalive(); } -seastar::future<> SocketConnection::close() +void SocketConnection::mark_down() { assert(seastar::engine().cpu_id() == shard_id()); - return protocol->close_clean(false); + protocol->close(false); } bool SocketConnection::update_rx_seq(seq_num_t seq) @@ -126,6 +126,12 @@ SocketConnection::start_accept(SocketRef&& sock, protocol->start_accept(std::move(sock), _peer_addr); } +seastar::future<> +SocketConnection::close_clean(bool dispatch_reset) +{ + return protocol->close_clean(dispatch_reset); +} + seastar::shard_id SocketConnection::shard_id() const { return messenger.shard_id(); } diff --git a/src/crimson/net/SocketConnection.h b/src/crimson/net/SocketConnection.h index de814c9418b41..a5b63473a3336 100644 --- a/src/crimson/net/SocketConnection.h +++ b/src/crimson/net/SocketConnection.h @@ -90,7 +90,7 @@ class SocketConnection : public Connection { seastar::future<> keepalive() override; - seastar::future<> close() override; + void mark_down() override; void print(ostream& out) const override; @@ -103,6 +103,8 @@ class SocketConnection : public Connection { void start_accept(SocketRef&& socket, const entity_addr_t& peer_addr); + seastar::future<> close_clean(bool dispatch_reset); + bool is_server_side() const { return policy.server; } diff --git a/src/crimson/net/SocketMessenger.cc b/src/crimson/net/SocketMessenger.cc index 423e7d4edc987..2817602cac7d2 100644 --- a/src/crimson/net/SocketMessenger.cc +++ b/src/crimson/net/SocketMessenger.cc @@ -163,12 +163,12 @@ seastar::future<> SocketMessenger::shutdown() // close all connections }).then([this] { return seastar::parallel_for_each(accepting_conns, [] (auto conn) { - return conn->close(); + return conn->close_clean(false); }); }).then([this] { ceph_assert(accepting_conns.empty()); return seastar::parallel_for_each(connections, [] (auto conn) { - return conn.second->close(); + return conn.second->close_clean(false); }); }).then([this] { ceph_assert(connections.empty()); diff --git a/src/crimson/osd/heartbeat.cc b/src/crimson/osd/heartbeat.cc index 4318d29f88033..ab08dfa8efbaa 100644 --- a/src/crimson/osd/heartbeat.cc +++ b/src/crimson/osd/heartbeat.cc @@ -113,31 +113,19 @@ void Heartbeat::add_peer(osd_id_t peer, epoch_t epoch) } } -seastar::future Heartbeat::remove_down_peers() +Heartbeat::osds_t Heartbeat::remove_down_peers() { osds_t osds; for (auto& peer : peers) { - osds.push_back(peer.first); + auto osd = peer.first; + auto osdmap = service.get_osdmap_service().get_map(); + if (!osdmap->is_up(osd)) { + remove_peer(osd); + } else if (peers[osd].epoch < osdmap->get_epoch()) { + osds.push_back(osd); + } } - return seastar::map_reduce(std::move(osds), - [this](auto& osd) { - auto osdmap = service.get_osdmap_service().get_map(); - if (!osdmap->is_up(osd)) { - return remove_peer(osd).then([] { - return seastar::make_ready_future(-1); - }); - } else if (peers[osd].epoch < osdmap->get_epoch()) { - return seastar::make_ready_future(osd); - } else { - return seastar::make_ready_future(-1); - } - }, osds_t{}, - [](osds_t&& extras, osd_id_t extra) { - if (extra >= 0) { - extras.push_back(extra); - } - return std::move(extras); - }); + return osds; } void Heartbeat::add_reporter_peers(int whoami) @@ -163,49 +151,37 @@ void Heartbeat::add_reporter_peers(int whoami) }; } -seastar::future<> Heartbeat::update_peers(int whoami) +void Heartbeat::update_peers(int whoami) { const auto min_peers = static_cast( local_conf().get_val("osd_heartbeat_min_peers")); add_reporter_peers(whoami); - return remove_down_peers().then([=](osds_t&& extra) { - // too many? - struct iteration_state { - osds_t::const_iterator where; - osds_t::const_iterator end; - }; - return seastar::do_with(iteration_state{extra.begin(),extra.end()}, - [=](iteration_state& s) { - return seastar::do_until( - [min_peers, &s, this] { - return peers.size() <= min_peers || s.where == s.end; }, - [&s, this] { - return remove_peer(*s.where); } - ); - }); - }).then([=] { - // or too few? - auto osdmap = service.get_osdmap_service().get_map(); - auto epoch = osdmap->get_epoch(); - for (auto next = osdmap->get_next_up_osd_after(whoami); - peers.size() < min_peers && next >= 0 && next != whoami; - next = osdmap->get_next_up_osd_after(next)) { - add_peer(next, epoch); + auto extra = remove_down_peers(); + // too many? + for (auto& osd : extra) { + if (peers.size() <= min_peers) { + break; } - }); + remove_peer(osd); + } + // or too few? + auto osdmap = service.get_osdmap_service().get_map(); + auto epoch = osdmap->get_epoch(); + for (auto next = osdmap->get_next_up_osd_after(whoami); + peers.size() < min_peers && next >= 0 && next != whoami; + next = osdmap->get_next_up_osd_after(next)) { + add_peer(next, epoch); + } } -seastar::future<> Heartbeat::remove_peer(osd_id_t peer) +void Heartbeat::remove_peer(osd_id_t peer) { auto found = peers.find(peer); assert(found != peers.end()); logger().info("remove_peer({})", peer); - return seastar::when_all_succeed(found->second.con_front->close(), - found->second.con_back->close()).then( - [this, peer] { - peers.erase(peer); - return seastar::now(); - }); + found->second.con_front->mark_down(); + found->second.con_back->mark_down(); + peers.erase(peer); } seastar::future<> Heartbeat::ms_dispatch(crimson::net::Connection* conn, @@ -231,9 +207,9 @@ seastar::future<> Heartbeat::ms_handle_reset(crimson::net::ConnectionRef conn) } const auto peer = found->first; const auto epoch = found->second.epoch; - return remove_peer(peer).then([peer, epoch, this] { - add_peer(peer, epoch); - }); + remove_peer(peer); + add_peer(peer, epoch); + return seastar::now(); } seastar::future<> Heartbeat::handle_osd_ping(crimson::net::Connection* conn, diff --git a/src/crimson/osd/heartbeat.h b/src/crimson/osd/heartbeat.h index c51e81de67b07..a0e6146cd47bb 100644 --- a/src/crimson/osd/heartbeat.h +++ b/src/crimson/osd/heartbeat.h @@ -35,8 +35,8 @@ public: seastar::future<> stop(); void add_peer(osd_id_t peer, epoch_t epoch); - seastar::future<> update_peers(int whoami); - seastar::future<> remove_peer(osd_id_t peer); + void update_peers(int whoami); + void remove_peer(osd_id_t peer); const entity_addrvec_t& get_front_addrs() const; const entity_addrvec_t& get_back_addrs() const; @@ -62,7 +62,7 @@ private: using osds_t = std::vector; /// remove down OSDs /// @return peers not needed in this epoch - seastar::future remove_down_peers(); + osds_t remove_down_peers(); /// add enough reporters for fast failure detection void add_reporter_peers(int whoami); diff --git a/src/crimson/osd/osd.cc b/src/crimson/osd/osd.cc index 9ce9010053587..01f9385357741 100644 --- a/src/crimson/osd/osd.cc +++ b/src/crimson/osd/osd.cc @@ -78,7 +78,7 @@ OSD::OSD(int id, uint32_t nonce, shard_services{*this, *cluster_msgr, *public_msgr, *monc, *mgrc, *store}, heartbeat{new Heartbeat{shard_services, *monc, hb_front_msgr, hb_back_msgr}}, // do this in background - heartbeat_timer{[this] { (void)update_heartbeat_peers(); }}, + heartbeat_timer{[this] { update_heartbeat_peers(); }}, asok{seastar::make_lw_shared()}, osdmap_gate("OSD::osdmap_gate", std::make_optional(std::ref(shard_services))) { @@ -1049,10 +1049,10 @@ seastar::future<> OSD::send_beacon() return monc->send_message(m); } -seastar::future<> OSD::update_heartbeat_peers() +void OSD::update_heartbeat_peers() { if (!state.is_active()) { - return seastar::now(); + return; } for (auto& pg : pg_map.get_pgs()) { vector up, acting; @@ -1067,7 +1067,7 @@ seastar::future<> OSD::update_heartbeat_peers() } } } - return heartbeat->update_peers(whoami); + heartbeat->update_peers(whoami); } seastar::future<> OSD::handle_peering_op( diff --git a/src/crimson/osd/osd.h b/src/crimson/osd/osd.h index baf90c8d37f15..be090fb90e1cd 100644 --- a/src/crimson/osd/osd.h +++ b/src/crimson/osd/osd.h @@ -216,7 +216,7 @@ public: seastar::future<> shutdown(); seastar::future<> send_beacon(); - seastar::future<> update_heartbeat_peers(); + void update_heartbeat_peers(); friend class PGAdvanceMap; }; diff --git a/src/test/crimson/test_messenger.cc b/src/test/crimson/test_messenger.cc index 1f30f947e336d..4a5b3f745d88b 100644 --- a/src/test/crimson/test_messenger.cc +++ b/src/test/crimson/test_messenger.cc @@ -1122,7 +1122,8 @@ class FailoverSuite : public Dispatcher { seastar::future<> markdown() { logger().info("[Test] markdown()"); ceph_assert(tracked_conn); - return tracked_conn->close(); + tracked_conn->mark_down(); + return seastar::now(); } seastar::future<> wait_blocked() { @@ -1470,7 +1471,8 @@ class FailoverSuitePeer : public Dispatcher { seastar::future<> markdown() { logger().info("[TestPeer] markdown()"); ceph_assert(tracked_conn); - return tracked_conn->close(); + tracked_conn->mark_down(); + return seastar::now(); } static seastar::future>