return (dispatched ? std::make_optional(seastar::now()) : std::nullopt);
}
-void Client::ms_handle_connect(crimson::net::ConnectionRef c)
+void Client::ms_handle_connect(
+ crimson::net::ConnectionRef c,
+ seastar::shard_id new_shard)
{
+ ceph_assert_always(new_shard == seastar::this_shard_id());
gate.dispatch_in_background(__func__, *this, [this, c] {
if (conn == c) {
// ask for the mgrconfigure message
std::optional<seastar::future<>> ms_dispatch(
crimson::net::ConnectionRef conn, Ref<Message> m) override;
void ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace) final;
- void ms_handle_connect(crimson::net::ConnectionRef conn) final;
+ void ms_handle_connect(crimson::net::ConnectionRef conn, seastar::shard_id) final;
seastar::future<> handle_mgr_map(crimson::net::ConnectionRef conn,
Ref<MMgrMap> m);
seastar::future<> handle_mgr_conf(crimson::net::ConnectionRef conn,
// used to throttle the connection if it's too busy.
virtual std::optional<seastar::future<>> ms_dispatch(ConnectionRef, MessageRef) = 0;
- virtual void ms_handle_accept(ConnectionRef conn) {}
+ // The connection is accepted or recoverred(lossless), all the followup
+ // events and messages will be dispatched to the new_shard.
+ virtual void ms_handle_accept(ConnectionRef conn, seastar::shard_id new_shard) {}
- virtual void ms_handle_connect(ConnectionRef conn) {}
+ // The connection is (re)connected, all the followup events and messages will
+ // be dispatched to the new_shard.
+ virtual void ms_handle_connect(ConnectionRef conn, seastar::shard_id new_shard) {}
// a reset event is dispatched when the connection is closed unexpectedly.
// is_replace=true means the reset connection is going to be replaced by
}
void
-ChainedDispatchers::ms_handle_accept(crimson::net::ConnectionRef conn) {
+ChainedDispatchers::ms_handle_accept(
+ crimson::net::ConnectionRef conn,
+ seastar::shard_id new_shard) {
try {
for (auto& dispatcher : dispatchers) {
- dispatcher->ms_handle_accept(conn);
+ dispatcher->ms_handle_accept(conn, new_shard);
}
} catch (...) {
logger().error("{} got unexpected exception in ms_handle_accept() {}",
}
void
-ChainedDispatchers::ms_handle_connect(crimson::net::ConnectionRef conn) {
+ChainedDispatchers::ms_handle_connect(
+ crimson::net::ConnectionRef conn,
+ seastar::shard_id new_shard) {
try {
for(auto& dispatcher : dispatchers) {
- dispatcher->ms_handle_connect(conn);
+ dispatcher->ms_handle_connect(conn, new_shard);
}
} catch (...) {
logger().error("{} got unexpected exception in ms_handle_connect() {}",
#pragma once
+#include <seastar/core/smp.hh>
+
#include "Fwd.h"
#include "crimson/common/log.h"
return dispatchers.empty();
}
seastar::future<> ms_dispatch(crimson::net::ConnectionRef, MessageRef);
- void ms_handle_accept(crimson::net::ConnectionRef conn);
- void ms_handle_connect(crimson::net::ConnectionRef conn);
+ void ms_handle_accept(crimson::net::ConnectionRef conn, seastar::shard_id);
+ void ms_handle_connect(crimson::net::ConnectionRef conn, seastar::shard_id);
void ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace);
void ms_handle_remote_reset(crimson::net::ConnectionRef conn);
// happening to a connected connection.
protocol_is_connected = true;
ceph_assert_always(conn_ref);
- dispatchers.ms_handle_accept(conn_ref);
+ dispatchers.ms_handle_accept(conn_ref, get_shard_id());
}
void IOHandler::dispatch_connect()
ceph_assert_always(protocol_is_connected == false);
protocol_is_connected = true;
ceph_assert_always(conn_ref);
- dispatchers.ms_handle_connect(conn_ref);
+ dispatchers.ms_handle_connect(conn_ref, get_shard_id());
}
void IOHandler::dispatch_reset(bool is_replace)
}
}
-void Heartbeat::ms_handle_connect(crimson::net::ConnectionRef conn)
+void Heartbeat::ms_handle_connect(
+ crimson::net::ConnectionRef conn,
+ seastar::shard_id new_shard)
{
+ ceph_assert_always(seastar::this_shard_id() == new_shard);
auto peer = conn->get_peer_id();
if (conn->get_peer_type() != entity_name_t::TYPE_OSD ||
peer == entity_name_t::NEW) {
}
}
-void Heartbeat::ms_handle_accept(crimson::net::ConnectionRef conn)
+void Heartbeat::ms_handle_accept(
+ crimson::net::ConnectionRef conn,
+ seastar::shard_id new_shard)
{
+ ceph_assert_always(seastar::this_shard_id() == new_shard);
auto peer = conn->get_peer_id();
if (conn->get_peer_type() != entity_name_t::TYPE_OSD ||
peer == entity_name_t::NEW) {
std::optional<seastar::future<>> ms_dispatch(
crimson::net::ConnectionRef conn, MessageRef m) override;
void ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace) override;
- void ms_handle_connect(crimson::net::ConnectionRef conn) override;
- void ms_handle_accept(crimson::net::ConnectionRef conn) override;
+ void ms_handle_connect(crimson::net::ConnectionRef conn, seastar::shard_id) override;
+ void ms_handle_accept(crimson::net::ConnectionRef conn, seastar::shard_id) override;
void print(std::ostream&) const;
private:
return nr_depth - depth.current();
}
- void ms_handle_connect(crimson::net::ConnectionRef conn) override {
+ void ms_handle_connect(
+ crimson::net::ConnectionRef conn,
+ seastar::shard_id new_shard) override {
+ ceph_assert_always(new_shard == seastar::this_shard_id());
conn_stats.connected_time = mono_clock::now();
}
std::optional<seastar::future<>> ms_dispatch(
return found->second;
}
- void ms_handle_connect(crimson::net::ConnectionRef conn) override {
+ void ms_handle_connect(
+ crimson::net::ConnectionRef conn,
+ seastar::shard_id new_shard) override {
+ assert(new_shard == seastar::this_shard_id());
auto session = seastar::make_shared<PingSession>();
auto [i, added] = sessions.emplace(conn, session);
std::ignore = i;
return {seastar::now()};
}
- void ms_handle_accept(ConnectionRef conn) override {
+ void ms_handle_accept(
+ ConnectionRef conn,
+ seastar::shard_id new_shard) override {
+ assert(new_shard == seastar::this_shard_id());
auto result = interceptor.find_result(conn);
if (result == nullptr) {
logger().error("Untracked accepted connection: {}", *conn);
std::ignore = flush_pending_send();
}
- void ms_handle_connect(ConnectionRef conn) override {
+ void ms_handle_connect(
+ ConnectionRef conn,
+ seastar::shard_id new_shard) override {
+ assert(new_shard == seastar::this_shard_id());
auto result = interceptor.find_result(conn);
if (result == nullptr) {
logger().error("Untracked connected connection: {}", *conn);
return {seastar::now()};
}
- void ms_handle_accept(ConnectionRef conn) override {
+ void ms_handle_accept(
+ ConnectionRef conn,
+ seastar::shard_id new_shard) override {
+ assert(new_shard == seastar::this_shard_id());
logger().info("[TestPeer] got accept from Test");
ceph_assert(!tracked_conn ||
tracked_conn->is_closed() ||
return {seastar::now()};
}
- void ms_handle_accept(ConnectionRef conn) override {
+ void ms_handle_accept(
+ ConnectionRef conn,
+ seastar::shard_id new_shard) override {
+ assert(new_shard == seastar::this_shard_id());
cmd_conn = conn;
}
}
}
- void ms_handle_accept(crimson::net::ConnectionRef conn) {
+ void ms_handle_accept(
+ crimson::net::ConnectionRef conn,
+ seastar::shard_id new_shard) {
logger().info("{} - Connection:{}", __func__, *conn);
+ assert(new_shard == seastar::this_shard_id());
}
- void ms_handle_connect(crimson::net::ConnectionRef conn) {
+ void ms_handle_connect(
+ crimson::net::ConnectionRef conn,
+ seastar::shard_id new_shard) {
logger().info("{} - Connection:{}", __func__, *conn);
+ assert(new_shard == seastar::this_shard_id());
}
void ms_handle_reset(crimson::net::ConnectionRef con, bool is_replace);