]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/net: notify the new connection shard upon accept/connect
authorYingxin Cheng <yingxin.cheng@intel.com>
Fri, 2 Jun 2023 09:22:53 +0000 (17:22 +0800)
committerYingxin Cheng <yingxin.cheng@intel.com>
Sun, 25 Jun 2023 03:57:19 +0000 (11:57 +0800)
Users may need to know the new connection shard prior to message
dispatching. Otherwise, there will be no chance for user to do any
related preparations.

This is still a placeholder before multi-core messegner is enabled.

Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
src/crimson/mgr/client.cc
src/crimson/mgr/client.h
src/crimson/net/Dispatcher.h
src/crimson/net/chained_dispatchers.cc
src/crimson/net/chained_dispatchers.h
src/crimson/net/io_handler.cc
src/crimson/osd/heartbeat.cc
src/crimson/osd/heartbeat.h
src/crimson/tools/perf_crimson_msgr.cc
src/test/crimson/test_messenger.cc
src/test/crimson/test_messenger_thrash.cc

index 6e3d7cdd848c007970be328b8e2d5b6241499eb4..b81f204871866928fadb0e56b9ba914566a9526f 100644 (file)
@@ -65,8 +65,11 @@ Client::ms_dispatch(crimson::net::ConnectionRef conn, MessageRef m)
   return (dispatched ? std::make_optional(seastar::now()) : std::nullopt);
 }
 
-void Client::ms_handle_connect(crimson::net::ConnectionRef c)
+void Client::ms_handle_connect(
+    crimson::net::ConnectionRef c,
+    seastar::shard_id new_shard)
 {
+  ceph_assert_always(new_shard == seastar::this_shard_id());
   gate.dispatch_in_background(__func__, *this, [this, c] {
     if (conn == c) {
       // ask for the mgrconfigure message
index e8457543305681defdb7c7881bda2640e87512a3..feceae55dbb0b845fc4448831fd5976fd9660524 100644 (file)
@@ -40,7 +40,7 @@ private:
   std::optional<seastar::future<>> ms_dispatch(
       crimson::net::ConnectionRef conn, Ref<Message> m) override;
   void ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace) final;
-  void ms_handle_connect(crimson::net::ConnectionRef conn) final;
+  void ms_handle_connect(crimson::net::ConnectionRef conn, seastar::shard_id) final;
   seastar::future<> handle_mgr_map(crimson::net::ConnectionRef conn,
                                   Ref<MMgrMap> m);
   seastar::future<> handle_mgr_conf(crimson::net::ConnectionRef conn,
index cc6fd4574c7518da11c8471352ba1b0f35d0671d..c563b5e266f2e607a043e440456ab4a297821a74 100644 (file)
@@ -30,9 +30,13 @@ class Dispatcher {
   // used to throttle the connection if it's too busy.
   virtual std::optional<seastar::future<>> ms_dispatch(ConnectionRef, MessageRef) = 0;
 
-  virtual void ms_handle_accept(ConnectionRef conn) {}
+  // The connection is accepted or recoverred(lossless), all the followup
+  // events and messages will be dispatched to the new_shard.
+  virtual void ms_handle_accept(ConnectionRef conn, seastar::shard_id new_shard) {}
 
-  virtual void ms_handle_connect(ConnectionRef conn) {}
+  // The connection is (re)connected, all the followup events and messages will
+  // be dispatched to the new_shard.
+  virtual void ms_handle_connect(ConnectionRef conn, seastar::shard_id new_shard) {}
 
   // a reset event is dispatched when the connection is closed unexpectedly.
   // is_replace=true means the reset connection is going to be replaced by
index b13d40c8f7318d720ed0445ac37ab4e1ee8f7657..2656c0e57492bc207747a2ae0b1c15069ef8a9d0 100644 (file)
@@ -39,10 +39,12 @@ ChainedDispatchers::ms_dispatch(crimson::net::ConnectionRef conn,
 }
 
 void
-ChainedDispatchers::ms_handle_accept(crimson::net::ConnectionRef conn) {
+ChainedDispatchers::ms_handle_accept(
+    crimson::net::ConnectionRef conn,
+    seastar::shard_id new_shard) {
   try {
     for (auto& dispatcher : dispatchers) {
-      dispatcher->ms_handle_accept(conn);
+      dispatcher->ms_handle_accept(conn, new_shard);
     }
   } catch (...) {
     logger().error("{} got unexpected exception in ms_handle_accept() {}",
@@ -52,10 +54,12 @@ ChainedDispatchers::ms_handle_accept(crimson::net::ConnectionRef conn) {
 }
 
 void
-ChainedDispatchers::ms_handle_connect(crimson::net::ConnectionRef conn) {
+ChainedDispatchers::ms_handle_connect(
+    crimson::net::ConnectionRef conn,
+    seastar::shard_id new_shard) {
   try {
     for(auto& dispatcher : dispatchers) {
-      dispatcher->ms_handle_connect(conn);
+      dispatcher->ms_handle_connect(conn, new_shard);
     }
   } catch (...) {
     logger().error("{} got unexpected exception in ms_handle_connect() {}",
index 712b0894b9fd257518f125be2f45e65c9acfa575..40356e9d473678be1ce0e46b88b8930704f6a424 100644 (file)
@@ -3,6 +3,8 @@
 
 #pragma once
 
+#include <seastar/core/smp.hh>
+
 #include "Fwd.h"
 #include "crimson/common/log.h"
 
@@ -24,8 +26,8 @@ public:
     return dispatchers.empty();
   }
   seastar::future<> ms_dispatch(crimson::net::ConnectionRef, MessageRef);
-  void ms_handle_accept(crimson::net::ConnectionRef conn);
-  void ms_handle_connect(crimson::net::ConnectionRef conn);
+  void ms_handle_accept(crimson::net::ConnectionRef conn, seastar::shard_id);
+  void ms_handle_connect(crimson::net::ConnectionRef conn, seastar::shard_id);
   void ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace);
   void ms_handle_remote_reset(crimson::net::ConnectionRef conn);
 
index 3e156ca67301eba765eaf4e191e5e212728ee525..54ef67356ab0fdc3be03857fbfee500b1ca1178a 100644 (file)
@@ -408,7 +408,7 @@ void IOHandler::dispatch_accept()
   // happening to a connected connection.
   protocol_is_connected = true;
   ceph_assert_always(conn_ref);
-  dispatchers.ms_handle_accept(conn_ref);
+  dispatchers.ms_handle_accept(conn_ref, get_shard_id());
 }
 
 void IOHandler::dispatch_connect()
@@ -419,7 +419,7 @@ void IOHandler::dispatch_connect()
   ceph_assert_always(protocol_is_connected == false);
   protocol_is_connected = true;
   ceph_assert_always(conn_ref);
-  dispatchers.ms_handle_connect(conn_ref);
+  dispatchers.ms_handle_connect(conn_ref, get_shard_id());
 }
 
 void IOHandler::dispatch_reset(bool is_replace)
index be6f600085187134c1ca24313a895c5838efdd66..30de528291ab710ddf649bba6d60802eaec559d0 100644 (file)
@@ -236,8 +236,11 @@ void Heartbeat::ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replac
   }
 }
 
-void Heartbeat::ms_handle_connect(crimson::net::ConnectionRef conn)
+void Heartbeat::ms_handle_connect(
+    crimson::net::ConnectionRef conn,
+    seastar::shard_id new_shard)
 {
+  ceph_assert_always(seastar::this_shard_id() == new_shard);
   auto peer = conn->get_peer_id();
   if (conn->get_peer_type() != entity_name_t::TYPE_OSD ||
       peer == entity_name_t::NEW) {
@@ -249,8 +252,11 @@ void Heartbeat::ms_handle_connect(crimson::net::ConnectionRef conn)
   }
 }
 
-void Heartbeat::ms_handle_accept(crimson::net::ConnectionRef conn)
+void Heartbeat::ms_handle_accept(
+    crimson::net::ConnectionRef conn,
+    seastar::shard_id new_shard)
 {
+  ceph_assert_always(seastar::this_shard_id() == new_shard);
   auto peer = conn->get_peer_id();
   if (conn->get_peer_type() != entity_name_t::TYPE_OSD ||
       peer == entity_name_t::NEW) {
index c483a689395e8379d55b13082e1f8607409bebe5..c5bf8f0ded2322df8682b755a7aa07542475857b 100644 (file)
@@ -52,8 +52,8 @@ public:
   std::optional<seastar::future<>> ms_dispatch(
       crimson::net::ConnectionRef conn, MessageRef m) override;
   void ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace) override;
-  void ms_handle_connect(crimson::net::ConnectionRef conn) override;
-  void ms_handle_accept(crimson::net::ConnectionRef conn) override;
+  void ms_handle_connect(crimson::net::ConnectionRef conn, seastar::shard_id) override;
+  void ms_handle_accept(crimson::net::ConnectionRef conn, seastar::shard_id) override;
 
   void print(std::ostream&) const;
 private:
index ef5602b0f27bf8b8ed2c9b31c1a6099024406495..2d45dfaaff2f7d3e7c586cb2c266f388157c1dcd 100644 (file)
@@ -301,7 +301,10 @@ static seastar::future<> run(
         return nr_depth - depth.current();
       }
 
-      void ms_handle_connect(crimson::net::ConnectionRef conn) override {
+      void ms_handle_connect(
+          crimson::net::ConnectionRef conn,
+          seastar::shard_id new_shard) override {
+        ceph_assert_always(new_shard == seastar::this_shard_id());
         conn_stats.connected_time = mono_clock::now();
       }
       std::optional<seastar::future<>> ms_dispatch(
index 0107fa6ccc566b599d6836c9cba86941425909d6..8af521d48c80e4932e319eb291e728df91712914 100644 (file)
@@ -123,7 +123,10 @@ static seastar::future<> test_echo(unsigned rounds,
         return found->second;
       }
 
-      void ms_handle_connect(crimson::net::ConnectionRef conn) override {
+      void ms_handle_connect(
+          crimson::net::ConnectionRef conn,
+          seastar::shard_id new_shard) override {
+        assert(new_shard == seastar::this_shard_id());
         auto session = seastar::make_shared<PingSession>();
         auto [i, added] = sessions.emplace(conn, session);
         std::ignore = i;
@@ -851,7 +854,10 @@ class FailoverSuite : public Dispatcher {
     return {seastar::now()};
   }
 
-  void ms_handle_accept(ConnectionRef conn) override {
+  void ms_handle_accept(
+      ConnectionRef conn,
+      seastar::shard_id new_shard) override {
+    assert(new_shard == seastar::this_shard_id());
     auto result = interceptor.find_result(conn);
     if (result == nullptr) {
       logger().error("Untracked accepted connection: {}", *conn);
@@ -874,7 +880,10 @@ class FailoverSuite : public Dispatcher {
     std::ignore = flush_pending_send();
   }
 
-  void ms_handle_connect(ConnectionRef conn) override {
+  void ms_handle_connect(
+      ConnectionRef conn,
+      seastar::shard_id new_shard) override {
+    assert(new_shard == seastar::this_shard_id());
     auto result = interceptor.find_result(conn);
     if (result == nullptr) {
       logger().error("Untracked connected connection: {}", *conn);
@@ -1432,7 +1441,10 @@ class FailoverSuitePeer : public Dispatcher {
     return {seastar::now()};
   }
 
-  void ms_handle_accept(ConnectionRef conn) override {
+  void ms_handle_accept(
+      ConnectionRef conn,
+      seastar::shard_id new_shard) override {
+    assert(new_shard == seastar::this_shard_id());
     logger().info("[TestPeer] got accept from Test");
     ceph_assert(!tracked_conn ||
                 tracked_conn->is_closed() ||
@@ -1587,7 +1599,10 @@ class FailoverTestPeer : public Dispatcher {
     return {seastar::now()};
   }
 
-  void ms_handle_accept(ConnectionRef conn) override {
+  void ms_handle_accept(
+      ConnectionRef conn,
+      seastar::shard_id new_shard) override {
+    assert(new_shard == seastar::this_shard_id());
     cmd_conn = conn;
   }
 
index b3a1d910b5dba0828a6fc4347c4fbacc454b0062..7be81f831c8da1479c44ff9e9b83e6d65a6d04e7 100644 (file)
@@ -134,12 +134,18 @@ class SyntheticDispatcher final
     }
   }
 
-  void ms_handle_accept(crimson::net::ConnectionRef conn) {
+  void ms_handle_accept(
+      crimson::net::ConnectionRef conn,
+      seastar::shard_id new_shard) {
     logger().info("{} - Connection:{}", __func__, *conn);
+    assert(new_shard == seastar::this_shard_id());
   }
 
-  void ms_handle_connect(crimson::net::ConnectionRef conn) {
+  void ms_handle_connect(
+      crimson::net::ConnectionRef conn,
+      seastar::shard_id new_shard) {
     logger().info("{} - Connection:{}", __func__, *conn);
+    assert(new_shard == seastar::this_shard_id());
   }
 
   void ms_handle_reset(crimson::net::ConnectionRef con, bool is_replace);