// The connection is accepted or recoverred(lossless), all the followup
// events and messages will be dispatched to the new_shard.
- virtual void ms_handle_accept(ConnectionRef conn, seastar::shard_id new_shard) {}
+ //
+ // is_replace=true means the accepted connection has replaced
+ // another connecting connection with the same peer_addr, which currently only
+ // happens under lossy policy when both sides wish to connect to each other.
+ virtual void ms_handle_accept(ConnectionRef conn, seastar::shard_id new_shard, bool is_replace) {}
// The connection is (re)connected, all the followup events and messages will
// be dispatched to the new_shard.
virtual void ms_handle_connect(ConnectionRef conn, seastar::shard_id new_shard) {}
// a reset event is dispatched when the connection is closed unexpectedly.
+ //
// is_replace=true means the reset connection is going to be replaced by
// another accepting connection with the same peer_addr, which currently only
// happens under lossy policy when both sides wish to connect to each other.
ceph_assert_always(is_socket_valid);
trigger_state(state_t::ESTABLISHING, io_state_t::delay);
+ bool is_replace;
if (existing_conn) {
logger().info("{} start establishing: gs={}, pgs={}, cs={}, "
"client_cookie={}, server_cookie={}, {}, new_sid={}, "
client_cookie, server_cookie,
io_states, frame_assembler->get_socket_shard_id(),
*existing_conn);
+ is_replace = true;
ProtocolV2 *existing_proto = dynamic_cast<ProtocolV2*>(
existing_conn->protocol.get());
existing_proto->do_close(
conn, global_seq, peer_global_seq, connect_seq,
client_cookie, server_cookie, io_states,
frame_assembler->get_socket_shard_id());
+ is_replace = false;
accept_me();
}
- gated_execute("execute_establishing", conn, [this] {
+ gated_execute("execute_establishing", conn, [this, is_replace] {
ceph_assert_always(state == state_t::ESTABLISHING);
// set io_handler to a new shard
pr_switch_io_shard = seastar::shared_promise<>();
return seastar::smp::submit_to(
io_handler.get_shard_id(),
- [this, cc_seq, new_io_shard,
+ [this, cc_seq, new_io_shard, is_replace,
conn_fref=std::move(conn_fref)]() mutable {
return io_handler.dispatch_accept(
- cc_seq, new_io_shard, std::move(conn_fref));
+ cc_seq, new_io_shard, std::move(conn_fref), is_replace);
}).then([this, new_io_shard] {
ceph_assert_always(io_handler.get_shard_id() == new_io_shard);
pr_switch_io_shard->set_value();
[this, cc_seq, new_io_shard,
conn_fref=std::move(conn_fref)]() mutable {
return io_handler.dispatch_accept(
- cc_seq, new_io_shard, std::move(conn_fref));
+ cc_seq, new_io_shard, std::move(conn_fref), false);
}).then([this, new_io_shard] {
ceph_assert_always(io_handler.get_shard_id() == new_io_shard);
pr_switch_io_shard->set_value();
void
ChainedDispatchers::ms_handle_accept(
crimson::net::ConnectionRef conn,
- seastar::shard_id new_shard) {
+ seastar::shard_id new_shard,
+ bool is_replace) {
try {
for (auto& dispatcher : dispatchers) {
- dispatcher->ms_handle_accept(conn, new_shard);
+ dispatcher->ms_handle_accept(conn, new_shard, is_replace);
}
} catch (...) {
logger().error("{} got unexpected exception in ms_handle_accept() {}",
return dispatchers.empty();
}
seastar::future<> ms_dispatch(crimson::net::ConnectionRef, MessageRef);
- void ms_handle_accept(crimson::net::ConnectionRef conn, seastar::shard_id);
+ void ms_handle_accept(crimson::net::ConnectionRef conn, seastar::shard_id, bool is_replace);
void ms_handle_connect(crimson::net::ConnectionRef conn, seastar::shard_id);
void ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace);
void ms_handle_remote_reset(crimson::net::ConnectionRef conn);
IOHandler::dispatch_accept(
crosscore_t::seq_t cc_seq,
seastar::shard_id new_sid,
- ConnectionFRef conn_fref)
+ ConnectionFRef conn_fref,
+ bool is_replace)
{
ceph_assert_always(seastar::this_shard_id() == get_shard_id());
if (!crosscore.proceed_or_wait(cc_seq)) {
logger().debug("{} got {} dispatch_accept(), wait at {}",
conn, cc_seq, crosscore.get_in_seq());
return crosscore.wait(cc_seq
- ).then([this, cc_seq, new_sid,
+ ).then([this, cc_seq, new_sid, is_replace,
conn_fref=std::move(conn_fref)]() mutable {
- return dispatch_accept(cc_seq, new_sid, std::move(conn_fref));
+ return dispatch_accept(cc_seq, new_sid, std::move(conn_fref), is_replace);
});
}
- logger().debug("{} got {} dispatch_accept({}) at {}",
- conn, cc_seq, new_sid, io_stat_printer{*this});
+ logger().debug("{} got {} dispatch_accept(new_sid={}, replace={}) at {}",
+ conn, cc_seq, new_sid, is_replace, io_stat_printer{*this});
if (get_io_state() == io_state_t::drop) {
assert(!protocol_is_connected);
// it is possible that both io_handler and protocolv2 are
auto _conn_ref = conn_ref;
auto fut = to_new_sid(new_sid, std::move(conn_fref));
- dispatchers.ms_handle_accept(_conn_ref, new_sid);
+ dispatchers.ms_handle_accept(_conn_ref, new_sid, is_replace);
// user can make changes
return fut;
seastar::future<> dispatch_accept(
crosscore_t::seq_t cc_seq,
seastar::shard_id new_sid,
- ConnectionFRef);
+ ConnectionFRef,
+ bool is_replace);
seastar::future<> dispatch_connect(
crosscore_t::seq_t cc_seq,
void Heartbeat::ms_handle_accept(
crimson::net::ConnectionRef conn,
- seastar::shard_id new_shard)
+ seastar::shard_id new_shard,
+ bool is_replace)
{
ceph_assert_always(seastar::this_shard_id() == new_shard);
auto peer = conn->get_peer_id();
}
if (auto found = peers.find(peer);
found != peers.end()) {
- found->second.handle_accept(conn);
+ found->second.handle_accept(conn, is_replace);
}
}
return (conn && conn == _conn);
}
-void Heartbeat::Connection::accepted(crimson::net::ConnectionRef accepted_conn)
+bool Heartbeat::Connection::accepted(
+ crimson::net::ConnectionRef accepted_conn,
+ bool is_replace)
{
- if (!conn) {
- if (accepted_conn->get_peer_addr() == listener.get_peer_addr(type)) {
- logger().info("Heartbeat::Connection::accepted(): "
- "{} racing resolved", *this);
- conn = accepted_conn;
- set_connected();
+ ceph_assert(accepted_conn);
+ ceph_assert(accepted_conn != conn);
+ if (accepted_conn->get_peer_addr() != listener.get_peer_addr(type)) {
+ return false;
+ }
+
+ if (is_replace) {
+ logger().info("Heartbeat::Connection::accepted(): "
+ "{} racing", *this);
+ racing_detected = true;
+ }
+ if (conn) {
+ // there is no assumption about the ordering of the reset and accept
+ // events for the 2 racing connections.
+ if (is_connected) {
+ logger().warn("Heartbeat::Connection::accepted(): "
+ "{} is accepted while connected, is_replace={}",
+ *this, is_replace);
+ conn->mark_down();
+ set_unconnected();
}
- } else if (conn == accepted_conn) {
- set_connected();
}
+ conn = accepted_conn;
+ set_connected();
+ return true;
}
-void Heartbeat::Connection::replaced()
+void Heartbeat::Connection::reset(bool is_replace)
{
- assert(!is_connected);
- auto replaced_conn = conn;
- // set the racing connection, will be handled by handle_accept()
- conn = msgr.connect(replaced_conn->get_peer_addr(),
- replaced_conn->get_peer_name());
- racing_detected = true;
- logger().warn("Heartbeat::Connection::replaced(): {} racing", *this);
- assert(conn != replaced_conn);
-}
+ if (is_replace) {
+ logger().info("Heartbeat::Connection::reset(): "
+ "{} racing, waiting for the replacing accept",
+ *this);
+ racing_detected = true;
+ }
-void Heartbeat::Connection::reset()
-{
- conn = nullptr;
if (is_connected) {
- is_connected = false;
- listener.decrease_connected();
+ set_unconnected();
+ } else {
+ conn = nullptr;
}
- if (!racing_detected || is_winner_side) {
+
+ if (is_replace) {
+ // waiting for the replacing accept event
+ } else if (!racing_detected || is_winner_side) {
connect();
- } else {
+ } else { // racing_detected && !is_winner_side
logger().info("Heartbeat::Connection::reset(): "
"{} racing detected and lose, "
"waiting for peer connect me", *this);
void Heartbeat::Connection::set_connected()
{
+ assert(conn);
assert(!is_connected);
+ ceph_assert(conn->is_connected());
is_connected = true;
listener.increase_connected();
}
+void Heartbeat::Connection::set_unconnected()
+{
+ assert(conn);
+ assert(is_connected);
+ conn = nullptr;
+ is_connected = false;
+ listener.decrease_connected();
+}
+
void Heartbeat::Connection::connect()
{
assert(!conn);
}
}
+void Heartbeat::Peer::handle_reset(
+ crimson::net::ConnectionRef conn, bool is_replace)
+{
+ int cnt = 0;
+ for_each_conn([&] (auto& _conn) {
+ if (_conn.matches(conn)) {
+ ++cnt;
+ _conn.reset(is_replace);
+ }
+ });
+
+ if (cnt == 0) {
+ logger().info("Heartbeat::Peer::handle_reset(): {} ignores conn, is_replace={} -- {}",
+ *this, is_replace, *conn);
+ } else if (cnt > 1) {
+ logger().error("Heartbeat::Peer::handle_reset(): {} handles conn {} times -- {}",
+ *this, cnt, *conn);
+ }
+}
+
+void Heartbeat::Peer::handle_connect(crimson::net::ConnectionRef conn)
+{
+ int cnt = 0;
+ for_each_conn([&] (auto& _conn) {
+ if (_conn.matches(conn)) {
+ ++cnt;
+ _conn.connected();
+ }
+ });
+
+ if (cnt == 0) {
+ logger().error("Heartbeat::Peer::handle_connect(): {} ignores conn -- {}",
+ *this, *conn);
+ conn->mark_down();
+ } else if (cnt > 1) {
+ logger().error("Heartbeat::Peer::handle_connect(): {} handles conn {} times -- {}",
+ *this, cnt, *conn);
+ }
+}
+
+void Heartbeat::Peer::handle_accept(crimson::net::ConnectionRef conn, bool is_replace)
+{
+ int cnt = 0;
+ for_each_conn([&] (auto& _conn) {
+ if (_conn.accepted(conn, is_replace)) {
+ ++cnt;
+ }
+ });
+
+ if (cnt == 0) {
+ logger().warn("Heartbeat::Peer::handle_accept(): {} ignores conn -- {}",
+ *this, *conn);
+ } else if (cnt > 1) {
+ logger().error("Heartbeat::Peer::handle_accept(): {} handles conn {} times -- {}",
+ *this, cnt, *conn);
+ }
+}
+
seastar::future<> Heartbeat::Peer::handle_reply(
crimson::net::ConnectionRef conn, Ref<MOSDPing> m)
{
crimson::net::ConnectionRef conn, MessageRef m) override;
void ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace) override;
void ms_handle_connect(crimson::net::ConnectionRef conn, seastar::shard_id) override;
- void ms_handle_accept(crimson::net::ConnectionRef conn, seastar::shard_id) override;
+ void ms_handle_accept(crimson::net::ConnectionRef conn, seastar::shard_id, bool is_replace) override;
void print(std::ostream&) const;
private:
void connected() {
set_connected();
}
- void accepted(crimson::net::ConnectionRef);
- void replaced();
- void reset();
+ bool accepted(crimson::net::ConnectionRef, bool is_replace);
+ void reset(bool is_replace=false);
seastar::future<> send(MessageURef msg);
void validate();
// retry connection if still pending
private:
void set_connected();
+ void set_unconnected();
void connect();
const osd_id_t peer;
crimson::net::ConnectionRef conn;
bool is_connected = false;
- friend std::ostream& operator<<(std::ostream& os, const Connection& c) {
- if (c.type == type_t::front) {
- return os << "con_front(osd." << c.peer << ")";
- } else {
- return os << "con_back(osd." << c.peer << ")";
- }
- }
+ friend std::ostream& operator<<(std::ostream& os, const Connection& c) {
+ if (c.type == type_t::front) {
+ return os << "con_front(osd." << c.peer << ")";
+ } else {
+ return os << "con_back(osd." << c.peer << ")";
+ }
+ }
};
-#if FMT_VERSION >= 90000
-template <> struct fmt::formatter<Heartbeat::Connection> : fmt::ostream_formatter {};
-#endif
-
/*
* Track the ping history and ping reply (the pong) from the same session, clean up
* history once hb_front or hb_back loses connection and restart the session once
void send_heartbeat(
clock::time_point, ceph::signedspan, std::vector<seastar::future<>>&);
seastar::future<> handle_reply(crimson::net::ConnectionRef, Ref<MOSDPing>);
- void handle_reset(crimson::net::ConnectionRef conn, bool is_replace) {
- for_each_conn([&] (auto& _conn) {
- if (_conn.matches(conn)) {
- if (is_replace) {
- _conn.replaced();
- } else {
- _conn.reset();
- }
- }
- });
- }
- void handle_connect(crimson::net::ConnectionRef conn) {
- for_each_conn([&] (auto& _conn) {
- if (_conn.matches(conn)) {
- _conn.connected();
- }
- });
- }
- void handle_accept(crimson::net::ConnectionRef conn) {
- for_each_conn([&] (auto& _conn) {
- _conn.accepted(conn);
- });
- }
+
+ void handle_reset(crimson::net::ConnectionRef conn, bool is_replace);
+
+ void handle_connect(crimson::net::ConnectionRef conn);
+
+ void handle_accept(crimson::net::ConnectionRef conn, bool is_replace);
private:
entity_addr_t get_peer_addr(type_t type) override;
bool pending_send = false;
Connection con_front;
Connection con_back;
+
+ friend std::ostream& operator<<(std::ostream& os, const Peer& p) {
+ return os << "peer(osd." << p.peer << ")";
+ }
};
#if FMT_VERSION >= 90000
template <> struct fmt::formatter<Heartbeat> : fmt::ostream_formatter {};
+template <> struct fmt::formatter<Heartbeat::Connection> : fmt::ostream_formatter {};
+template <> struct fmt::formatter<Heartbeat::Peer> : fmt::ostream_formatter {};
#endif
void ms_handle_accept(
ConnectionRef conn,
- seastar::shard_id new_shard) override {
+ seastar::shard_id new_shard,
+ bool is_replace) override {
assert(new_shard == seastar::this_shard_id());
auto result = interceptor.find_result(conn);
if (result == nullptr) {
void ms_handle_accept(
ConnectionRef conn,
- seastar::shard_id new_shard) override {
+ seastar::shard_id new_shard,
+ bool is_replace) override {
assert(new_shard == seastar::this_shard_id());
logger().info("[TestPeer] got accept from Test");
ceph_assert(!tracked_conn ||
void ms_handle_accept(
ConnectionRef conn,
- seastar::shard_id new_shard) override {
+ seastar::shard_id new_shard,
+ bool is_replace) override {
assert(new_shard == seastar::this_shard_id());
cmd_conn = conn;
}
}
std::optional<seastar::future<>> ms_dispatch(crimson::net::ConnectionRef con,
- MessageRef m) {
+ MessageRef m) final {
if (verbose) {
logger().warn("{}: con = {}", __func__, *con);
}
void ms_handle_accept(
crimson::net::ConnectionRef conn,
- seastar::shard_id new_shard) {
+ seastar::shard_id new_shard,
+ bool is_replace) final {
logger().info("{} - Connection:{}", __func__, *conn);
assert(new_shard == seastar::this_shard_id());
}
void ms_handle_connect(
crimson::net::ConnectionRef conn,
- seastar::shard_id new_shard) {
+ seastar::shard_id new_shard) final {
logger().info("{} - Connection:{}", __func__, *conn);
assert(new_shard == seastar::this_shard_id());
}
- void ms_handle_reset(crimson::net::ConnectionRef con, bool is_replace);
+ void ms_handle_reset(crimson::net::ConnectionRef con, bool is_replace) final;
- void ms_handle_remote_reset(crimson::net::ConnectionRef con) {
+ void ms_handle_remote_reset(crimson::net::ConnectionRef con) final {
clear_pending(con);
}