}
}
-seastar::future<> Client::ms_handle_reset(crimson::net::ConnectionRef c)
+seastar::future<> Client::ms_handle_reset(crimson::net::ConnectionRef c, bool is_replace)
{
if (conn == c) {
report_timer.cancel();
private:
seastar::future<> ms_dispatch(crimson::net::Connection* conn,
Ref<Message> m) override;
- seastar::future<> ms_handle_reset(crimson::net::ConnectionRef conn) final;
+ seastar::future<> ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace) final;
seastar::future<> ms_handle_connect(crimson::net::ConnectionRef conn) final;
seastar::future<> handle_mgr_map(crimson::net::Connection* conn,
Ref<MMgrMap> m);
}
}
-seastar::future<> Client::ms_handle_reset(crimson::net::ConnectionRef conn)
+seastar::future<> Client::ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace)
{
auto found = std::find_if(pending_conns.begin(), pending_conns.end(),
[peer_addr = conn->get_peer_addr()](auto& mc) {
seastar::future<> ms_dispatch(crimson::net::Connection* conn,
MessageRef m) override;
- seastar::future<> ms_handle_reset(crimson::net::ConnectionRef conn) override;
+ seastar::future<> ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace) override;
seastar::future<> handle_monmap(crimson::net::Connection* conn,
Ref<MMonMap> m);
return seastar::make_ready_future<>();
}
- virtual seastar::future<> ms_handle_reset(ConnectionRef conn) {
+ // a reset event is dispatched when the connection is closed unexpectedly.
+ // is_replace=true means the reset connection is going to be replaced by
+ // another accepting connection with the same peer_addr, which currently only
+ // happens under lossy policy when both sides wish to connect to each other.
+ virtual seastar::future<> ms_handle_reset(ConnectionRef conn, bool is_replace) {
return seastar::make_ready_future<>();
}
return;
}
+ bool is_replace = f_accept_new ? true : false;
logger().info("{} closing: reset {}, replace {}", conn,
dispatch_reset ? "yes" : "no",
- f_accept_new ? "yes" : "no");
+ is_replace ? "yes" : "no");
// unregister_conn() drops a reference, so hold another until completion
auto cleanup = [conn_ref = conn.shared_from_this(), this] {
}
set_write_state(write_state_t::drop);
auto gate_closed = pending_dispatch.close();
- auto reset_dispatched = seastar::futurize_apply([this, dispatch_reset] {
+ auto reset_dispatched = seastar::futurize_apply([this, dispatch_reset, is_replace] {
if (dispatch_reset) {
return dispatcher.ms_handle_reset(
- seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
+ seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()),
+ is_replace);
}
return seastar::now();
}).handle_exception([this] (std::exception_ptr eptr) {
SocketConnectionRef existing_conn = messenger.lookup_conn(conn.peer_addr);
- bool dispatch_reset = true;
if (existing_conn) {
if (existing_conn->protocol->proto_type != proto_t::v2) {
logger().warn("{} existing connection {} proto version is {}, close existing",
static_cast<int>(existing_conn->protocol->proto_type));
// should unregister the existing from msgr atomically
// NOTE: this is following async messenger logic, but we may miss the reset event.
- dispatch_reset = false;
+ execute_establishing(existing_conn, false);
+ return seastar::make_ready_future<next_step_t>(next_step_t::ready);
} else {
return handle_existing_connection(existing_conn);
}
+ } else {
+ execute_establishing(nullptr, true);
+ return seastar::make_ready_future<next_step_t>(next_step_t::ready);
}
-
- execute_establishing(existing_conn, dispatch_reset);
- return seastar::make_ready_future<next_step_t>(next_step_t::ready);
});
}
}
seastar::future<>
-ChainedDispatchers::ms_handle_reset(crimson::net::ConnectionRef conn) {
- return seastar::do_for_each(dispatchers, [conn](Dispatcher* dispatcher) {
- return dispatcher->ms_handle_reset(conn);
+ChainedDispatchers::ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace) {
+ return seastar::do_for_each(dispatchers, [conn, is_replace](Dispatcher* dispatcher) {
+ return dispatcher->ms_handle_reset(conn, is_replace);
});
}
seastar::future<> ms_dispatch(crimson::net::Connection* conn, MessageRef m) override;
seastar::future<> ms_handle_accept(crimson::net::ConnectionRef conn) override;
seastar::future<> ms_handle_connect(crimson::net::ConnectionRef conn) override;
- seastar::future<> ms_handle_reset(crimson::net::ConnectionRef conn) override;
+ seastar::future<> ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace) override;
seastar::future<> ms_handle_remote_reset(crimson::net::ConnectionRef conn) override;
};
}
}
-seastar::future<> Heartbeat::ms_handle_reset(crimson::net::ConnectionRef conn)
+seastar::future<> Heartbeat::ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace)
{
auto found = std::find_if(peers.begin(), peers.end(),
[conn](const peers_map_t::value_type& peer) {
// Dispatcher methods
seastar::future<> ms_dispatch(crimson::net::Connection* conn,
MessageRef m) override;
- seastar::future<> ms_handle_reset(crimson::net::ConnectionRef conn) override;
+ seastar::future<> ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace) override;
private:
seastar::future<> handle_osd_ping(crimson::net::Connection* conn,
}
}
-seastar::future<> OSD::ms_handle_reset(crimson::net::ConnectionRef conn)
+seastar::future<> OSD::ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace)
{
// TODO: cleanup the session attached to this connection
logger().warn("ms_handle_reset");
// Dispatcher methods
seastar::future<> ms_dispatch(crimson::net::Connection* conn, MessageRef m) final;
seastar::future<> ms_handle_connect(crimson::net::ConnectionRef conn) final;
- seastar::future<> ms_handle_reset(crimson::net::ConnectionRef conn) final;
+ seastar::future<> ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace) final;
seastar::future<> ms_handle_remote_reset(crimson::net::ConnectionRef conn) final;
// mgr::WithStats methods
return seastar::now();
}
- seastar::future<> ms_handle_reset(ConnectionRef conn) override {
+ seastar::future<> ms_handle_reset(ConnectionRef conn, bool is_replace) override {
auto result = interceptor.find_result(conn);
if (result == nullptr) {
logger().error("Untracked reset connection: {}", *conn);
return flush_pending_send();
}
- seastar::future<> ms_handle_reset(ConnectionRef conn) override {
+ seastar::future<> ms_handle_reset(ConnectionRef conn, bool is_replace) override {
logger().info("[TestPeer] got reset from Test");
ceph_assert(tracked_conn == conn);
tracked_conn = nullptr;