]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/net: move intercept_frame() from ProtocolV2 down to FrameAssemblerV2
authorYingxin Cheng <yingxin.cheng@intel.com>
Thu, 1 Dec 2022 01:47:36 +0000 (09:47 +0800)
committerYingxin Cheng <yingxin.cheng@intel.com>
Wed, 8 Feb 2023 06:07:41 +0000 (14:07 +0800)
Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
src/crimson/net/FrameAssemblerV2.cc
src/crimson/net/FrameAssemblerV2.h
src/crimson/net/ProtocolV2.cc
src/crimson/net/ProtocolV2.h

index 034cf8ed4ca3a2228fa54d75671fa3a3a630fc58..6165e2e57dccfaf2ceb6bacedd93acd53700b4ca 100644 (file)
@@ -6,6 +6,10 @@
 #include "Errors.h"
 #include "SocketConnection.h"
 
+#ifdef UNIT_TESTS_BUILT
+#include "Interceptor.h"
+#endif
+
 using ceph::msgr::v2::FrameAssembler;
 using ceph::msgr::v2::FrameError;
 using ceph::msgr::v2::preamble_block_t;
@@ -26,6 +30,20 @@ FrameAssemblerV2::FrameAssemblerV2(SocketConnection &_conn)
   : conn{_conn}
 {}
 
+#ifdef UNIT_TESTS_BUILT
+// should be consistent to intercept() in ProtocolV2.cc
+void FrameAssemblerV2::intercept_frame(Tag tag, bool is_write)
+{
+  assert(has_socket());
+  if (conn.interceptor) {
+    auto type = is_write ? bp_type_t::WRITE : bp_type_t::READ;
+    auto action = conn.interceptor->intercept(
+        conn, Breakpoint{tag, type});
+    socket->set_trap(type, action, &conn.interceptor->blocker);
+  }
+}
+#endif
+
 void FrameAssemblerV2::set_is_rev1(bool _is_rev1)
 {
   is_rev1 = _is_rev1;
@@ -198,6 +216,9 @@ FrameAssemblerV2::read_main_preamble()
     try {
       rx_preamble.append(buffer::create(std::move(bl)));
       const Tag tag = rx_frame_asm.disassemble_preamble(rx_preamble);
+#ifdef UNIT_TESTS_BUILT
+      intercept_frame(tag, false);
+#endif
       return read_main_t{tag, &rx_frame_asm};
     } catch (FrameError& e) {
       logger().warn("{} read_main_preamble: {}", conn, e.what());
index b3ee3e03a3cb6dba4e6af7c4598b1adf5a12bcba..5cea92440fe223ee9c76c4079509850d4ab90977 100644 (file)
@@ -103,6 +103,9 @@ public:
 
   template <class F>
   ceph::bufferlist get_buffer(F &tx_frame) {
+#ifdef UNIT_TESTS_BUILT
+    intercept_frame(F::tag, true);
+#endif
     auto bl = tx_frame.get_buffer(tx_frame_asm);
     log_main_preamble(bl);
     return bl;
@@ -121,6 +124,10 @@ private:
 
   void log_main_preamble(const ceph::bufferlist &bl);
 
+#ifdef UNIT_TESTS_BUILT
+  void intercept_frame(ceph::msgr::v2::Tag, bool is_write);
+#endif
+
   SocketConnection &conn;
 
   Socket *socket = nullptr;
index 8514da381e8840e15888f8644fda175bec4526a0..74719ec3babf50fb50b8aa80cf9a7afcecf155de 100644 (file)
@@ -122,6 +122,7 @@ template <typename T> auto ptr(const ::seastar::shared_ptr<T>& p) -> const void*
 namespace crimson::net {
 
 #ifdef UNIT_TESTS_BUILT
+// should be consistent to intercept_frame() in FrameAssemblerV2.cc
 void intercept(Breakpoint bp,
                bp_type_t type,
                Connection& conn,
@@ -137,11 +138,6 @@ void intercept(Breakpoint bp,
 intercept({bp}, type, conn,              \
           conn.interceptor, conn.socket)
 
-#define INTERCEPT_FRAME(tag, type)       \
-intercept({static_cast<Tag>(tag), type}, \
-          type, conn,                    \
-          conn.interceptor, conn.socket)
-
 #define INTERCEPT_N_RW(bp)                               \
 if (conn.interceptor) {                                  \
   auto action = conn.interceptor->intercept(conn, {bp}); \
@@ -153,7 +149,6 @@ if (conn.interceptor) {                                  \
 
 #else
 #define INTERCEPT_CUSTOM(bp, type)
-#define INTERCEPT_FRAME(tag, type)
 #define INTERCEPT_N_RW(bp)
 #endif
 
@@ -224,36 +219,6 @@ void ProtocolV2::start_accept(SocketRef&& new_socket,
   execute_accepting();
 }
 
-// TODO: Frame related implementations, probably to a separate class.
-
-seastar::future<FrameAssemblerV2::read_main_t>
-ProtocolV2::read_main_preamble()
-{
-  return frame_assembler.read_main_preamble(
-#ifdef UNIT_TESTS_BUILT
-  ).then([this](auto ret) {
-    INTERCEPT_FRAME(ret.tag, bp_type_t::READ);
-    return ret;
-  });
-#else
-  );
-#endif
-}
-
-template <class F>
-ceph::bufferlist ProtocolV2::get_buffer(F &tx_frame)
-{
-  INTERCEPT_FRAME(F::tag, bp_type_t::WRITE);
-  return frame_assembler.get_buffer(tx_frame);
-}
-
-template <class F>
-seastar::future<> ProtocolV2::write_flush_frame(F &tx_frame)
-{
-  INTERCEPT_FRAME(F::tag, bp_type_t::WRITE);
-  return frame_assembler.write_flush_frame(tx_frame);
-}
-
 void ProtocolV2::trigger_state(state_t _state, out_state_t _out_state, bool reentrant)
 {
   if (!reentrant && _state == state) {
@@ -482,10 +447,10 @@ ProtocolV2::banner_exchange(bool is_connect)
       logger().debug("{} WRITE HelloFrame: my_type={}, peer_addr={}",
                      conn, ceph_entity_type_name(messenger.get_mytype()),
                      conn.target_addr);
-      return write_flush_frame(hello);
+      return frame_assembler.write_flush_frame(hello);
     }).then([this] {
       //5. read peer HelloFrame
-      return read_main_preamble();
+      return frame_assembler.read_main_preamble();
     }).then([this](auto ret) {
       expect_tag(Tag::HELLO, ret.tag, conn, __func__);
       return frame_assembler.read_frame_payload();
@@ -504,7 +469,7 @@ ProtocolV2::banner_exchange(bool is_connect)
 
 seastar::future<> ProtocolV2::handle_auth_reply()
 {
-  return read_main_preamble(
+  return frame_assembler.read_main_preamble(
   ).then([this](auto ret) {
     switch (ret.tag) {
       case Tag::AUTH_BAD_METHOD:
@@ -542,7 +507,7 @@ seastar::future<> ProtocolV2::handle_auth_reply()
           auto more_reply = AuthRequestMoreFrame::Encode(reply);
           logger().debug("{} WRITE AuthRequestMoreFrame: payload_len={}",
                          conn, reply.length());
-          return write_flush_frame(more_reply);
+          return frame_assembler.write_flush_frame(more_reply);
         }).then([this] {
           return handle_auth_reply();
         });
@@ -591,7 +556,7 @@ seastar::future<> ProtocolV2::client_auth(std::vector<uint32_t> &allowed_methods
     logger().debug("{} WRITE AuthRequestFrame: method={},"
                    " preferred_modes={}, payload_len={}",
                    conn, auth_method, preferred_modes, bl.length());
-    return write_flush_frame(frame).then([this] {
+    return frame_assembler.write_flush_frame(frame).then([this] {
       return handle_auth_reply();
     });
   } catch (const crimson::auth::error& e) {
@@ -639,8 +604,8 @@ ProtocolV2::client_connect()
                  conn.policy.features_supported,
                  conn.policy.features_required | msgr2_required,
                  flags, client_cookie);
-  return write_flush_frame(client_ident).then([this] {
-    return read_main_preamble();
+  return frame_assembler.write_flush_frame(client_ident).then([this] {
+    return frame_assembler.read_main_preamble();
   }).then([this](auto ret) {
     switch (ret.tag) {
       case Tag::IDENT_MISSING_FEATURES:
@@ -742,8 +707,8 @@ ProtocolV2::client_reconnect()
                  conn, messenger.get_myaddrs(),
                  client_cookie, server_cookie,
                  global_seq, connect_seq, get_in_seq());
-  return write_flush_frame(reconnect).then([this] {
-    return read_main_preamble();
+  return frame_assembler.write_flush_frame(reconnect).then([this] {
+    return frame_assembler.read_main_preamble();
   }).then([this](auto ret) {
     switch (ret.tag) {
       case Tag::SESSION_RETRY_GLOBAL:
@@ -937,7 +902,7 @@ seastar::future<> ProtocolV2::_auth_bad_method(int r)
                 "allowed_methods={}, allowed_modes={})",
                 conn, auth_meta->auth_method, cpp_strerror(r),
                 allowed_methods, allowed_modes);
-  return write_flush_frame(bad_method).then([this] {
+  return frame_assembler.write_flush_frame(bad_method).then([this] {
     return server_auth();
   });
 }
@@ -963,7 +928,7 @@ seastar::future<> ProtocolV2::_handle_auth_request(bufferlist& auth_payload, boo
     logger().debug("{} WRITE AuthDoneFrame: gid={}, con_mode={}, payload_len={}",
                    conn, conn.peer_global_id,
                    ceph_con_mode_name(auth_meta->con_mode), reply.length());
-    return write_flush_frame(auth_done).then([this] {
+    return frame_assembler.write_flush_frame(auth_done).then([this] {
       ceph_assert(auth_meta);
       frame_assembler.create_session_stream_handlers(*auth_meta, true);
       return finish_auth();
@@ -974,8 +939,8 @@ seastar::future<> ProtocolV2::_handle_auth_request(bufferlist& auth_payload, boo
     auto more = AuthReplyMoreFrame::Encode(reply);
     logger().debug("{} WRITE AuthReplyMoreFrame: payload_len={}",
                    conn, reply.length());
-    return write_flush_frame(more).then([this] {
-      return read_main_preamble();
+    return frame_assembler.write_flush_frame(more).then([this] {
+      return frame_assembler.read_main_preamble();
     }).then([this](auto ret) {
       expect_tag(Tag::AUTH_REQUEST_MORE, ret.tag, conn, __func__);
       return frame_assembler.read_frame_payload();
@@ -1000,7 +965,7 @@ seastar::future<> ProtocolV2::_handle_auth_request(bufferlist& auth_payload, boo
 
 seastar::future<> ProtocolV2::server_auth()
 {
-  return read_main_preamble(
+  return frame_assembler.read_main_preamble(
   ).then([this](auto ret) {
     expect_tag(Tag::AUTH_REQUEST, ret.tag, conn, __func__);
     return frame_assembler.read_frame_payload();
@@ -1042,7 +1007,7 @@ ProtocolV2::send_wait()
 {
   auto wait = WaitFrame::Encode();
   logger().debug("{} WRITE WaitFrame", conn);
-  return write_flush_frame(wait).then([] {
+  return frame_assembler.write_flush_frame(wait).then([] {
     return next_step_t::wait;
   });
 }
@@ -1256,7 +1221,7 @@ ProtocolV2::server_connect()
       auto ident_missing_features = IdentMissingFeaturesFrame::Encode(feat_missing);
       logger().warn("{} WRITE IdentMissingFeaturesFrame: features={} (peer missing)",
                     conn, feat_missing);
-      return write_flush_frame(ident_missing_features).then([] {
+      return frame_assembler.write_flush_frame(ident_missing_features).then([] {
         return next_step_t::wait;
       });
     }
@@ -1294,7 +1259,7 @@ ProtocolV2::server_connect()
 seastar::future<ProtocolV2::next_step_t>
 ProtocolV2::read_reconnect()
 {
-  return read_main_preamble(
+  return frame_assembler.read_main_preamble(
   ).then([this](auto ret) {
     expect_tag(Tag::SESSION_RECONNECT, ret.tag, conn, "read_reconnect");
     return server_reconnect();
@@ -1306,7 +1271,7 @@ ProtocolV2::send_retry(uint64_t connect_seq)
 {
   auto retry = RetryFrame::Encode(connect_seq);
   logger().warn("{} WRITE RetryFrame: cs={}", conn, connect_seq);
-  return write_flush_frame(retry).then([this] {
+  return frame_assembler.write_flush_frame(retry).then([this] {
     return read_reconnect();
   });
 }
@@ -1316,7 +1281,7 @@ ProtocolV2::send_retry_global(uint64_t global_seq)
 {
   auto retry = RetryGlobalFrame::Encode(global_seq);
   logger().warn("{} WRITE RetryGlobalFrame: gs={}", conn, global_seq);
-  return write_flush_frame(retry).then([this] {
+  return frame_assembler.write_flush_frame(retry).then([this] {
     return read_reconnect();
   });
 }
@@ -1326,8 +1291,8 @@ ProtocolV2::send_reset(bool full)
 {
   auto reset = ResetFrame::Encode(full);
   logger().warn("{} WRITE ResetFrame: full={}", conn, full);
-  return write_flush_frame(reset).then([this] {
-    return read_main_preamble();
+  return frame_assembler.write_flush_frame(reset).then([this] {
+    return frame_assembler.read_main_preamble();
   }).then([this](auto ret) {
     expect_tag(Tag::CLIENT_IDENT, ret.tag, conn, "post_send_reset");
     return server_connect();
@@ -1511,7 +1476,7 @@ void ProtocolV2::execute_accepting()
           messenger.learned_addr(_my_addr_from_peer, conn);
           return server_auth();
         }).then([this] {
-          return read_main_preamble();
+          return frame_assembler.read_main_preamble();
         }).then([this](auto ret) {
           switch (ret.tag) {
             case Tag::CLIENT_IDENT:
@@ -1559,8 +1524,8 @@ seastar::future<> ProtocolV2::finish_auth()
     auth_meta->session_key.hmac_sha256(nullptr, records.rxbuf);
   auto sig_frame = AuthSignatureFrame::Encode(sig);
   logger().debug("{} WRITE AuthSignatureFrame: signature={}", conn, sig);
-  return write_flush_frame(sig_frame).then([this] {
-    return read_main_preamble();
+  return frame_assembler.write_flush_frame(sig_frame).then([this] {
+    return frame_assembler.read_main_preamble();
   }).then([this](auto ret) {
     expect_tag(Tag::AUTH_SIGNATURE, ret.tag, conn, "post_finish_auth");
     return frame_assembler.read_frame_payload();
@@ -1677,7 +1642,7 @@ ProtocolV2::send_server_ident()
                  conn.policy.features_required | msgr2_required,
                  flags, server_cookie);
 
-  return write_flush_frame(server_ident);
+  return frame_assembler.write_flush_frame(server_ident);
 }
 
 // REPLACING state
@@ -1760,7 +1725,7 @@ void ProtocolV2::trigger_replacing(bool reconnect,
         requeue_out_sent_up_to(new_msg_seq);
         auto reconnect_ok = ReconnectOkFrame::Encode(get_in_seq());
         logger().debug("{} WRITE ReconnectOkFrame: msg_seq={}", conn, get_in_seq());
-        return write_flush_frame(reconnect_ok);
+        return frame_assembler.write_flush_frame(reconnect_ok);
       } else {
         client_cookie = new_client_cookie;
         assert(conn.get_peer_type() == new_peer_name.type());
@@ -1805,17 +1770,17 @@ ceph::bufferlist ProtocolV2::do_sweep_messages(
 
   if (unlikely(require_keepalive)) {
     auto keepalive_frame = KeepAliveFrame::Encode();
-    bl.append(get_buffer(keepalive_frame));
+    bl.append(frame_assembler.get_buffer(keepalive_frame));
   }
 
   if (unlikely(maybe_keepalive_ack.has_value())) {
     auto keepalive_ack_frame = KeepAliveFrameAck::Encode(*maybe_keepalive_ack);
-    bl.append(get_buffer(keepalive_ack_frame));
+    bl.append(frame_assembler.get_buffer(keepalive_ack_frame));
   }
 
   if (require_ack && num_msgs == 0u) {
     auto ack_frame = AckFrame::Encode(get_in_seq());
-    bl.append(get_buffer(ack_frame));
+    bl.append(frame_assembler.get_buffer(ack_frame));
   }
 
   std::for_each(msgs.begin(), msgs.begin()+num_msgs, [this, &bl](const MessageURef& msg) {
@@ -1843,7 +1808,7 @@ ceph::bufferlist ProtocolV2::do_sweep_messages(
         msg->get_payload(), msg->get_middle(), msg->get_data());
     logger().debug("{} --> #{} === {} ({})",
                   conn, msg->get_seq(), *msg, msg->get_type());
-    bl.append(get_buffer(message));
+    bl.append(frame_assembler.get_buffer(message));
   });
 
   return bl;
@@ -1983,7 +1948,7 @@ void ProtocolV2::execute_ready(bool dispatch_connect)
   gate.dispatch_in_background("execute_ready", *this, [this] {
     protocol_timer.cancel();
     return seastar::keep_doing([this] {
-      return read_main_preamble(
+      return frame_assembler.read_main_preamble(
       ).then([this](auto ret) {
         switch (ret.tag) {
           case Tag::MESSAGE: {
index b75c008f0d5e11bd566dc77474fb1ee8b6bd09cf..8b0d098a0fcc9242810f2bd7c3f3d1abf857fe2c 100644 (file)
@@ -166,14 +166,6 @@ class ProtocolV2 final : public Protocol {
   Timer protocol_timer;
 
  private:
-  seastar::future<FrameAssemblerV2::read_main_t> read_main_preamble();
-
-  template <class F>
-  ceph::bufferlist get_buffer(F &tx_frame);
-
-  template <class F>
-  seastar::future<> write_flush_frame(F &tx_frame);
-
   void fault(state_t expected_state,
              const char *where,
              std::exception_ptr eptr);