From a8709464232fc5cc28032716f67c754971420e6b Mon Sep 17 00:00:00 2001 From: Yingxin Cheng Date: Thu, 29 Jun 2023 13:25:28 +0800 Subject: [PATCH] crimson/osd/heartbeat: relax the order of replacement reset and accept With the new implementation in messenger, the order of replacement reset and accept events cannot be determined because they are from different connections. Modify the heatbeat logic to tolerate the both cases. Signed-off-by: Yingxin Cheng --- src/crimson/net/Dispatcher.h | 7 +- src/crimson/net/ProtocolV2.cc | 11 +- src/crimson/net/chained_dispatchers.cc | 5 +- src/crimson/net/chained_dispatchers.h | 2 +- src/crimson/net/io_handler.cc | 13 +- src/crimson/net/io_handler.h | 3 +- src/crimson/osd/heartbeat.cc | 141 +++++++++++++++++----- src/crimson/osd/heartbeat.h | 61 ++++------ src/test/crimson/test_messenger.cc | 9 +- src/test/crimson/test_messenger_thrash.cc | 11 +- 10 files changed, 174 insertions(+), 89 deletions(-) diff --git a/src/crimson/net/Dispatcher.h b/src/crimson/net/Dispatcher.h index c563b5e266f2e..11908349e7cd1 100644 --- a/src/crimson/net/Dispatcher.h +++ b/src/crimson/net/Dispatcher.h @@ -32,13 +32,18 @@ class Dispatcher { // The connection is accepted or recoverred(lossless), all the followup // events and messages will be dispatched to the new_shard. - virtual void ms_handle_accept(ConnectionRef conn, seastar::shard_id new_shard) {} + // + // is_replace=true means the accepted connection has replaced + // another connecting connection with the same peer_addr, which currently only + // happens under lossy policy when both sides wish to connect to each other. + virtual void ms_handle_accept(ConnectionRef conn, seastar::shard_id new_shard, bool is_replace) {} // The connection is (re)connected, all the followup events and messages will // be dispatched to the new_shard. virtual void ms_handle_connect(ConnectionRef conn, seastar::shard_id new_shard) {} // a reset event is dispatched when the connection is closed unexpectedly. + // // is_replace=true means the reset connection is going to be replaced by // another accepting connection with the same peer_addr, which currently only // happens under lossy policy when both sides wish to connect to each other. diff --git a/src/crimson/net/ProtocolV2.cc b/src/crimson/net/ProtocolV2.cc index f6a235d4af3fe..869c11cf5a553 100644 --- a/src/crimson/net/ProtocolV2.cc +++ b/src/crimson/net/ProtocolV2.cc @@ -1760,6 +1760,7 @@ void ProtocolV2::execute_establishing(SocketConnectionRef existing_conn) { ceph_assert_always(is_socket_valid); trigger_state(state_t::ESTABLISHING, io_state_t::delay); + bool is_replace; if (existing_conn) { logger().info("{} start establishing: gs={}, pgs={}, cs={}, " "client_cookie={}, server_cookie={}, {}, new_sid={}, " @@ -1768,6 +1769,7 @@ void ProtocolV2::execute_establishing(SocketConnectionRef existing_conn) { client_cookie, server_cookie, io_states, frame_assembler->get_socket_shard_id(), *existing_conn); + is_replace = true; ProtocolV2 *existing_proto = dynamic_cast( existing_conn->protocol.get()); existing_proto->do_close( @@ -1786,10 +1788,11 @@ void ProtocolV2::execute_establishing(SocketConnectionRef existing_conn) { conn, global_seq, peer_global_seq, connect_seq, client_cookie, server_cookie, io_states, frame_assembler->get_socket_shard_id()); + is_replace = false; accept_me(); } - gated_execute("execute_establishing", conn, [this] { + gated_execute("execute_establishing", conn, [this, is_replace] { ceph_assert_always(state == state_t::ESTABLISHING); // set io_handler to a new shard @@ -1803,10 +1806,10 @@ void ProtocolV2::execute_establishing(SocketConnectionRef existing_conn) { pr_switch_io_shard = seastar::shared_promise<>(); return seastar::smp::submit_to( io_handler.get_shard_id(), - [this, cc_seq, new_io_shard, + [this, cc_seq, new_io_shard, is_replace, conn_fref=std::move(conn_fref)]() mutable { return io_handler.dispatch_accept( - cc_seq, new_io_shard, std::move(conn_fref)); + cc_seq, new_io_shard, std::move(conn_fref), is_replace); }).then([this, new_io_shard] { ceph_assert_always(io_handler.get_shard_id() == new_io_shard); pr_switch_io_shard->set_value(); @@ -1976,7 +1979,7 @@ void ProtocolV2::trigger_replacing(bool reconnect, [this, cc_seq, new_io_shard, conn_fref=std::move(conn_fref)]() mutable { return io_handler.dispatch_accept( - cc_seq, new_io_shard, std::move(conn_fref)); + cc_seq, new_io_shard, std::move(conn_fref), false); }).then([this, new_io_shard] { ceph_assert_always(io_handler.get_shard_id() == new_io_shard); pr_switch_io_shard->set_value(); diff --git a/src/crimson/net/chained_dispatchers.cc b/src/crimson/net/chained_dispatchers.cc index 2656c0e57492b..dfff6d916fa6d 100644 --- a/src/crimson/net/chained_dispatchers.cc +++ b/src/crimson/net/chained_dispatchers.cc @@ -41,10 +41,11 @@ ChainedDispatchers::ms_dispatch(crimson::net::ConnectionRef conn, void ChainedDispatchers::ms_handle_accept( crimson::net::ConnectionRef conn, - seastar::shard_id new_shard) { + seastar::shard_id new_shard, + bool is_replace) { try { for (auto& dispatcher : dispatchers) { - dispatcher->ms_handle_accept(conn, new_shard); + dispatcher->ms_handle_accept(conn, new_shard, is_replace); } } catch (...) { logger().error("{} got unexpected exception in ms_handle_accept() {}", diff --git a/src/crimson/net/chained_dispatchers.h b/src/crimson/net/chained_dispatchers.h index 40356e9d47367..5835205119d8d 100644 --- a/src/crimson/net/chained_dispatchers.h +++ b/src/crimson/net/chained_dispatchers.h @@ -26,7 +26,7 @@ public: return dispatchers.empty(); } seastar::future<> ms_dispatch(crimson::net::ConnectionRef, MessageRef); - void ms_handle_accept(crimson::net::ConnectionRef conn, seastar::shard_id); + void ms_handle_accept(crimson::net::ConnectionRef conn, seastar::shard_id, bool is_replace); void ms_handle_connect(crimson::net::ConnectionRef conn, seastar::shard_id); void ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace); void ms_handle_remote_reset(crimson::net::ConnectionRef conn); diff --git a/src/crimson/net/io_handler.cc b/src/crimson/net/io_handler.cc index b138ed0f26e0e..abb7f5e467346 100644 --- a/src/crimson/net/io_handler.cc +++ b/src/crimson/net/io_handler.cc @@ -558,21 +558,22 @@ seastar::future<> IOHandler::dispatch_accept( crosscore_t::seq_t cc_seq, seastar::shard_id new_sid, - ConnectionFRef conn_fref) + ConnectionFRef conn_fref, + bool is_replace) { ceph_assert_always(seastar::this_shard_id() == get_shard_id()); if (!crosscore.proceed_or_wait(cc_seq)) { logger().debug("{} got {} dispatch_accept(), wait at {}", conn, cc_seq, crosscore.get_in_seq()); return crosscore.wait(cc_seq - ).then([this, cc_seq, new_sid, + ).then([this, cc_seq, new_sid, is_replace, conn_fref=std::move(conn_fref)]() mutable { - return dispatch_accept(cc_seq, new_sid, std::move(conn_fref)); + return dispatch_accept(cc_seq, new_sid, std::move(conn_fref), is_replace); }); } - logger().debug("{} got {} dispatch_accept({}) at {}", - conn, cc_seq, new_sid, io_stat_printer{*this}); + logger().debug("{} got {} dispatch_accept(new_sid={}, replace={}) at {}", + conn, cc_seq, new_sid, is_replace, io_stat_printer{*this}); if (get_io_state() == io_state_t::drop) { assert(!protocol_is_connected); // it is possible that both io_handler and protocolv2 are @@ -586,7 +587,7 @@ IOHandler::dispatch_accept( auto _conn_ref = conn_ref; auto fut = to_new_sid(new_sid, std::move(conn_fref)); - dispatchers.ms_handle_accept(_conn_ref, new_sid); + dispatchers.ms_handle_accept(_conn_ref, new_sid, is_replace); // user can make changes return fut; diff --git a/src/crimson/net/io_handler.h b/src/crimson/net/io_handler.h index 0175d2c522c98..edb69b3407afa 100644 --- a/src/crimson/net/io_handler.h +++ b/src/crimson/net/io_handler.h @@ -279,7 +279,8 @@ public: seastar::future<> dispatch_accept( crosscore_t::seq_t cc_seq, seastar::shard_id new_sid, - ConnectionFRef); + ConnectionFRef, + bool is_replace); seastar::future<> dispatch_connect( crosscore_t::seq_t cc_seq, diff --git a/src/crimson/osd/heartbeat.cc b/src/crimson/osd/heartbeat.cc index 30de528291ab7..cdee52731e644 100644 --- a/src/crimson/osd/heartbeat.cc +++ b/src/crimson/osd/heartbeat.cc @@ -254,7 +254,8 @@ void Heartbeat::ms_handle_connect( void Heartbeat::ms_handle_accept( crimson::net::ConnectionRef conn, - seastar::shard_id new_shard) + seastar::shard_id new_shard, + bool is_replace) { ceph_assert_always(seastar::this_shard_id() == new_shard); auto peer = conn->get_peer_id(); @@ -264,7 +265,7 @@ void Heartbeat::ms_handle_accept( } if (auto found = peers.find(peer); found != peers.end()) { - found->second.handle_accept(conn); + found->second.handle_accept(conn, is_replace); } } @@ -433,42 +434,57 @@ bool Heartbeat::Connection::matches(crimson::net::ConnectionRef _conn) const return (conn && conn == _conn); } -void Heartbeat::Connection::accepted(crimson::net::ConnectionRef accepted_conn) +bool Heartbeat::Connection::accepted( + crimson::net::ConnectionRef accepted_conn, + bool is_replace) { - if (!conn) { - if (accepted_conn->get_peer_addr() == listener.get_peer_addr(type)) { - logger().info("Heartbeat::Connection::accepted(): " - "{} racing resolved", *this); - conn = accepted_conn; - set_connected(); + ceph_assert(accepted_conn); + ceph_assert(accepted_conn != conn); + if (accepted_conn->get_peer_addr() != listener.get_peer_addr(type)) { + return false; + } + + if (is_replace) { + logger().info("Heartbeat::Connection::accepted(): " + "{} racing", *this); + racing_detected = true; + } + if (conn) { + // there is no assumption about the ordering of the reset and accept + // events for the 2 racing connections. + if (is_connected) { + logger().warn("Heartbeat::Connection::accepted(): " + "{} is accepted while connected, is_replace={}", + *this, is_replace); + conn->mark_down(); + set_unconnected(); } - } else if (conn == accepted_conn) { - set_connected(); } + conn = accepted_conn; + set_connected(); + return true; } -void Heartbeat::Connection::replaced() +void Heartbeat::Connection::reset(bool is_replace) { - assert(!is_connected); - auto replaced_conn = conn; - // set the racing connection, will be handled by handle_accept() - conn = msgr.connect(replaced_conn->get_peer_addr(), - replaced_conn->get_peer_name()); - racing_detected = true; - logger().warn("Heartbeat::Connection::replaced(): {} racing", *this); - assert(conn != replaced_conn); -} + if (is_replace) { + logger().info("Heartbeat::Connection::reset(): " + "{} racing, waiting for the replacing accept", + *this); + racing_detected = true; + } -void Heartbeat::Connection::reset() -{ - conn = nullptr; if (is_connected) { - is_connected = false; - listener.decrease_connected(); + set_unconnected(); + } else { + conn = nullptr; } - if (!racing_detected || is_winner_side) { + + if (is_replace) { + // waiting for the replacing accept event + } else if (!racing_detected || is_winner_side) { connect(); - } else { + } else { // racing_detected && !is_winner_side logger().info("Heartbeat::Connection::reset(): " "{} racing detected and lose, " "waiting for peer connect me", *this); @@ -510,11 +526,22 @@ void Heartbeat::Connection::retry() void Heartbeat::Connection::set_connected() { + assert(conn); assert(!is_connected); + ceph_assert(conn->is_connected()); is_connected = true; listener.increase_connected(); } +void Heartbeat::Connection::set_unconnected() +{ + assert(conn); + assert(is_connected); + conn = nullptr; + is_connected = false; + listener.decrease_connected(); +} + void Heartbeat::Connection::connect() { assert(!conn); @@ -604,6 +631,64 @@ void Heartbeat::Peer::send_heartbeat( } } +void Heartbeat::Peer::handle_reset( + crimson::net::ConnectionRef conn, bool is_replace) +{ + int cnt = 0; + for_each_conn([&] (auto& _conn) { + if (_conn.matches(conn)) { + ++cnt; + _conn.reset(is_replace); + } + }); + + if (cnt == 0) { + logger().info("Heartbeat::Peer::handle_reset(): {} ignores conn, is_replace={} -- {}", + *this, is_replace, *conn); + } else if (cnt > 1) { + logger().error("Heartbeat::Peer::handle_reset(): {} handles conn {} times -- {}", + *this, cnt, *conn); + } +} + +void Heartbeat::Peer::handle_connect(crimson::net::ConnectionRef conn) +{ + int cnt = 0; + for_each_conn([&] (auto& _conn) { + if (_conn.matches(conn)) { + ++cnt; + _conn.connected(); + } + }); + + if (cnt == 0) { + logger().error("Heartbeat::Peer::handle_connect(): {} ignores conn -- {}", + *this, *conn); + conn->mark_down(); + } else if (cnt > 1) { + logger().error("Heartbeat::Peer::handle_connect(): {} handles conn {} times -- {}", + *this, cnt, *conn); + } +} + +void Heartbeat::Peer::handle_accept(crimson::net::ConnectionRef conn, bool is_replace) +{ + int cnt = 0; + for_each_conn([&] (auto& _conn) { + if (_conn.accepted(conn, is_replace)) { + ++cnt; + } + }); + + if (cnt == 0) { + logger().warn("Heartbeat::Peer::handle_accept(): {} ignores conn -- {}", + *this, *conn); + } else if (cnt > 1) { + logger().error("Heartbeat::Peer::handle_accept(): {} handles conn {} times -- {}", + *this, cnt, *conn); + } +} + seastar::future<> Heartbeat::Peer::handle_reply( crimson::net::ConnectionRef conn, Ref m) { diff --git a/src/crimson/osd/heartbeat.h b/src/crimson/osd/heartbeat.h index c5bf8f0ded232..f5da451181e91 100644 --- a/src/crimson/osd/heartbeat.h +++ b/src/crimson/osd/heartbeat.h @@ -53,7 +53,7 @@ public: crimson::net::ConnectionRef conn, MessageRef m) override; void ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace) override; void ms_handle_connect(crimson::net::ConnectionRef conn, seastar::shard_id) override; - void ms_handle_accept(crimson::net::ConnectionRef conn, seastar::shard_id) override; + void ms_handle_accept(crimson::net::ConnectionRef conn, seastar::shard_id, bool is_replace) override; void print(std::ostream&) const; private: @@ -189,9 +189,8 @@ class Heartbeat::Connection { void connected() { set_connected(); } - void accepted(crimson::net::ConnectionRef); - void replaced(); - void reset(); + bool accepted(crimson::net::ConnectionRef, bool is_replace); + void reset(bool is_replace=false); seastar::future<> send(MessageURef msg); void validate(); // retry connection if still pending @@ -199,6 +198,7 @@ class Heartbeat::Connection { private: void set_connected(); + void set_unconnected(); void connect(); const osd_id_t peer; @@ -239,19 +239,15 @@ class Heartbeat::Connection { crimson::net::ConnectionRef conn; bool is_connected = false; - friend std::ostream& operator<<(std::ostream& os, const Connection& c) { - if (c.type == type_t::front) { - return os << "con_front(osd." << c.peer << ")"; - } else { - return os << "con_back(osd." << c.peer << ")"; - } - } + friend std::ostream& operator<<(std::ostream& os, const Connection& c) { + if (c.type == type_t::front) { + return os << "con_front(osd." << c.peer << ")"; + } else { + return os << "con_back(osd." << c.peer << ")"; + } + } }; -#if FMT_VERSION >= 90000 -template <> struct fmt::formatter : fmt::ostream_formatter {}; -#endif - /* * Track the ping history and ping reply (the pong) from the same session, clean up * history once hb_front or hb_back loses connection and restart the session once @@ -425,29 +421,12 @@ class Heartbeat::Peer final : private Heartbeat::ConnectionListener { void send_heartbeat( clock::time_point, ceph::signedspan, std::vector>&); seastar::future<> handle_reply(crimson::net::ConnectionRef, Ref); - void handle_reset(crimson::net::ConnectionRef conn, bool is_replace) { - for_each_conn([&] (auto& _conn) { - if (_conn.matches(conn)) { - if (is_replace) { - _conn.replaced(); - } else { - _conn.reset(); - } - } - }); - } - void handle_connect(crimson::net::ConnectionRef conn) { - for_each_conn([&] (auto& _conn) { - if (_conn.matches(conn)) { - _conn.connected(); - } - }); - } - void handle_accept(crimson::net::ConnectionRef conn) { - for_each_conn([&] (auto& _conn) { - _conn.accepted(conn); - }); - } + + void handle_reset(crimson::net::ConnectionRef conn, bool is_replace); + + void handle_connect(crimson::net::ConnectionRef conn); + + void handle_accept(crimson::net::ConnectionRef conn, bool is_replace); private: entity_addr_t get_peer_addr(type_t type) override; @@ -469,8 +448,14 @@ class Heartbeat::Peer final : private Heartbeat::ConnectionListener { bool pending_send = false; Connection con_front; Connection con_back; + + friend std::ostream& operator<<(std::ostream& os, const Peer& p) { + return os << "peer(osd." << p.peer << ")"; + } }; #if FMT_VERSION >= 90000 template <> struct fmt::formatter : fmt::ostream_formatter {}; +template <> struct fmt::formatter : fmt::ostream_formatter {}; +template <> struct fmt::formatter : fmt::ostream_formatter {}; #endif diff --git a/src/test/crimson/test_messenger.cc b/src/test/crimson/test_messenger.cc index 60b5439b228a8..bba62cc7974d4 100644 --- a/src/test/crimson/test_messenger.cc +++ b/src/test/crimson/test_messenger.cc @@ -862,7 +862,8 @@ class FailoverSuite : public Dispatcher { void ms_handle_accept( ConnectionRef conn, - seastar::shard_id new_shard) override { + seastar::shard_id new_shard, + bool is_replace) override { assert(new_shard == seastar::this_shard_id()); auto result = interceptor.find_result(conn); if (result == nullptr) { @@ -1457,7 +1458,8 @@ class FailoverSuitePeer : public Dispatcher { void ms_handle_accept( ConnectionRef conn, - seastar::shard_id new_shard) override { + seastar::shard_id new_shard, + bool is_replace) override { assert(new_shard == seastar::this_shard_id()); logger().info("[TestPeer] got accept from Test"); ceph_assert(!tracked_conn || @@ -1616,7 +1618,8 @@ class FailoverTestPeer : public Dispatcher { void ms_handle_accept( ConnectionRef conn, - seastar::shard_id new_shard) override { + seastar::shard_id new_shard, + bool is_replace) override { assert(new_shard == seastar::this_shard_id()); cmd_conn = conn; } diff --git a/src/test/crimson/test_messenger_thrash.cc b/src/test/crimson/test_messenger_thrash.cc index 3b41fe16e7ded..7c26d6ffdec67 100644 --- a/src/test/crimson/test_messenger_thrash.cc +++ b/src/test/crimson/test_messenger_thrash.cc @@ -105,7 +105,7 @@ class SyntheticDispatcher final } std::optional> ms_dispatch(crimson::net::ConnectionRef con, - MessageRef m) { + MessageRef m) final { if (verbose) { logger().warn("{}: con = {}", __func__, *con); } @@ -136,21 +136,22 @@ class SyntheticDispatcher final void ms_handle_accept( crimson::net::ConnectionRef conn, - seastar::shard_id new_shard) { + seastar::shard_id new_shard, + bool is_replace) final { logger().info("{} - Connection:{}", __func__, *conn); assert(new_shard == seastar::this_shard_id()); } void ms_handle_connect( crimson::net::ConnectionRef conn, - seastar::shard_id new_shard) { + seastar::shard_id new_shard) final { logger().info("{} - Connection:{}", __func__, *conn); assert(new_shard == seastar::this_shard_id()); } - void ms_handle_reset(crimson::net::ConnectionRef con, bool is_replace); + void ms_handle_reset(crimson::net::ConnectionRef con, bool is_replace) final; - void ms_handle_remote_reset(crimson::net::ConnectionRef con) { + void ms_handle_remote_reset(crimson::net::ConnectionRef con) final { clear_pending(con); } -- 2.39.5