]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/net: decouple tests from SocketConnection class
authorYingxin Cheng <yingxin.cheng@intel.com>
Mon, 24 Apr 2023 05:47:22 +0000 (13:47 +0800)
committerMatan Breizman <mbreizma@redhat.com>
Wed, 11 Oct 2023 11:38:31 +0000 (11:38 +0000)
Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
(cherry picked from commit 7797cabe2ab38eea30eca51614398badef8bada1)

src/crimson/net/FrameAssemblerV2.cc
src/crimson/net/Fwd.h
src/crimson/net/Interceptor.h
src/crimson/net/ProtocolV2.cc
src/crimson/net/SocketConnection.cc
src/crimson/net/SocketConnection.h
src/crimson/net/io_handler.cc
src/test/crimson/test_messenger.cc

index 165ae18a1a0b57bcc451836d47d0dd535939f990..c952bb1bd56938d1df68918f0c4879eb8f4dd305 100644 (file)
@@ -44,8 +44,10 @@ void FrameAssemblerV2::intercept_frame(Tag tag, bool is_write)
   assert(has_socket());
   if (conn.interceptor) {
     auto type = is_write ? bp_type_t::WRITE : bp_type_t::READ;
+    // FIXME: doesn't support cross-core
     auto action = conn.interceptor->intercept(
-        conn, Breakpoint{tag, type});
+        conn.get_local_shared_foreign_from_this(),
+        Breakpoint{tag, type});
     socket->set_trap(type, action, &conn.interceptor->blocker);
   }
 }
index 3eb57ef9781307d83ca4dd7efdded71f24116362..1dc4809810de27e2174b350c2e2c82d0041a5512 100644 (file)
@@ -39,8 +39,6 @@ using ConnectionLRef = seastar::shared_ptr<Connection>;
 using ConnectionFRef = seastar::foreign_ptr<ConnectionLRef>;
 using ConnectionRef = ::crimson::local_shared_foreign_ptr<ConnectionLRef>;
 
-class SocketConnection;
-
 class Dispatcher;
 class ChainedDispatchers;
 constexpr std::size_t NUM_DISPATCHERS = 4u;
index 41ec31f3755768d538ea8c66c0689c518d3d8551..764facf7be1099dffb3d5c22e362ce2b9ecb0eb9 100644 (file)
@@ -116,11 +116,11 @@ struct Breakpoint {
 struct Interceptor {
   socket_blocker blocker;
   virtual ~Interceptor() {}
-  virtual void register_conn(SocketConnection& conn) = 0;
-  virtual void register_conn_ready(SocketConnection& conn) = 0;
-  virtual void register_conn_closed(SocketConnection& conn) = 0;
-  virtual void register_conn_replaced(SocketConnection& conn) = 0;
-  virtual bp_action_t intercept(SocketConnection& conn, Breakpoint bp) = 0;
+  virtual void register_conn(ConnectionRef) = 0;
+  virtual void register_conn_ready(ConnectionRef) = 0;
+  virtual void register_conn_closed(ConnectionRef) = 0;
+  virtual void register_conn_replaced(ConnectionRef) = 0;
+  virtual bp_action_t intercept(ConnectionRef, Breakpoint bp) = 0;
 };
 
 } // namespace crimson::net
index 543f2581b476d0d5b5210ca754190316a8d3d865..9d1956c3c111725eafc74b2c0d72ea4dc430430f 100644 (file)
@@ -111,7 +111,9 @@ void intercept(Breakpoint bp,
                Interceptor *interceptor,
                Socket *socket) {
   if (interceptor) {
-    auto action = interceptor->intercept(conn, Breakpoint(bp));
+    auto action = interceptor->intercept(
+        conn.get_local_shared_foreign_from_this(),
+        Breakpoint(bp));
     socket->set_trap(type, action, &interceptor->blocker);
   }
 }
@@ -784,7 +786,8 @@ void ProtocolV2::execute_connecting()
           // supports CONTINUE/FAULT/BLOCK
           if (conn.interceptor) {
             auto action = conn.interceptor->intercept(
-                conn, {custom_bp_t::SOCKET_CONNECTING});
+                conn.get_local_shared_foreign_from_this(),
+                {custom_bp_t::SOCKET_CONNECTING});
             switch (action) {
             case bp_action_t::CONTINUE:
               return seastar::now();
@@ -796,6 +799,7 @@ void ProtocolV2::execute_connecting()
               return conn.interceptor->blocker.block();
             default:
               ceph_abort("unexpected action from trap");
+              return seastar::now();
             }
           } else {
             return seastar::now();
@@ -1064,7 +1068,8 @@ ProtocolV2::reuse_connection(
   has_socket = false;
 #ifdef UNIT_TESTS_BUILT
   if (conn.interceptor) {
-    conn.interceptor->register_conn_replaced(conn);
+    conn.interceptor->register_conn_replaced(
+        conn.get_local_shared_foreign_from_this());
   }
 #endif
   // close this connection because all the necessary information is delivered
@@ -1485,7 +1490,8 @@ void ProtocolV2::execute_accepting()
 #ifdef UNIT_TESTS_BUILT
           if (conn.interceptor) {
             auto action = conn.interceptor->intercept(
-                conn, {custom_bp_t::SOCKET_ACCEPTED});
+                conn.get_local_shared_foreign_from_this(),
+                {custom_bp_t::SOCKET_ACCEPTED});
             switch (action) {
             case bp_action_t::CONTINUE:
               break;
@@ -2002,7 +2008,8 @@ void ProtocolV2::do_close(
 #ifdef UNIT_TESTS_BUILT
     closed_clean = true;
     if (conn.interceptor) {
-      conn.interceptor->register_conn_closed(conn);
+      conn.interceptor->register_conn_closed(
+          conn.get_local_shared_foreign_from_this());
     }
 #endif
   }).handle_exception([conn_ref = conn.shared_from_this(), this] (auto eptr) {
index f0eccfe2d43e2fdd48814daab20f46a5324e2bd1..1cbc3cf3fbadc56892ae123589cc3c233a74a9a3 100644 (file)
@@ -36,7 +36,7 @@ SocketConnection::SocketConnection(SocketMessenger& messenger,
 #ifdef UNIT_TESTS_BUILT
   if (messenger.interceptor) {
     interceptor = messenger.interceptor;
-    interceptor->register_conn(*this);
+    interceptor->register_conn(this->get_local_shared_foreign_from_this());
   }
 #endif
 }
index 718d1da318a0b82cf4c888e4dfbc00c1044e587d..c986370fc015f59171c19f49303b286c2e697808 100644 (file)
@@ -25,6 +25,7 @@ namespace crimson::net {
 
 class ProtocolV2;
 class SocketMessenger;
+class SocketConnection;
 using SocketConnectionRef = seastar::shared_ptr<SocketConnection>;
 
 #ifdef UNIT_TESTS_BUILT
index 328ca724eb7aaf48da2aca18de7a13463c333c0a..63e0c93c9fd9cb88aca5f9267900bbea27ce5dcc 100644 (file)
@@ -192,7 +192,9 @@ void IOHandler::set_io_state(
     dispatch_in = true;
 #ifdef UNIT_TESTS_BUILT
     if (conn.interceptor) {
-      conn.interceptor->register_conn_ready(conn);
+      // FIXME: doesn't support cross-core
+      conn.interceptor->register_conn_ready(
+          conn.get_local_shared_foreign_from_this());
     }
 #endif
   } else if (io_state == io_state_t::open) {
index 6fc9c1d7750c80e4527a5a7cf5525c5566c97bf6..0107fa6ccc566b599d6836c9cba86941425909d6 100644 (file)
@@ -14,7 +14,6 @@
 #include "crimson/net/Dispatcher.h"
 #include "crimson/net/Messenger.h"
 #include "crimson/net/Interceptor.h"
-#include "crimson/net/SocketConnection.h"
 
 #include <map>
 #include <random>
@@ -496,7 +495,6 @@ using crimson::net::Dispatcher;
 using crimson::net::Interceptor;
 using crimson::net::Messenger;
 using crimson::net::MessengerRef;
-using crimson::net::SocketConnection;
 using crimson::net::SocketPolicy;
 using crimson::net::tag_bp_t;
 using namespace ceph::net::test;
@@ -699,25 +697,24 @@ struct TestInterceptor : public Interceptor {
   }
 
  private:
-  void register_conn(SocketConnection& _conn) override {
-    auto conn = _conn.get_local_shared_foreign_from_this();
+  void register_conn(ConnectionRef conn) override {
     auto result = find_result(conn);
     if (result != nullptr) {
       logger().error("The connection [{}] {} already exists when register {}",
-                     result->index, *result->conn, _conn);
+                     result->index, *result->conn, *conn);
       ceph_abort();
     }
     unsigned index = results.size();
     results.emplace_back(conn, index);
     conns[conn] = index;
     notify();
-    logger().info("[{}] {} new connection registered", index, _conn);
+    logger().info("[{}] {} new connection registered", index, *conn);
   }
 
-  void register_conn_closed(SocketConnection& conn) override {
-    auto result = find_result(conn.get_local_shared_foreign_from_this());
+  void register_conn_closed(ConnectionRef conn) override {
+    auto result = find_result(conn);
     if (result == nullptr) {
-      logger().error("Untracked closed connection: {}", conn);
+      logger().error("Untracked closed connection: {}", *conn);
       ceph_abort();
     }
 
@@ -725,39 +722,39 @@ struct TestInterceptor : public Interceptor {
       result->state = conn_state_t::closed;
     }
     notify();
-    logger().info("[{}] {} closed({})", result->index, conn, result->state);
+    logger().info("[{}] {} closed({})", result->index, *conn, result->state);
   }
 
-  void register_conn_ready(SocketConnection& conn) override {
-    auto result = find_result(conn.get_local_shared_foreign_from_this());
+  void register_conn_ready(ConnectionRef conn) override {
+    auto result = find_result(conn);
     if (result == nullptr) {
-      logger().error("Untracked ready connection: {}", conn);
+      logger().error("Untracked ready connection: {}", *conn);
       ceph_abort();
     }
 
-    ceph_assert(conn.is_connected());
+    ceph_assert(conn->is_connected());
     notify();
-    logger().info("[{}] {} ready", result->index, conn);
+    logger().info("[{}] {} ready", result->index, *conn);
   }
 
-  void register_conn_replaced(SocketConnection& conn) override {
-    auto result = find_result(conn.get_local_shared_foreign_from_this());
+  void register_conn_replaced(ConnectionRef conn) override {
+    auto result = find_result(conn);
     if (result == nullptr) {
-      logger().error("Untracked replaced connection: {}", conn);
+      logger().error("Untracked replaced connection: {}", *conn);
       ceph_abort();
     }
 
     result->state = conn_state_t::replaced;
-    logger().info("[{}] {} {}", result->index, conn, result->state);
+    logger().info("[{}] {} {}", result->index, *conn, result->state);
   }
 
-  bp_action_t intercept(SocketConnection& conn, Breakpoint bp) override {
+  bp_action_t intercept(ConnectionRef conn, Breakpoint bp) override {
     ++breakpoints_counter[bp].counter;
 
-    auto result = find_result(conn.get_local_shared_foreign_from_this());
+    auto result = find_result(conn);
     if (result == nullptr) {
       logger().error("Untracked intercepted connection: {}, at breakpoint {}({})",
-                     conn, bp, breakpoints_counter[bp].counter);
+                     *conn, bp, breakpoints_counter[bp].counter);
       ceph_abort();
     }
 
@@ -786,13 +783,13 @@ struct TestInterceptor : public Interceptor {
       auto it_cnt = it_bp->second.find(breakpoints_counter[bp].counter);
       if (it_cnt != it_bp->second.end()) {
         logger().info("[{}] {} intercepted {}({}) => {}",
-                      result->index, conn, bp,
+                      result->index, *conn, bp,
                       breakpoints_counter[bp].counter, it_cnt->second);
         return it_cnt->second;
       }
     }
     logger().info("[{}] {} intercepted {}({})",
-                  result->index, conn, bp, breakpoints_counter[bp].counter);
+                  result->index, *conn, bp, breakpoints_counter[bp].counter);
     return bp_action_t::CONTINUE;
   }
 };