void Client::ms_handle_connect(
crimson::net::ConnectionRef c,
- seastar::shard_id new_shard)
+ seastar::shard_id prv_shard)
{
- ceph_assert_always(new_shard == seastar::this_shard_id());
+ ceph_assert_always(prv_shard == seastar::this_shard_id());
gate.dispatch_in_background(__func__, *this, [this, c] {
if (conn == c) {
// ask for the mgrconfigure message
// used to throttle the connection if it's too busy.
virtual std::optional<seastar::future<>> ms_dispatch(ConnectionRef, MessageRef) = 0;
+ // The connection is moving to the new_shard during accept/connect.
+ // The user should not operate on conn in this shard thereafter.
+ virtual void ms_handle_shard_change(
+ ConnectionRef conn,
+ seastar::shard_id new_shard,
+ bool is_accept_or_connect) {}
+
// The connection is accepted or recovered (lossless); all the follow-up
- // events and messages will be dispatched to the new_shard.
+ // events and messages will be dispatched to this shard.
//
// is_replace=true means the accepted connection has replaced
// another connecting connection with the same peer_addr, which currently only
// happens under lossy policy when both sides wish to connect to each other.
- virtual void ms_handle_accept(ConnectionRef conn, seastar::shard_id new_shard, bool is_replace) {}
+ virtual void ms_handle_accept(ConnectionRef conn, seastar::shard_id prv_shard, bool is_replace) {}
// The connection is (re)connected, all the followup events and messages will
- // be dispatched to the new_shard.
- virtual void ms_handle_connect(ConnectionRef conn, seastar::shard_id new_shard) {}
+ // be dispatched to this shard.
+ virtual void ms_handle_connect(ConnectionRef conn, seastar::shard_id prv_shard) {}
// a reset event is dispatched when the connection is closed unexpectedly.
//
}
auto cc_seq = crosscore.prepare_submit();
+ // dispatch_connect() makes 2 cross-core hops, so prepare a 2nd sequence
+ crosscore.prepare_submit();
logger().info("{} connected: gs={}, pgs={}, cs={}, "
"client_cookie={}, server_cookie={}, {}, new_sid={}, "
"send {} IOHandler::dispatch_connect()",
// set io_handler to a new shard
auto cc_seq = crosscore.prepare_submit();
+ // dispatch_accept() makes 2 cross-core hops, so prepare a 2nd sequence
+ crosscore.prepare_submit();
auto new_io_shard = frame_assembler->get_socket_shard_id();
logger().debug("{} send {} IOHandler::dispatch_accept({})",
conn, cc_seq, new_io_shard);
// set io_handler to a new shard
// we should prevent parallel core-switching attempts
auto cc_seq = crosscore.prepare_submit();
+ // dispatch_accept() makes 2 cross-core hops, so prepare a 2nd sequence
+ crosscore.prepare_submit();
logger().debug("{} send {} IOHandler::dispatch_accept({})",
conn, cc_seq, new_io_shard);
ConnectionFRef conn_fref = seastar::make_foreign(
namespace crimson::net {
seastar::future<>
-ChainedDispatchers::ms_dispatch(crimson::net::ConnectionRef conn,
+ChainedDispatchers::ms_dispatch(ConnectionRef conn,
MessageRef m) {
try {
for (auto& dispatcher : dispatchers) {
}
void
-ChainedDispatchers::ms_handle_accept(
- crimson::net::ConnectionRef conn,
+ChainedDispatchers::ms_handle_shard_change(
+ ConnectionRef conn,
seastar::shard_id new_shard,
+ bool is_accept_or_connect) {
+ try {
+ for (auto& dispatcher : dispatchers) {
+ dispatcher->ms_handle_shard_change(conn, new_shard, is_accept_or_connect);
+ }
+ } catch (...) {
+ logger().error("{} got unexpected exception in ms_handle_shard_change() {}",
+ *conn, std::current_exception());
+ ceph_abort();
+ }
+}
+
+void
+ChainedDispatchers::ms_handle_accept(
+ ConnectionRef conn,
+ seastar::shard_id prv_shard,
bool is_replace) {
try {
for (auto& dispatcher : dispatchers) {
- dispatcher->ms_handle_accept(conn, new_shard, is_replace);
+ dispatcher->ms_handle_accept(conn, prv_shard, is_replace);
}
} catch (...) {
logger().error("{} got unexpected exception in ms_handle_accept() {}",
void
ChainedDispatchers::ms_handle_connect(
- crimson::net::ConnectionRef conn,
- seastar::shard_id new_shard) {
+ ConnectionRef conn,
+ seastar::shard_id prv_shard) {
try {
for(auto& dispatcher : dispatchers) {
- dispatcher->ms_handle_connect(conn, new_shard);
+ dispatcher->ms_handle_connect(conn, prv_shard);
}
} catch (...) {
logger().error("{} got unexpected exception in ms_handle_connect() {}",
}
void
-ChainedDispatchers::ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace) {
+ChainedDispatchers::ms_handle_reset(ConnectionRef conn, bool is_replace) {
try {
for (auto& dispatcher : dispatchers) {
dispatcher->ms_handle_reset(conn, is_replace);
}
void
-ChainedDispatchers::ms_handle_remote_reset(crimson::net::ConnectionRef conn) {
+ChainedDispatchers::ms_handle_remote_reset(ConnectionRef conn) {
try {
for (auto& dispatcher : dispatchers) {
dispatcher->ms_handle_remote_reset(conn);
bool empty() const {
return dispatchers.empty();
}
- seastar::future<> ms_dispatch(crimson::net::ConnectionRef, MessageRef);
- void ms_handle_accept(crimson::net::ConnectionRef conn, seastar::shard_id, bool is_replace);
- void ms_handle_connect(crimson::net::ConnectionRef conn, seastar::shard_id);
- void ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace);
- void ms_handle_remote_reset(crimson::net::ConnectionRef conn);
+ seastar::future<> ms_dispatch(ConnectionRef, MessageRef);
+ void ms_handle_shard_change(ConnectionRef, seastar::shard_id, bool);
+ void ms_handle_accept(ConnectionRef conn, seastar::shard_id, bool is_replace);
+ void ms_handle_connect(ConnectionRef conn, seastar::shard_id);
+ void ms_handle_reset(ConnectionRef conn, bool is_replace);
+ void ms_handle_remote_reset(ConnectionRef conn);
private:
dispatchers_t dispatchers;
ConnectionFRef conn_fref,
bool is_replace)
{
- ceph_assert_always(seastar::this_shard_id() == get_shard_id());
- if (!crosscore.proceed_or_wait(cc_seq)) {
- logger().debug("{} got {} dispatch_accept(), wait at {}",
- conn, cc_seq, crosscore.get_in_seq());
- return crosscore.wait(cc_seq
- ).then([this, cc_seq, new_sid, is_replace,
- conn_fref=std::move(conn_fref)]() mutable {
- return dispatch_accept(cc_seq, new_sid, std::move(conn_fref), is_replace);
- });
- }
-
- logger().debug("{} got {} dispatch_accept(new_sid={}, replace={}) at {}",
- conn, cc_seq, new_sid, is_replace, io_stat_printer{*this});
- if (get_io_state() == io_state_t::drop) {
- assert(!protocol_is_connected);
- // it is possible that both io_handler and protocolv2 are
- // trying to close each other from different cores simultaneously.
- return to_new_sid(new_sid, std::move(conn_fref));
- }
- // protocol_is_connected can be from true to true here if the replacing is
- // happening to a connected connection.
- protocol_is_connected = true;
- ceph_assert_always(conn_ref);
- auto _conn_ref = conn_ref;
- auto fut = to_new_sid(new_sid, std::move(conn_fref));
-
- dispatchers.ms_handle_accept(_conn_ref, new_sid, is_replace);
- // user can make changes
-
- return fut;
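+ // crosscore ordering, state checks and the ms_handle_accept dispatch
+ // are all handled inside to_new_sid()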
+ return to_new_sid(cc_seq, new_sid, std::move(conn_fref), is_replace);
}
seastar::future<>
seastar::shard_id new_sid,
ConnectionFRef conn_fref)
{
- ceph_assert_always(seastar::this_shard_id() == get_shard_id());
- if (!crosscore.proceed_or_wait(cc_seq)) {
- logger().debug("{} got {} dispatch_connect(), wait at {}",
- conn, cc_seq, crosscore.get_in_seq());
- return crosscore.wait(cc_seq
- ).then([this, cc_seq, new_sid,
- conn_fref=std::move(conn_fref)]() mutable {
- return dispatch_connect(cc_seq, new_sid, std::move(conn_fref));
- });
- }
-
- logger().debug("{} got {} dispatch_connect({}) at {}",
- conn, cc_seq, new_sid, io_stat_printer{*this});
- if (get_io_state() == io_state_t::drop) {
- assert(!protocol_is_connected);
- // it is possible that both io_handler and protocolv2 are
- // trying to close each other from different cores simultaneously.
- return to_new_sid(new_sid, std::move(conn_fref));
- }
- ceph_assert_always(protocol_is_connected == false);
- protocol_is_connected = true;
- ceph_assert_always(conn_ref);
- auto _conn_ref = conn_ref;
- auto fut = to_new_sid(new_sid, std::move(conn_fref));
-
- dispatchers.ms_handle_connect(_conn_ref, new_sid);
- // user can make changes
-
- return fut;
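+ // crosscore ordering, state checks and the ms_handle_connect dispatch
+ // are all handled inside to_new_sid(); std::nullopt selects the connect path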
+ return to_new_sid(cc_seq, new_sid, std::move(conn_fref), std::nullopt);
}
seastar::future<>
seastar::future<>
IOHandler::to_new_sid(
+ crosscore_t::seq_t cc_seq,
seastar::shard_id new_sid,
- ConnectionFRef conn_fref)
+ ConnectionFRef conn_fref,
+ std::optional<bool> is_replace)
{
- /*
- * Note:
- * - It must be called before user is aware of the new core (through dispatching);
- * - Messenger must wait the returned future for futher operations to prevent racing;
- * - In general, the below submitted continuation should be the first one from the prv sid
- * to the new sid;
- */
+ ceph_assert_always(seastar::this_shard_id() == get_shard_id());
+ if (!crosscore.proceed_or_wait(cc_seq)) {
+ logger().debug("{} got {} to_new_sid(), wait at {}",
+ conn, cc_seq, crosscore.get_in_seq());
+ return crosscore.wait(cc_seq
+ ).then([this, cc_seq, new_sid, is_replace,
+ conn_fref=std::move(conn_fref)]() mutable {
+ return to_new_sid(cc_seq, new_sid, std::move(conn_fref), is_replace);
+ });
+ }
+
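+ // is_replace has a value only on the accept path; std::nullopt means connect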
+ bool is_accept_or_connect = is_replace.has_value();
+ logger().debug("{} got {} to_new_sid_1(new_sid={}, {}) at {}",
+ conn, cc_seq, new_sid,
+ fmt::format("{}",
+ is_accept_or_connect ?
+ (*is_replace ? "accept(replace)" : "accept(!replace)") :
+ "connect"),
+ io_stat_printer{*this});
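+ // the caller prepared 2 sequences: cc_seq ordered the hop into this shard,
+ // the next one orders the submit_to(new_sid) continuation below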
+ auto next_cc_seq = ++cc_seq;
+
+ if (get_io_state() != io_state_t::drop) {
+ ceph_assert_always(conn_ref);
+ if (new_sid != seastar::this_shard_id()) {
+ dispatchers.ms_handle_shard_change(conn_ref, new_sid, is_accept_or_connect);
+ // user can make changes
+ }
+ } else {
+ // it is possible that both io_handler and protocolv2 are
+ // trying to close each other from different cores simultaneously.
+ assert(!protocol_is_connected);
+ }
+
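+ // the io_state may have changed during the dispatch above, so check it again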
+ if (get_io_state() != io_state_t::drop) {
+ if (is_accept_or_connect) {
+ // protocol_is_connected can be from true to true here if the replacing is
+ // happening to a connected connection.
+ } else {
+ ceph_assert_always(protocol_is_connected == false);
+ }
+ protocol_is_connected = true;
+ } else {
+ assert(!protocol_is_connected);
+ }
- assert(seastar::this_shard_id() == get_shard_id());
bool is_dropped = false;
if (get_io_state() == io_state_t::drop) {
is_dropped = true;
assert(new_sid == get_shard_id());
return seastar::smp::submit_to(new_sid,
- [this, is_dropped, prv_sid, conn_fref=std::move(conn_fref)]() mutable {
- logger().debug("{} see new_sid in io_handler(new_sid) from {}, is_dropped={}",
- conn, prv_sid, is_dropped);
+ [this, next_cc_seq, is_dropped, prv_sid, is_replace, conn_fref=std::move(conn_fref)]() mutable {
+ logger().debug("{} got {} to_new_sid_2(prv_sid={}, is_dropped={}, {}) at {}",
+ conn, next_cc_seq, prv_sid, is_dropped,
+ fmt::format("{}",
+ is_replace.has_value() ?
+ (*is_replace ? "accept(replace)" : "accept(!replace)") :
+ "connect"),
+ io_stat_printer{*this});
ceph_assert_always(seastar::this_shard_id() == get_shard_id());
ceph_assert_always(get_io_state() != io_state_t::open);
ceph_assert_always(!maybe_dropped_sid.has_value());
-
- ceph_assert_always(!conn_ref);
- conn_ref = make_local_shared_foreign(std::move(conn_fref));
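+ // consume the 2nd cross-core sequence; the assert ensures it never needs to wait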
+ ceph_assert_always(crosscore.proceed_or_wait(next_cc_seq));
if (is_dropped) {
- // the follow up cleanups will be done in the prv_sid
+ ceph_assert_always(get_io_state() == io_state_t::drop);
ceph_assert_always(shard_states->assert_closed_and_exit());
maybe_dropped_sid = prv_sid;
+ // cleanup_prv_shard() will be done in a follow-up close_io()
} else {
- // may be at io_state_t::drop
- // cleanup the prvious shard
+ // possibly already at io_state_t::drop
+
+ // the previous shard is not cleaned up yet, and close_io() only
+ // cleans up the current shard, so clean up the previous shard here.
shard_states->dispatch_in_background(
"cleanup_prv_sid", conn, [this, prv_sid] {
return cleanup_prv_shard(prv_sid);
});
maybe_notify_out_dispatch();
}
+
+ ceph_assert_always(!conn_ref);
+ // assign even if already dropping
+ conn_ref = make_local_shared_foreign(std::move(conn_fref));
+
+ if (get_io_state() != io_state_t::drop) {
+ if (is_replace.has_value()) {
+ dispatchers.ms_handle_accept(conn_ref, prv_sid, *is_replace);
+ } else {
+ dispatchers.ms_handle_connect(conn_ref, prv_sid);
+ }
+ // user can make changes
+ }
});
}
seastar::future<> do_send_keepalive();
seastar::future<> to_new_sid(
- seastar::shard_id new_sid, ConnectionFRef);
+ crosscore_t::seq_t cc_seq,
+ seastar::shard_id new_sid,
+ ConnectionFRef,
+ std::optional<bool> is_replace);
void dispatch_reset(bool is_replace);
void Heartbeat::ms_handle_connect(
crimson::net::ConnectionRef conn,
- seastar::shard_id new_shard)
+ seastar::shard_id prv_shard)
{
- ceph_assert_always(seastar::this_shard_id() == new_shard);
+ ceph_assert_always(seastar::this_shard_id() == prv_shard);
auto peer = conn->get_peer_id();
if (conn->get_peer_type() != entity_name_t::TYPE_OSD ||
peer == entity_name_t::NEW) {
void Heartbeat::ms_handle_accept(
crimson::net::ConnectionRef conn,
- seastar::shard_id new_shard,
+ seastar::shard_id prv_shard,
bool is_replace)
{
- ceph_assert_always(seastar::this_shard_id() == new_shard);
+ ceph_assert_always(seastar::this_shard_id() == prv_shard);
auto peer = conn->get_peer_id();
if (conn->get_peer_type() != entity_name_t::TYPE_OSD ||
peer == entity_name_t::NEW) {
void ms_handle_connect(
crimson::net::ConnectionRef conn,
- seastar::shard_id new_shard) override {
- ceph_assert_always(new_shard == seastar::this_shard_id());
+ seastar::shard_id prv_shard) override {
+ ceph_assert_always(prv_shard == seastar::this_shard_id());
assert(is_active());
unsigned index = static_cast<ConnectionPriv&>(conn->get_user_private()).index;
auto &conn_state = conn_states[index];
void ms_handle_accept(
crimson::net::ConnectionRef conn,
- seastar::shard_id new_shard,
+ seastar::shard_id prv_shard,
bool is_replace) override {
logger().info("server accepted {}", *conn);
- ceph_assert(new_shard == seastar::this_shard_id());
+ ceph_assert(prv_shard == seastar::this_shard_id());
ceph_assert(!is_replace);
}
void ms_handle_connect(
crimson::net::ConnectionRef conn,
- seastar::shard_id new_shard) override {
- assert(new_shard == seastar::this_shard_id());
+ seastar::shard_id prv_shard) override {
+ assert(prv_shard == seastar::this_shard_id());
auto session = seastar::make_shared<PingSession>();
auto [i, added] = sessions.emplace(conn, session);
std::ignore = i;
void ms_handle_accept(
ConnectionRef conn,
- seastar::shard_id new_shard,
+ seastar::shard_id prv_shard,
bool is_replace) override {
- assert(new_shard == seastar::this_shard_id());
+ assert(prv_shard == seastar::this_shard_id());
auto result = interceptor.find_result(conn);
if (result == nullptr) {
logger().error("Untracked accepted connection: {}", *conn);
void ms_handle_connect(
ConnectionRef conn,
- seastar::shard_id new_shard) override {
- assert(new_shard == seastar::this_shard_id());
+ seastar::shard_id prv_shard) override {
+ assert(prv_shard == seastar::this_shard_id());
auto result = interceptor.find_result(conn);
if (result == nullptr) {
logger().error("Untracked connected connection: {}", *conn);
void ms_handle_accept(
ConnectionRef conn,
- seastar::shard_id new_shard,
+ seastar::shard_id prv_shard,
bool is_replace) override {
- assert(new_shard == seastar::this_shard_id());
+ assert(prv_shard == seastar::this_shard_id());
logger().info("[TestPeer] got accept from Test");
ceph_assert(!tracked_conn ||
tracked_conn->is_closed() ||
void ms_handle_accept(
ConnectionRef conn,
- seastar::shard_id new_shard,
+ seastar::shard_id prv_shard,
bool is_replace) override {
- assert(new_shard == seastar::this_shard_id());
+ assert(prv_shard == seastar::this_shard_id());
cmd_conn = conn;
}
void ms_handle_accept(
crimson::net::ConnectionRef conn,
- seastar::shard_id new_shard,
+ seastar::shard_id prv_shard,
bool is_replace) final {
logger().info("{} - Connection:{}", __func__, *conn);
- assert(new_shard == seastar::this_shard_id());
+ assert(prv_shard == seastar::this_shard_id());
}
void ms_handle_connect(
crimson::net::ConnectionRef conn,
- seastar::shard_id new_shard) final {
+ seastar::shard_id prv_shard) final {
logger().info("{} - Connection:{}", __func__, *conn);
- assert(new_shard == seastar::this_shard_id());
+ assert(prv_shard == seastar::this_shard_id());
}
void ms_handle_reset(crimson::net::ConnectionRef con, bool is_replace) final;