]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/net: rework interceptor to support cross-core interceptions
authorYingxin Cheng <yingxin.cheng@intel.com>
Thu, 3 Aug 2023 07:26:54 +0000 (15:26 +0800)
committerYingxin Cheng <yingxin.cheng@intel.com>
Wed, 23 Aug 2023 04:49:03 +0000 (12:49 +0800)
Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
src/crimson/net/FrameAssemblerV2.cc
src/crimson/net/FrameAssemblerV2.h
src/crimson/net/Interceptor.h
src/crimson/net/ProtocolV2.cc
src/crimson/net/io_handler.cc
src/crimson/net/io_handler.h
src/test/crimson/test_messenger.cc

index bb48138a81fb73254a78b592c383628dc2c83388..273a6350d71edb3a29ab0acadfe37dadf493ef5e 100644 (file)
@@ -6,10 +6,6 @@
 #include "Errors.h"
 #include "SocketConnection.h"
 
-#ifdef UNIT_TESTS_BUILT
-#include "Interceptor.h"
-#endif
-
 using ceph::msgr::v2::FrameAssembler;
 using ceph::msgr::v2::FrameError;
 using ceph::msgr::v2::preamble_block_t;
@@ -43,23 +39,23 @@ FrameAssemblerV2::~FrameAssemblerV2()
 
 #ifdef UNIT_TESTS_BUILT
 // should be consistent to intercept() in ProtocolV2.cc
-void FrameAssemblerV2::intercept_frame(Tag tag, bool is_write)
+seastar::future<> FrameAssemblerV2::intercept_frames(
+    std::vector<Breakpoint> bps,
+    bp_type_t type)
 {
   assert(seastar::this_shard_id() == sid);
   assert(has_socket());
-  if (conn.interceptor) {
-    auto type = is_write ? bp_type_t::WRITE : bp_type_t::READ;
-    // FIXME: doesn't support cross-core
-    auto action = conn.interceptor->intercept(
-        conn.get_local_shared_foreign_from_this(),
-        Breakpoint{tag, type});
-    // tolerate leaking future in tests
-    std::ignore = seastar::smp::submit_to(
+  if (!conn.interceptor) {
+    return seastar::now();
+  }
+  return conn.interceptor->intercept(conn, bps
+  ).then([this, type](bp_action_t action) {
+    return seastar::smp::submit_to(
         socket->get_shard_id(),
         [this, type, action] {
       socket->set_trap(type, action, &conn.interceptor->blocker);
     });
-  }
+  });
 }
 #endif
 
@@ -361,17 +357,22 @@ FrameAssemblerV2::read_main_preamble()
   return read_exactly<may_cross_core>(
     rx_frame_asm.get_preamble_onwire_len()
   ).then([this](auto bptr) {
+    rx_preamble.append(std::move(bptr));
+    Tag tag;
     try {
-      rx_preamble.append(std::move(bptr));
-      const Tag tag = rx_frame_asm.disassemble_preamble(rx_preamble);
-#ifdef UNIT_TESTS_BUILT
-      intercept_frame(tag, false);
-#endif
-      return read_main_t{tag, &rx_frame_asm};
+      tag = rx_frame_asm.disassemble_preamble(rx_preamble);
     } catch (FrameError& e) {
       logger().warn("{} read_main_preamble: {}", conn, e.what());
       throw std::system_error(make_error_code(crimson::net::error::negotiation_failure));
     }
+#ifdef UNIT_TESTS_BUILT
+    return intercept_frame(tag, false
+    ).then([this, tag] {
+      return read_main_t{tag, &rx_frame_asm};
+    });
+#else
+    return read_main_t{tag, &rx_frame_asm};
+#endif
   });
 }
 template seastar::future<FrameAssemblerV2::read_main_t> FrameAssemblerV2::read_main_preamble<true>();
index e4af653812d7a6bed89afde4f9af075bbd743607..9c89c144e80a1bf69964741be13a894065896fc5 100644 (file)
 #include "crimson/common/gated.h"
 #include "crimson/net/Socket.h"
 
+#ifdef UNIT_TESTS_BUILT
+#include "Interceptor.h"
+#endif
+
 namespace crimson::net {
 
 class SocketConnection;
@@ -131,9 +135,6 @@ public:
   template <class F>
   ceph::bufferlist get_buffer(F &tx_frame) {
     assert(seastar::this_shard_id() == sid);
-#ifdef UNIT_TESTS_BUILT
-    intercept_frame(F::tag, true);
-#endif
     auto bl = tx_frame.get_buffer(tx_frame_asm);
     log_main_preamble(bl);
     return bl;
@@ -143,12 +144,56 @@ public:
   seastar::future<> write_flush_frame(F &tx_frame) {
     assert(seastar::this_shard_id() == sid);
     auto bl = get_buffer(tx_frame);
+#ifdef UNIT_TESTS_BUILT
+    return intercept_frame(F::tag, true
+    ).then([this, bl=std::move(bl)]() mutable {
+      return write_flush<may_cross_core>(std::move(bl));
+    });
+#else
     return write_flush<may_cross_core>(std::move(bl));
+#endif
   }
 
   static FrameAssemblerV2Ref create(SocketConnection &conn);
 
+#ifdef UNIT_TESTS_BUILT
+  seastar::future<> intercept_frames(
+      std::vector<ceph::msgr::v2::Tag> tags,
+      bool is_write) {
+    auto type = is_write ? bp_type_t::WRITE : bp_type_t::READ;
+    std::vector<Breakpoint> bps;
+    for (auto &tag : tags) {
+      bps.emplace_back(Breakpoint{tag, type});
+    }
+    return intercept_frames(bps, type);
+  }
+
+  seastar::future<> intercept_frame(
+      ceph::msgr::v2::Tag tag,
+      bool is_write) {
+    auto type = is_write ? bp_type_t::WRITE : bp_type_t::READ;
+    std::vector<Breakpoint> bps;
+    bps.emplace_back(Breakpoint{tag, type});
+    return intercept_frames(bps, type);
+  }
+
+  seastar::future<> intercept_frame(
+      custom_bp_t bp,
+      bool is_write) {
+    auto type = is_write ? bp_type_t::WRITE : bp_type_t::READ;
+    std::vector<Breakpoint> bps;
+    bps.emplace_back(Breakpoint{bp});
+    return intercept_frames(bps, type);
+  }
+#endif
+
 private:
+#ifdef UNIT_TESTS_BUILT
+  seastar::future<> intercept_frames(
+      std::vector<Breakpoint> bps,
+      bp_type_t type);
+#endif
+
   bool has_socket() const;
 
   SocketFRef move_socket();
@@ -157,10 +202,6 @@ private:
 
   void log_main_preamble(const ceph::bufferlist &bl);
 
-#ifdef UNIT_TESTS_BUILT
-  void intercept_frame(ceph::msgr::v2::Tag, bool is_write);
-#endif
-
   SocketConnection &conn;
 
   SocketFRef socket;
index 764facf7be1099dffb3d5c22e362ce2b9ecb0eb9..921175cbf9417104e170a01ab843ef661cca6fd7 100644 (file)
@@ -120,7 +120,9 @@ struct Interceptor {
   virtual void register_conn_ready(ConnectionRef) = 0;
   virtual void register_conn_closed(ConnectionRef) = 0;
   virtual void register_conn_replaced(ConnectionRef) = 0;
-  virtual bp_action_t intercept(ConnectionRef, Breakpoint bp) = 0;
+
+  virtual seastar::future<bp_action_t>
+  intercept(Connection&, std::vector<Breakpoint> bp) = 0;
 };
 
 } // namespace crimson::net
index 8bb9f7b6821831a0028d7f3f4e4322b1c7508389..0992c74ec390cf01ff57bc6eab4e96520b4e4a0d 100644 (file)
 #include "Errors.h"
 #include "SocketMessenger.h"
 
-#ifdef UNIT_TESTS_BUILT
-#include "Interceptor.h"
-#endif
-
 using namespace ceph::msgr::v2;
 using crimson::common::local_conf;
 
@@ -101,28 +97,6 @@ inline uint64_t generate_client_cookie() {
 
 namespace crimson::net {
 
-#ifdef UNIT_TESTS_BUILT
-// should be consistent to intercept_frame() in FrameAssemblerV2.cc
-void intercept(Breakpoint bp,
-               bp_type_t type,
-               SocketConnection& conn,
-               Interceptor *interceptor,
-               Socket *socket) {
-  if (interceptor) {
-    auto action = interceptor->intercept(
-        conn.get_local_shared_foreign_from_this(),
-        Breakpoint(bp));
-    socket->set_trap(type, action, &interceptor->blocker);
-  }
-}
-
-#define INTERCEPT_CUSTOM(bp, type)       \
-intercept({bp}, type, conn,              \
-          conn.interceptor, conn.socket)
-#else
-#define INTERCEPT_CUSTOM(bp, type)
-#endif
-
 seastar::future<> ProtocolV2::Timer::backoff(double seconds)
 {
   logger().warn("{} waiting {} seconds ...", conn, seconds);
@@ -439,101 +413,121 @@ ProtocolV2::banner_exchange(bool is_connect)
                  CRIMSON_MSGR2_SUPPORTED_FEATURES,
                  CEPH_MSGR2_REQUIRED_FEATURES,
                  CEPH_BANNER_V2_PREFIX);
-  INTERCEPT_CUSTOM(custom_bp_t::BANNER_WRITE, bp_type_t::WRITE);
-  return frame_assembler->write_flush(std::move(bl)).then([this] {
-      // 2. read peer banner
-      unsigned banner_len = strlen(CEPH_BANNER_V2_PREFIX) + sizeof(ceph_le16);
-      INTERCEPT_CUSTOM(custom_bp_t::BANNER_READ, bp_type_t::READ);
+#ifdef UNIT_TESTS_BUILT
+  return frame_assembler->intercept_frame(custom_bp_t::BANNER_WRITE, true
+  ).then([this, bl=std::move(bl)]() mutable {
+    return frame_assembler->write_flush(std::move(bl));
+  }
+#else
+  return frame_assembler->write_flush(std::move(bl)
+#endif
+  ).then([this] {
+    // 2. read peer banner
+    unsigned banner_len = strlen(CEPH_BANNER_V2_PREFIX) + sizeof(ceph_le16);
+#ifdef UNIT_TESTS_BUILT
+    return frame_assembler->intercept_frame(custom_bp_t::BANNER_READ, false
+    ).then([this, banner_len] {
       return frame_assembler->read_exactly(banner_len);
-    }).then([this](auto bptr) {
-      // 3. process peer banner and read banner_payload
-      unsigned banner_prefix_len = strlen(CEPH_BANNER_V2_PREFIX);
-      logger().debug("{} RECV({}) banner: \"{}\"",
-                     conn, bptr.length(),
-                     std::string(bptr.c_str(), banner_prefix_len));
-
-      if (memcmp(bptr.c_str(), CEPH_BANNER_V2_PREFIX, banner_prefix_len) != 0) {
-        if (memcmp(bptr.c_str(), CEPH_BANNER, strlen(CEPH_BANNER)) == 0) {
-          logger().warn("{} peer is using V1 protocol", conn);
-        } else {
-          logger().warn("{} peer sent bad banner", conn);
-        }
-        abort_in_fault();
+    });
+#else
+    return frame_assembler->read_exactly(banner_len);
+#endif
+  }).then([this](auto bptr) {
+    // 3. process peer banner and read banner_payload
+    unsigned banner_prefix_len = strlen(CEPH_BANNER_V2_PREFIX);
+    logger().debug("{} RECV({}) banner: \"{}\"",
+                   conn, bptr.length(),
+                   std::string(bptr.c_str(), banner_prefix_len));
+
+    if (memcmp(bptr.c_str(), CEPH_BANNER_V2_PREFIX, banner_prefix_len) != 0) {
+      if (memcmp(bptr.c_str(), CEPH_BANNER, strlen(CEPH_BANNER)) == 0) {
+        logger().warn("{} peer is using V1 protocol", conn);
+      } else {
+        logger().warn("{} peer sent bad banner", conn);
       }
+      abort_in_fault();
+    }
 
-      bptr.set_offset(bptr.offset() + banner_prefix_len);
-      bptr.set_length(bptr.length() - banner_prefix_len);
-      assert(bptr.length() == sizeof(ceph_le16));
-
-      uint16_t payload_len;
-      bufferlist buf;
-      buf.append(std::move(bptr));
-      auto ti = buf.cbegin();
-      try {
-        decode(payload_len, ti);
-      } catch (const buffer::error &e) {
-        logger().warn("{} decode banner payload len failed", conn);
-        abort_in_fault();
-      }
-      logger().debug("{} GOT banner: payload_len={}", conn, payload_len);
-      INTERCEPT_CUSTOM(custom_bp_t::BANNER_PAYLOAD_READ, bp_type_t::READ);
+    bptr.set_offset(bptr.offset() + banner_prefix_len);
+    bptr.set_length(bptr.length() - banner_prefix_len);
+    assert(bptr.length() == sizeof(ceph_le16));
+
+    uint16_t payload_len;
+    bufferlist buf;
+    buf.append(std::move(bptr));
+    auto ti = buf.cbegin();
+    try {
+      decode(payload_len, ti);
+    } catch (const buffer::error &e) {
+      logger().warn("{} decode banner payload len failed", conn);
+      abort_in_fault();
+    }
+    logger().debug("{} GOT banner: payload_len={}", conn, payload_len);
+#ifdef UNIT_TESTS_BUILT
+    return frame_assembler->intercept_frame(
+      custom_bp_t::BANNER_PAYLOAD_READ, false
+    ).then([this, payload_len] {
       return frame_assembler->read(payload_len);
-    }).then([this, is_connect] (bufferlist bl) {
-      // 4. process peer banner_payload and send HelloFrame
-      auto p = bl.cbegin();
-      uint64_t _peer_supported_features;
-      uint64_t _peer_required_features;
-      try {
-        decode(_peer_supported_features, p);
-        decode(_peer_required_features, p);
-      } catch (const buffer::error &e) {
-        logger().warn("{} decode banner payload failed", conn);
-        abort_in_fault();
-      }
-      logger().debug("{} RECV({}) banner features: supported={} required={}",
-                     conn, bl.length(),
-                     _peer_supported_features, _peer_required_features);
-
-      // Check feature bit compatibility
-      uint64_t supported_features = CRIMSON_MSGR2_SUPPORTED_FEATURES;
-      uint64_t required_features = CEPH_MSGR2_REQUIRED_FEATURES;
-      if ((required_features & _peer_supported_features) != required_features) {
-        logger().error("{} peer does not support all required features"
-                       " required={} peer_supported={}",
-                       conn, required_features, _peer_supported_features);
-        ABORT_IN_CLOSE(is_connect);
-      }
-      if ((supported_features & _peer_required_features) != _peer_required_features) {
-        logger().error("{} we do not support all peer required features"
-                       " peer_required={} supported={}",
-                       conn, _peer_required_features, supported_features);
-        ABORT_IN_CLOSE(is_connect);
-      }
-      peer_supported_features = _peer_supported_features;
-      bool is_rev1 = HAVE_MSGR2_FEATURE(peer_supported_features, REVISION_1);
-      frame_assembler->set_is_rev1(is_rev1);
-
-      auto hello = HelloFrame::Encode(messenger.get_mytype(),
-                                      conn.target_addr);
-      logger().debug("{} WRITE HelloFrame: my_type={}, peer_addr={}",
-                     conn, ceph_entity_type_name(messenger.get_mytype()),
-                     conn.target_addr);
-      return frame_assembler->write_flush_frame(hello);
-    }).then([this] {
-      //5. read peer HelloFrame
-      return frame_assembler->read_main_preamble();
-    }).then([this](auto ret) {
-      expect_tag(Tag::HELLO, ret.tag, conn, "read_hello_frame");
-      return frame_assembler->read_frame_payload();
-    }).then([this](auto payload) {
-      // 6. process peer HelloFrame
-      auto hello = HelloFrame::Decode(payload->back());
-      logger().debug("{} GOT HelloFrame: my_type={} peer_addr={}",
-                     conn, ceph_entity_type_name(hello.entity_type()),
-                     hello.peer_addr());
-      return seastar::make_ready_future<std::tuple<entity_type_t, entity_addr_t>>(
-        std::make_tuple(hello.entity_type(), hello.peer_addr()));
     });
+#else
+    return frame_assembler->read(payload_len);
+#endif
+  }).then([this, is_connect] (bufferlist bl) {
+    // 4. process peer banner_payload and send HelloFrame
+    auto p = bl.cbegin();
+    uint64_t _peer_supported_features;
+    uint64_t _peer_required_features;
+    try {
+      decode(_peer_supported_features, p);
+      decode(_peer_required_features, p);
+    } catch (const buffer::error &e) {
+      logger().warn("{} decode banner payload failed", conn);
+      abort_in_fault();
+    }
+    logger().debug("{} RECV({}) banner features: supported={} required={}",
+                   conn, bl.length(),
+                   _peer_supported_features, _peer_required_features);
+
+    // Check feature bit compatibility
+    uint64_t supported_features = CRIMSON_MSGR2_SUPPORTED_FEATURES;
+    uint64_t required_features = CEPH_MSGR2_REQUIRED_FEATURES;
+    if ((required_features & _peer_supported_features) != required_features) {
+      logger().error("{} peer does not support all required features"
+                     " required={} peer_supported={}",
+                     conn, required_features, _peer_supported_features);
+      ABORT_IN_CLOSE(is_connect);
+    }
+    if ((supported_features & _peer_required_features) != _peer_required_features) {
+      logger().error("{} we do not support all peer required features"
+                     " peer_required={} supported={}",
+                     conn, _peer_required_features, supported_features);
+      ABORT_IN_CLOSE(is_connect);
+    }
+    peer_supported_features = _peer_supported_features;
+    bool is_rev1 = HAVE_MSGR2_FEATURE(peer_supported_features, REVISION_1);
+    frame_assembler->set_is_rev1(is_rev1);
+
+    auto hello = HelloFrame::Encode(messenger.get_mytype(),
+                                    conn.target_addr);
+    logger().debug("{} WRITE HelloFrame: my_type={}, peer_addr={}",
+                   conn, ceph_entity_type_name(messenger.get_mytype()),
+                   conn.target_addr);
+    return frame_assembler->write_flush_frame(hello);
+  }).then([this] {
+    //5. read peer HelloFrame
+    return frame_assembler->read_main_preamble();
+  }).then([this](auto ret) {
+    expect_tag(Tag::HELLO, ret.tag, conn, "read_hello_frame");
+    return frame_assembler->read_frame_payload();
+  }).then([this](auto payload) {
+    // 6. process peer HelloFrame
+    auto hello = HelloFrame::Decode(payload->back());
+    logger().debug("{} GOT HelloFrame: my_type={} peer_addr={}",
+                   conn, ceph_entity_type_name(hello.entity_type()),
+                   hello.peer_addr());
+    return seastar::make_ready_future<std::tuple<entity_type_t, entity_addr_t>>(
+      std::make_tuple(hello.entity_type(), hello.peer_addr()));
+  });
 }
 
 // CONNECTING state
@@ -883,177 +877,177 @@ void ProtocolV2::execute_connecting()
   ceph_assert_always(!is_socket_valid);
   trigger_state(state_t::CONNECTING, io_state_t::delay);
   gated_execute("execute_connecting", conn, [this] {
-      global_seq = messenger.get_global_seq();
-      assert(client_cookie != 0);
-      if (!conn.policy.lossy && server_cookie != 0) {
-        ++connect_seq;
-        logger().debug("{} UPDATE: gs={}, cs={} for reconnect",
-                       conn, global_seq, connect_seq);
-      } else { // conn.policy.lossy || server_cookie == 0
-        assert(connect_seq == 0);
-        assert(server_cookie == 0);
-        logger().debug("{} UPDATE: gs={} for connect", conn, global_seq);
-      }
-      return wait_exit_io().then([this] {
+    global_seq = messenger.get_global_seq();
+    assert(client_cookie != 0);
+    if (!conn.policy.lossy && server_cookie != 0) {
+      ++connect_seq;
+      logger().debug("{} UPDATE: gs={}, cs={} for reconnect",
+                     conn, global_seq, connect_seq);
+    } else { // conn.policy.lossy || server_cookie == 0
+      assert(connect_seq == 0);
+      assert(server_cookie == 0);
+      logger().debug("{} UPDATE: gs={} for connect", conn, global_seq);
+    }
+    return wait_exit_io().then([this] {
 #ifdef UNIT_TESTS_BUILT
-          // process custom_bp_t::SOCKET_CONNECTING
-          // supports CONTINUE/FAULT/BLOCK
-          if (conn.interceptor) {
-            auto action = conn.interceptor->intercept(
-                conn.get_local_shared_foreign_from_this(),
-                {custom_bp_t::SOCKET_CONNECTING});
-            switch (action) {
-            case bp_action_t::CONTINUE:
-              return seastar::now();
-            case bp_action_t::FAULT:
-              logger().info("[Test] got FAULT");
-              abort_in_fault();
-            case bp_action_t::BLOCK:
-              logger().info("[Test] got BLOCK");
-              return conn.interceptor->blocker.block();
-            default:
-              ceph_abort("unexpected action from trap");
-              return seastar::now();
-            }
-          } else {
-            return seastar::now();
-          }
-        }).then([this] {
-#endif
-          ceph_assert_always(frame_assembler);
-          if (unlikely(state != state_t::CONNECTING)) {
-            logger().debug("{} triggered {} before Socket::connect()",
-                           conn, get_state_name(state));
-            abort_protocol();
-          }
-          return Socket::connect(conn.peer_addr);
-        }).then([this](SocketRef _new_socket) {
-          logger().debug("{} socket connected", conn);
-          if (unlikely(state != state_t::CONNECTING)) {
-            logger().debug("{} triggered {} during Socket::connect()",
-                           conn, get_state_name(state));
-            return _new_socket->close().then([sock=std::move(_new_socket)] {
-              abort_protocol();
-            });
-          }
-          SocketFRef new_socket = seastar::make_foreign(std::move(_new_socket));
-          if (!has_socket) {
-            frame_assembler->set_socket(std::move(new_socket));
-            has_socket = true;
-          } else {
-            gate.dispatch_in_background(
-              "replace_socket_connecting",
-              conn,
-              [this, new_socket=std::move(new_socket)]() mutable {
-                return frame_assembler->replace_shutdown_socket(std::move(new_socket));
-              }
-            );
-          }
-          is_socket_valid = true;
+      // process custom_bp_t::SOCKET_CONNECTING
+      // supports CONTINUE/FAULT/BLOCK
+      if (!conn.interceptor) {
+        return seastar::now();
+      }
+      return conn.interceptor->intercept(
+        conn, {Breakpoint{custom_bp_t::SOCKET_CONNECTING}}
+      ).then([this](bp_action_t action) {
+        switch (action) {
+        case bp_action_t::CONTINUE:
           return seastar::now();
-        }).then([this] {
-          auth_meta = seastar::make_lw_shared<AuthConnectionMeta>();
-          frame_assembler->reset_handlers();
-          frame_assembler->start_recording();
-          return banner_exchange(true);
-        }).then([this] (auto&& ret) {
-          auto [_peer_type, _my_addr_from_peer] = std::move(ret);
-          if (conn.get_peer_type() != _peer_type) {
-            logger().warn("{} connection peer type does not match what peer advertises {} != {}",
-                          conn, ceph_entity_type_name(conn.get_peer_type()),
-                          ceph_entity_type_name(_peer_type));
-            ABORT_IN_CLOSE(true);
-          }
-          if (unlikely(state != state_t::CONNECTING)) {
-            logger().debug("{} triggered {} during banner_exchange(), abort",
-                           conn, get_state_name(state));
-            abort_protocol();
-          }
-          frame_assembler->learn_socket_ephemeral_port_as_connector(
-              _my_addr_from_peer.get_port());
-          if (unlikely(_my_addr_from_peer.is_legacy())) {
-            logger().warn("{} peer sent a legacy address for me: {}",
-                          conn, _my_addr_from_peer);
-            throw std::system_error(
-                make_error_code(crimson::net::error::bad_peer_address));
-          }
-          _my_addr_from_peer.set_type(entity_addr_t::TYPE_MSGR2);
-          messenger.learned_addr(_my_addr_from_peer, conn);
-          return client_auth();
-        }).then([this] {
-          if (server_cookie == 0) {
-            ceph_assert(connect_seq == 0);
-            return client_connect();
-          } else {
-            ceph_assert(connect_seq > 0);
-            return client_reconnect();
+        case bp_action_t::FAULT:
+          logger().info("[Test] got FAULT");
+          abort_in_fault();
+        case bp_action_t::BLOCK:
+          logger().info("[Test] got BLOCK");
+          return conn.interceptor->blocker.block();
+        default:
+          ceph_abort("unexpected action from trap");
+          return seastar::now();
+        }
+      });;
+    }).then([this] {
+#endif
+      ceph_assert_always(frame_assembler);
+      if (unlikely(state != state_t::CONNECTING)) {
+        logger().debug("{} triggered {} before Socket::connect()",
+                       conn, get_state_name(state));
+        abort_protocol();
+      }
+      return Socket::connect(conn.peer_addr);
+    }).then([this](SocketRef _new_socket) {
+      logger().debug("{} socket connected", conn);
+      if (unlikely(state != state_t::CONNECTING)) {
+        logger().debug("{} triggered {} during Socket::connect()",
+                       conn, get_state_name(state));
+        return _new_socket->close().then([sock=std::move(_new_socket)] {
+          abort_protocol();
+        });
+      }
+      SocketFRef new_socket = seastar::make_foreign(std::move(_new_socket));
+      if (!has_socket) {
+        frame_assembler->set_socket(std::move(new_socket));
+        has_socket = true;
+      } else {
+        gate.dispatch_in_background(
+          "replace_socket_connecting",
+          conn,
+          [this, new_socket=std::move(new_socket)]() mutable {
+            return frame_assembler->replace_shutdown_socket(std::move(new_socket));
           }
-        }).then([this] (next_step_t next) {
+        );
+      }
+      is_socket_valid = true;
+      return seastar::now();
+    }).then([this] {
+      auth_meta = seastar::make_lw_shared<AuthConnectionMeta>();
+      frame_assembler->reset_handlers();
+      frame_assembler->start_recording();
+      return banner_exchange(true);
+    }).then([this] (auto&& ret) {
+      auto [_peer_type, _my_addr_from_peer] = std::move(ret);
+      if (conn.get_peer_type() != _peer_type) {
+        logger().warn("{} connection peer type does not match what peer advertises {} != {}",
+                      conn, ceph_entity_type_name(conn.get_peer_type()),
+                      ceph_entity_type_name(_peer_type));
+        ABORT_IN_CLOSE(true);
+      }
+      if (unlikely(state != state_t::CONNECTING)) {
+        logger().debug("{} triggered {} during banner_exchange(), abort",
+                       conn, get_state_name(state));
+        abort_protocol();
+      }
+      frame_assembler->learn_socket_ephemeral_port_as_connector(
+          _my_addr_from_peer.get_port());
+      if (unlikely(_my_addr_from_peer.is_legacy())) {
+        logger().warn("{} peer sent a legacy address for me: {}",
+                      conn, _my_addr_from_peer);
+        throw std::system_error(
+            make_error_code(crimson::net::error::bad_peer_address));
+      }
+      _my_addr_from_peer.set_type(entity_addr_t::TYPE_MSGR2);
+      messenger.learned_addr(_my_addr_from_peer, conn);
+      return client_auth();
+    }).then([this] {
+      if (server_cookie == 0) {
+        ceph_assert(connect_seq == 0);
+        return client_connect();
+      } else {
+        ceph_assert(connect_seq > 0);
+        return client_reconnect();
+      }
+    }).then([this] (next_step_t next) {
+      if (unlikely(state != state_t::CONNECTING)) {
+        logger().debug("{} triggered {} at the end of execute_connecting()",
+                       conn, get_state_name(state));
+        abort_protocol();
+      }
+      switch (next) {
+       case next_step_t::ready: {
+        if (unlikely(state != state_t::CONNECTING)) {
+          logger().debug("{} triggered {} before dispatch_connect(), abort",
+                         conn, get_state_name(state));
+          abort_protocol();
+        }
+
+        auto cc_seq = crosscore.prepare_submit();
+        // there are 2 hops with dispatch_connect()
+        crosscore.prepare_submit();
+        logger().info("{} connected: gs={}, pgs={}, cs={}, "
+                      "client_cookie={}, server_cookie={}, {}, new_sid={}, "
+                      "send {} IOHandler::dispatch_connect()",
+                      conn, global_seq, peer_global_seq, connect_seq,
+                      client_cookie, server_cookie, io_states,
+                      frame_assembler->get_socket_shard_id(), cc_seq);
+
+        // set io_handler to a new shard
+        auto new_io_shard = frame_assembler->get_socket_shard_id();
+        ConnectionFRef conn_fref = seastar::make_foreign(
+            conn.shared_from_this());
+        ceph_assert_always(!pr_switch_io_shard.has_value());
+        pr_switch_io_shard = seastar::shared_promise<>();
+        return seastar::smp::submit_to(
+            io_handler.get_shard_id(),
+            [this, cc_seq, new_io_shard,
+             conn_fref=std::move(conn_fref)]() mutable {
+          return io_handler.dispatch_connect(
+              cc_seq, new_io_shard, std::move(conn_fref));
+        }).then([this, new_io_shard] {
+          ceph_assert_always(io_handler.get_shard_id() == new_io_shard);
+          pr_switch_io_shard->set_value();
+          pr_switch_io_shard = std::nullopt;
+          // user can make changes
+
           if (unlikely(state != state_t::CONNECTING)) {
-            logger().debug("{} triggered {} at the end of execute_connecting()",
+            logger().debug("{} triggered {} after dispatch_connect(), abort",
                            conn, get_state_name(state));
             abort_protocol();
           }
-          switch (next) {
-           case next_step_t::ready: {
-            if (unlikely(state != state_t::CONNECTING)) {
-              logger().debug("{} triggered {} before dispatch_connect(), abort",
-                             conn, get_state_name(state));
-              abort_protocol();
-            }
-
-            auto cc_seq = crosscore.prepare_submit();
-            // there are 2 hops with dispatch_connect()
-            crosscore.prepare_submit();
-            logger().info("{} connected: gs={}, pgs={}, cs={}, "
-                          "client_cookie={}, server_cookie={}, {}, new_sid={}, "
-                          "send {} IOHandler::dispatch_connect()",
-                          conn, global_seq, peer_global_seq, connect_seq,
-                          client_cookie, server_cookie, io_states,
-                          frame_assembler->get_socket_shard_id(), cc_seq);
-
-            // set io_handler to a new shard
-            auto new_io_shard = frame_assembler->get_socket_shard_id();
-            ConnectionFRef conn_fref = seastar::make_foreign(
-                conn.shared_from_this());
-            ceph_assert_always(!pr_switch_io_shard.has_value());
-            pr_switch_io_shard = seastar::shared_promise<>();
-            return seastar::smp::submit_to(
-                io_handler.get_shard_id(),
-                [this, cc_seq, new_io_shard,
-                 conn_fref=std::move(conn_fref)]() mutable {
-              return io_handler.dispatch_connect(
-                  cc_seq, new_io_shard, std::move(conn_fref));
-            }).then([this, new_io_shard] {
-              ceph_assert_always(io_handler.get_shard_id() == new_io_shard);
-              pr_switch_io_shard->set_value();
-              pr_switch_io_shard = std::nullopt;
-              // user can make changes
-
-              if (unlikely(state != state_t::CONNECTING)) {
-                logger().debug("{} triggered {} after dispatch_connect(), abort",
-                               conn, get_state_name(state));
-                abort_protocol();
-              }
-              execute_ready();
-            });
-           }
-           case next_step_t::wait: {
-            logger().info("{} execute_connecting(): going to WAIT(max-backoff)", conn);
-            ceph_assert_always(is_socket_valid);
-            frame_assembler->shutdown_socket<true>(&gate);
-            is_socket_valid = false;
-            execute_wait(true);
-            return seastar::now();
-           }
-           default: {
-            ceph_abort("impossible next step");
-           }
-          }
-        }).handle_exception([this](std::exception_ptr eptr) {
-          fault(state_t::CONNECTING, "execute_connecting", eptr);
+          execute_ready();
         });
+       }
+       case next_step_t::wait: {
+        logger().info("{} execute_connecting(): going to WAIT(max-backoff)", conn);
+        ceph_assert_always(is_socket_valid);
+        frame_assembler->shutdown_socket<true>(&gate);
+        is_socket_valid = false;
+        execute_wait(true);
+        return seastar::now();
+       }
+       default: {
+        ceph_abort("impossible next step");
+       }
+      }
+    }).handle_exception([this](std::exception_ptr eptr) {
+      fault(state_t::CONNECTING, "execute_connecting", eptr);
     });
+  });
 }
 
 // ACCEPTING state
@@ -1629,90 +1623,87 @@ void ProtocolV2::execute_accepting()
   assert(is_socket_valid);
   trigger_state(state_t::ACCEPTING, io_state_t::none);
   gate.dispatch_in_background("execute_accepting", conn, [this] {
-      return seastar::futurize_invoke([this] {
+    return seastar::futurize_invoke([this] {
 #ifdef UNIT_TESTS_BUILT
-          if (conn.interceptor) {
-            auto action = conn.interceptor->intercept(
-                conn.get_local_shared_foreign_from_this(),
-                {custom_bp_t::SOCKET_ACCEPTED});
-            switch (action) {
-            case bp_action_t::CONTINUE:
-              break;
-            case bp_action_t::FAULT:
-              logger().info("[Test] got FAULT");
-              abort_in_fault();
-            default:
-              ceph_abort("unexpected action from trap");
-            }
-          }
-#endif
-          auth_meta = seastar::make_lw_shared<AuthConnectionMeta>();
-          frame_assembler->reset_handlers();
-          frame_assembler->start_recording();
-          return banner_exchange(false);
-        }).then([this] (auto&& ret) {
-          auto [_peer_type, _my_addr_from_peer] = std::move(ret);
-          ceph_assert(conn.get_peer_type() == 0);
-          conn.set_peer_type(_peer_type);
-
-          conn.policy = messenger.get_policy(_peer_type);
-          logger().info("{} UPDATE: peer_type={},"
-                        " policy(lossy={} server={} standby={} resetcheck={})",
-                        conn, ceph_entity_type_name(_peer_type),
-                        conn.policy.lossy, conn.policy.server,
-                        conn.policy.standby, conn.policy.resetcheck);
-          if (!messenger.get_myaddr().is_blank_ip() &&
-              (messenger.get_myaddr().get_port() != _my_addr_from_peer.get_port() ||
-              messenger.get_myaddr().get_nonce() != _my_addr_from_peer.get_nonce())) {
-            logger().warn("{} my_addr_from_peer {} port/nonce doesn't match myaddr {}",
-                          conn, _my_addr_from_peer, messenger.get_myaddr());
-            throw std::system_error(
-                make_error_code(crimson::net::error::bad_peer_address));
-          }
-          messenger.learned_addr(_my_addr_from_peer, conn);
-          return server_auth();
-        }).then([this] {
-          return frame_assembler->read_main_preamble();
-        }).then([this](auto ret) {
-          switch (ret.tag) {
-            case Tag::CLIENT_IDENT:
-              return server_connect();
-            case Tag::SESSION_RECONNECT:
-              return server_reconnect();
-            default: {
-              unexpected_tag(ret.tag, conn, "post_server_auth");
-              return seastar::make_ready_future<next_step_t>(next_step_t::none);
-            }
-          }
-        }).then([this] (next_step_t next) {
-          switch (next) {
-           case next_step_t::ready:
-            assert(state != state_t::ACCEPTING);
-            break;
-           case next_step_t::wait:
-            if (unlikely(state != state_t::ACCEPTING)) {
-              logger().debug("{} triggered {} at the end of execute_accepting()",
-                             conn, get_state_name(state));
-              abort_protocol();
-            }
-            logger().info("{} execute_accepting(): going to SERVER_WAIT", conn);
-            execute_server_wait();
-            break;
-           default:
-            ceph_abort("impossible next step");
-          }
-        }).handle_exception([this](std::exception_ptr eptr) {
-          const char *e_what;
-          try {
-            std::rethrow_exception(eptr);
-          } catch (std::exception &e) {
-            e_what = e.what();
-          }
-          logger().info("{} execute_accepting(): fault at {}, going to CLOSING -- {}",
-                        conn, get_state_name(state), e_what);
-          do_close(false);
+      if (conn.interceptor) {
+        // only notify socket accepted
+        gate.dispatch_in_background(
+            "test_intercept_socket_accepted", conn, [this] {
+          return conn.interceptor->intercept(
+            conn, {Breakpoint{custom_bp_t::SOCKET_ACCEPTED}}
+          ).then([](bp_action_t action) {
+            ceph_assert(action == bp_action_t::CONTINUE);
+          });
         });
+      }
+#endif
+      auth_meta = seastar::make_lw_shared<AuthConnectionMeta>();
+      frame_assembler->reset_handlers();
+      frame_assembler->start_recording();
+      return banner_exchange(false);
+    }).then([this] (auto&& ret) {
+      auto [_peer_type, _my_addr_from_peer] = std::move(ret);
+      ceph_assert(conn.get_peer_type() == 0);
+      conn.set_peer_type(_peer_type);
+
+      conn.policy = messenger.get_policy(_peer_type);
+      logger().info("{} UPDATE: peer_type={},"
+                    " policy(lossy={} server={} standby={} resetcheck={})",
+                    conn, ceph_entity_type_name(_peer_type),
+                    conn.policy.lossy, conn.policy.server,
+                    conn.policy.standby, conn.policy.resetcheck);
+      if (!messenger.get_myaddr().is_blank_ip() &&
+          (messenger.get_myaddr().get_port() != _my_addr_from_peer.get_port() ||
+          messenger.get_myaddr().get_nonce() != _my_addr_from_peer.get_nonce())) {
+        logger().warn("{} my_addr_from_peer {} port/nonce doesn't match myaddr {}",
+                      conn, _my_addr_from_peer, messenger.get_myaddr());
+        throw std::system_error(
+            make_error_code(crimson::net::error::bad_peer_address));
+      }
+      messenger.learned_addr(_my_addr_from_peer, conn);
+      return server_auth();
+    }).then([this] {
+      return frame_assembler->read_main_preamble();
+    }).then([this](auto ret) {
+      switch (ret.tag) {
+        case Tag::CLIENT_IDENT:
+          return server_connect();
+        case Tag::SESSION_RECONNECT:
+          return server_reconnect();
+        default: {
+          unexpected_tag(ret.tag, conn, "post_server_auth");
+          return seastar::make_ready_future<next_step_t>(next_step_t::none);
+        }
+      }
+    }).then([this] (next_step_t next) {
+      switch (next) {
+       case next_step_t::ready:
+        assert(state != state_t::ACCEPTING);
+        break;
+       case next_step_t::wait:
+        if (unlikely(state != state_t::ACCEPTING)) {
+          logger().debug("{} triggered {} at the end of execute_accepting()",
+                         conn, get_state_name(state));
+          abort_protocol();
+        }
+        logger().info("{} execute_accepting(): going to SERVER_WAIT", conn);
+        execute_server_wait();
+        break;
+       default:
+        ceph_abort("impossible next step");
+      }
+    }).handle_exception([this](std::exception_ptr eptr) {
+      const char *e_what;
+      try {
+        std::rethrow_exception(eptr);
+      } catch (std::exception &e) {
+        e_what = e.what();
+      }
+      logger().info("{} execute_accepting(): fault at {}, going to CLOSING -- {}",
+                    conn, get_state_name(state), e_what);
+      do_close(false);
     });
+  });
 }
 
 // CONNECTING or ACCEPTING state
index 15d5509dc160af8e8325b1c8897e507479388718..c414c48e12f8e89f63d5fcb8a736bc4d86e587f9 100644 (file)
@@ -63,7 +63,12 @@ IOHandler::~IOHandler()
   assert(!conn_ref);
 }
 
-ceph::bufferlist IOHandler::sweep_out_pending_msgs_to_sent(
+#ifdef UNIT_TESTS_BUILT
+IOHandler::sweep_ret
+#else
+ceph::bufferlist
+#endif
+IOHandler::sweep_out_pending_msgs_to_sent(
   bool require_keepalive,
   std::optional<utime_t> maybe_keepalive_ack,
   bool require_ack)
@@ -71,25 +76,45 @@ ceph::bufferlist IOHandler::sweep_out_pending_msgs_to_sent(
   std::size_t num_msgs = out_pending_msgs.size();
   ceph::bufferlist bl;
 
+#ifdef UNIT_TESTS_BUILT
+  std::vector<Tag> tags;
+#endif
+
   if (unlikely(require_keepalive)) {
     auto keepalive_frame = KeepAliveFrame::Encode();
     bl.append(frame_assembler->get_buffer(keepalive_frame));
+#ifdef UNIT_TESTS_BUILT
+    auto tag = KeepAliveFrame::tag;
+    tags.push_back(tag);
+#endif
   }
 
   if (unlikely(maybe_keepalive_ack.has_value())) {
     auto keepalive_ack_frame = KeepAliveFrameAck::Encode(*maybe_keepalive_ack);
     bl.append(frame_assembler->get_buffer(keepalive_ack_frame));
+#ifdef UNIT_TESTS_BUILT
+    auto tag = KeepAliveFrameAck::tag;
+    tags.push_back(tag);
+#endif
   }
 
   if (require_ack && num_msgs == 0u) {
     auto ack_frame = AckFrame::Encode(in_seq);
     bl.append(frame_assembler->get_buffer(ack_frame));
+#ifdef UNIT_TESTS_BUILT
+    auto tag = AckFrame::tag;
+    tags.push_back(tag);
+#endif
   }
 
   std::for_each(
       out_pending_msgs.begin(),
       out_pending_msgs.begin()+num_msgs,
-      [this, &bl](const MessageFRef& msg) {
+      [this, &bl
+#ifdef UNIT_TESTS_BUILT
+        , &tags
+#endif
+      ](const MessageFRef& msg) {
     // set priority
     msg->get_header().src = conn.messenger.get_myname();
 
@@ -114,6 +139,10 @@ ceph::bufferlist IOHandler::sweep_out_pending_msgs_to_sent(
     logger().debug("{} --> #{} === {} ({})",
                   conn, msg->get_seq(), *msg, msg->get_type());
     bl.append(frame_assembler->get_buffer(message));
+#ifdef UNIT_TESTS_BUILT
+    auto tag = MessageFrame::tag;
+    tags.push_back(tag);
+#endif
   });
 
   if (!conn.policy.lossy) {
@@ -123,7 +152,12 @@ ceph::bufferlist IOHandler::sweep_out_pending_msgs_to_sent(
         std::make_move_iterator(out_pending_msgs.end()));
   }
   out_pending_msgs.clear();
+
+#ifdef UNIT_TESTS_BUILT
+  return sweep_ret{std::move(bl), tags};
+#else
   return bl;
+#endif
 }
 
 seastar::future<> IOHandler::send(MessageFRef msg)
@@ -791,9 +825,18 @@ IOHandler::do_out_dispatch(shard_states_t &ctx)
       auto to_ack = ack_left;
       assert(to_ack == 0 || in_seq > 0);
       ack_left = 0;
-      return frame_assembler->write<false>(
-        sweep_out_pending_msgs_to_sent(
-          require_keepalive, maybe_keepalive_ack, to_ack > 0)
+#ifdef UNIT_TESTS_BUILT
+      auto ret = sweep_out_pending_msgs_to_sent(
+          require_keepalive, maybe_keepalive_ack, to_ack > 0);
+      return frame_assembler->intercept_frames(ret.tags, true
+      ).then([this, bl=std::move(ret.bl)]() mutable {
+        return frame_assembler->write<false>(std::move(bl));
+      }
+#else
+      auto bl = sweep_out_pending_msgs_to_sent(
+          require_keepalive, maybe_keepalive_ack, to_ack > 0);
+      return frame_assembler->write<false>(std::move(bl)
+#endif
       ).then([this, &ctx] {
         if (ctx.get_io_state() != io_state_t::open) {
           return frame_assembler->flush<false>(
index 843e565672abe2632bfda25c5174ca68ec625093..f53c2ba646847e4706a925838322b4627fe7a97d 100644 (file)
@@ -3,6 +3,8 @@
 
 #pragma once
 
+#include <vector>
+
 #include <seastar/core/shared_future.hh>
 #include <seastar/util/later.hh>
 
@@ -475,7 +477,16 @@ public:
 
   seastar::future<> do_out_dispatch(shard_states_t &ctx);
 
-  ceph::bufferlist sweep_out_pending_msgs_to_sent(
+#ifdef UNIT_TESTS_BUILT
+  struct sweep_ret {
+    ceph::bufferlist bl;
+    std::vector<ceph::msgr::v2::Tag> tags;
+  };
+  sweep_ret
+#else
+  ceph::bufferlist
+#endif
+  sweep_out_pending_msgs_to_sent(
       bool require_keepalive,
       std::optional<utime_t> maybe_keepalive_ack,
       bool require_ack);
index 8b1ca07edb9206a57e32a7376a77cb5f4f84ac26..1359b5aeb62ea923e6a69462cfc1dd7181cd2848 100644 (file)
@@ -775,49 +775,68 @@ struct TestInterceptor : public Interceptor {
     logger().info("[{}] {} {}", result->index, *conn, result->state);
   }
 
-  bp_action_t intercept(ConnectionRef conn, Breakpoint bp) override {
-    ++breakpoints_counter[bp].counter;
+  seastar::future<bp_action_t>
+  intercept(Connection &_conn, std::vector<Breakpoint> bps) override {
+    assert(bps.size() >= 1);
+
+    std::vector<bp_action_t> actions;
+    for (const Breakpoint &bp : bps) {
+      ++breakpoints_counter[bp].counter;
+
+      auto result = find_result(&*conn);
+      if (result == nullptr) {
+        logger().error("Untracked intercepted connection: {}, at breakpoint {}({})",
+                       *conn, bp, breakpoints_counter[bp].counter);
+        ceph_abort();
+      }
 
-    auto result = find_result(&*conn);
-    if (result == nullptr) {
-      logger().error("Untracked intercepted connection: {}, at breakpoint {}({})",
-                     *conn, bp, breakpoints_counter[bp].counter);
-      ceph_abort();
-    }
+      if (bp == custom_bp_t::SOCKET_CONNECTING) {
+        ++result->connect_attempts;
+        logger().info("[Test] connect_attempts={}", result->connect_attempts);
+      } else if (bp == tag_bp_t{Tag::CLIENT_IDENT, bp_type_t::WRITE}) {
+        ++result->client_connect_attempts;
+        logger().info("[Test] client_connect_attempts={}", result->client_connect_attempts);
+      } else if (bp == tag_bp_t{Tag::SESSION_RECONNECT, bp_type_t::WRITE}) {
+        ++result->client_reconnect_attempts;
+        logger().info("[Test] client_reconnect_attempts={}", result->client_reconnect_attempts);
+      } else if (bp == custom_bp_t::SOCKET_ACCEPTED) {
+        ++result->accept_attempts;
+        logger().info("[Test] accept_attempts={}", result->accept_attempts);
+      } else if (bp == tag_bp_t{Tag::CLIENT_IDENT, bp_type_t::READ}) {
+        ++result->server_connect_attempts;
+        logger().info("[Test] server_connect_attemps={}", result->server_connect_attempts);
+      } else if (bp == tag_bp_t{Tag::SESSION_RECONNECT, bp_type_t::READ}) {
+        ++result->server_reconnect_attempts;
+        logger().info("[Test] server_reconnect_attempts={}", result->server_reconnect_attempts);
+      }
 
-    if (bp == custom_bp_t::SOCKET_CONNECTING) {
-      ++result->connect_attempts;
-      logger().info("[Test] connect_attempts={}", result->connect_attempts);
-    } else if (bp == tag_bp_t{Tag::CLIENT_IDENT, bp_type_t::WRITE}) {
-      ++result->client_connect_attempts;
-      logger().info("[Test] client_connect_attempts={}", result->client_connect_attempts);
-    } else if (bp == tag_bp_t{Tag::SESSION_RECONNECT, bp_type_t::WRITE}) {
-      ++result->client_reconnect_attempts;
-      logger().info("[Test] client_reconnect_attempts={}", result->client_reconnect_attempts);
-    } else if (bp == custom_bp_t::SOCKET_ACCEPTED) {
-      ++result->accept_attempts;
-      logger().info("[Test] accept_attempts={}", result->accept_attempts);
-    } else if (bp == tag_bp_t{Tag::CLIENT_IDENT, bp_type_t::READ}) {
-      ++result->server_connect_attempts;
-      logger().info("[Test] server_connect_attemps={}", result->server_connect_attempts);
-    } else if (bp == tag_bp_t{Tag::SESSION_RECONNECT, bp_type_t::READ}) {
-      ++result->server_reconnect_attempts;
-      logger().info("[Test] server_reconnect_attempts={}", result->server_reconnect_attempts);
+      auto it_bp = breakpoints.find(bp);
+      if (it_bp != breakpoints.end()) {
+        auto it_cnt = it_bp->second.find(breakpoints_counter[bp].counter);
+        if (it_cnt != it_bp->second.end()) {
+          logger().info("[{}] {} intercepted {}({}) => {}",
+                        result->index, *conn, bp,
+                        breakpoints_counter[bp].counter, it_cnt->second);
+          actions.emplace_back(it_cnt->second);
+          continue;
+        }
+      }
+      logger().info("[{}] {} intercepted {}({})",
+                    result->index, *conn, bp, breakpoints_counter[bp].counter);
+      actions.emplace_back(bp_action_t::CONTINUE);
     }
 
-    auto it_bp = breakpoints.find(bp);
-    if (it_bp != breakpoints.end()) {
-      auto it_cnt = it_bp->second.find(breakpoints_counter[bp].counter);
-      if (it_cnt != it_bp->second.end()) {
-        logger().info("[{}] {} intercepted {}({}) => {}",
-                      result->index, *conn, bp,
-                      breakpoints_counter[bp].counter, it_cnt->second);
-        return it_cnt->second;
+    bp_action_t action = bp_action_t::CONTINUE;
+    for (bp_action_t &a : actions) {
+      if (a != bp_action_t::CONTINUE) {
+        if (action == bp_action_t::CONTINUE) {
+          action = a;
+        } else {
+          ceph_abort("got multiple incompatible actions");
+        }
       }
     }
-    logger().info("[{}] {} intercepted {}({})",
-                  result->index, *conn, bp, breakpoints_counter[bp].counter);
-    return bp_action_t::CONTINUE;
+    return seastar::make_ready_future<bp_action_t>(action);
   }
 };