]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/test: intercept V2 Protocol with Interceptor
authorYingxin Cheng <yingxin.cheng@intel.com>
Thu, 5 Sep 2019 03:02:02 +0000 (11:02 +0800)
committerYingxin Cheng <yingxin.cheng@intel.com>
Thu, 5 Sep 2019 03:40:02 +0000 (11:40 +0800)
Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
src/crimson/net/Connection.h
src/crimson/net/Interceptor.h [new file with mode: 0644]
src/crimson/net/Messenger.h
src/crimson/net/Protocol.cc
src/crimson/net/ProtocolV2.cc
src/crimson/net/SocketConnection.cc

index 66dbb1dd6690def4e4991f44149c8633236d96d2..3a179ee15fecbee7fc1e310903ebcdfb1f463593 100644 (file)
 
 namespace ceph::net {
 
+#ifdef UNIT_TESTS_BUILT
+class Interceptor;
+#endif
+
 using seq_num_t = uint64_t;
 
 class Connection : public seastar::enable_shared_from_this<Connection> {
@@ -44,6 +48,10 @@ class Connection : public seastar::enable_shared_from_this<Connection> {
   Connection() {}
   virtual ~Connection() {}
 
+#ifdef UNIT_TESTS_BUILT
+  Interceptor *interceptor = nullptr;
+#endif
+
   virtual Messenger* get_messenger() const = 0;
   const entity_addr_t& get_peer_addr() const { return peer_addr; }
   const entity_name_t& get_peer_name() const { return peer_name; }
diff --git a/src/crimson/net/Interceptor.h b/src/crimson/net/Interceptor.h
new file mode 100644 (file)
index 0000000..4bfab74
--- /dev/null
@@ -0,0 +1,99 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include <variant>
+#include <seastar/core/sharded.hh>
+
+#include "Fwd.h"
+#include "msg/async/frames_v2.h"
+
+namespace ceph::net {
+
+enum class custom_bp_t {
+  BANNER_WRITE = 0,
+  BANNER_READ,
+  BANNER_PAYLOAD_READ,
+  SOCKET_CONNECTING,
+  SOCKET_ACCEPTED
+};
+inline const char* get_bp_name(custom_bp_t bp) {
+  static const char *const bp_names[] = {"BANNER_WRITE",
+                                         "BANNER_READ",
+                                         "BANNER_PAYLOAD_READ",
+                                         "SOCKET_CONNECTING",
+                                         "SOCKET_ACCEPTED"};
+  assert(static_cast<int>(bp) < std::size(bp_names));
+  return bp_names[static_cast<int>(bp)];
+}
+
+struct tag_bp_t {
+  ceph::msgr::v2::Tag tag;
+  bool is_write;
+  bool operator==(const tag_bp_t& x) const {
+    return tag == x.tag && is_write == x.is_write;
+  }
+  bool operator!=(const tag_bp_t& x) const { return !operator==(x); }
+  bool operator<(const tag_bp_t& x) const {
+    return std::tie(tag, is_write) < std::tie(x.tag, x.is_write);
+  }
+};
+
+struct Breakpoint {
+  using var_t = std::variant<custom_bp_t, tag_bp_t>;
+  var_t bp;
+  Breakpoint(custom_bp_t bp) : bp(bp) { }
+  Breakpoint(ceph::msgr::v2::Tag tag, bool is_write)
+    : bp(tag_bp_t{tag, is_write}) { }
+  bool operator==(const Breakpoint& x) const { return bp == x.bp; }
+  bool operator!=(const Breakpoint& x) const { return !operator==(x); }
+  bool operator==(const custom_bp_t& x) const { return bp == var_t(x); }
+  bool operator!=(const custom_bp_t& x) const { return !operator==(x); }
+  bool operator==(const tag_bp_t& x) const { return bp == var_t(x); }
+  bool operator!=(const tag_bp_t& x) const { return !operator==(x); }
+  bool operator<(const Breakpoint& x) const { return bp < x.bp; }
+};
+
+inline std::ostream& operator<<(std::ostream& out, const Breakpoint& bp) {
+  if (auto custom_bp = std::get_if<custom_bp_t>(&bp.bp)) {
+    return out << get_bp_name(*custom_bp);
+  } else {
+    auto tag_bp = std::get<tag_bp_t>(bp.bp);
+    static const char *const tag_names[] = {"NONE",
+                                            "HELLO",
+                                            "AUTH_REQUEST",
+                                            "AUTH_BAD_METHOD",
+                                            "AUTH_REPLY_MORE",
+                                            "AUTH_REQUEST_MORE",
+                                            "AUTH_DONE",
+                                            "AUTH_SIGNATURE",
+                                            "CLIENT_IDENT",
+                                            "SERVER_IDENT",
+                                            "IDENT_MISSING_FEATURES",
+                                            "SESSION_RECONNECT",
+                                            "SESSION_RESET",
+                                            "SESSION_RETRY",
+                                            "SESSION_RETRY_GLOBAL",
+                                            "SESSION_RECONNECT_OK",
+                                            "WAIT",
+                                            "MESSAGE",
+                                            "KEEPALIVE2",
+                                            "KEEPALIVE2_ACK",
+                                            "ACK"};
+    assert(static_cast<size_t>(tag_bp.tag) < std::size(tag_names));
+    return out << tag_names[static_cast<size_t>(tag_bp.tag)]
+               << (tag_bp.is_write ? "_WRITE" : "_READ");
+  }
+}
+
+struct Interceptor {
+  virtual ~Interceptor() {}
+  virtual void register_conn(Connection& conn) = 0;
+  virtual void register_conn_ready(Connection& conn) = 0;
+  virtual void register_conn_closed(Connection& conn) = 0;
+  virtual void register_conn_replaced(Connection& conn) = 0;
+  virtual bool intercept(Connection& conn, Breakpoint bp) = 0;
+};
+
+} // namespace ceph::net
index d0c5e5aa71d4f4d371138f73267ca1c7e9b4ccab..3574036799aa519bef21e9bb32c5fa4b4d3d27d9 100644 (file)
@@ -29,6 +29,10 @@ class AuthServer;
 
 namespace ceph::net {
 
+#ifdef UNIT_TESTS_BUILT
+class Interceptor;
+#endif
+
 using Throttle = ceph::thread::Throttle;
 using SocketPolicy = ceph::net::Policy<Throttle>;
 
@@ -46,6 +50,10 @@ public:
   {}
   virtual ~Messenger() {}
 
+#ifdef UNIT_TESTS_BUILT
+  Interceptor *interceptor = nullptr;
+#endif
+
   entity_type_t get_mytype() const { return my_name.type(); }
   const entity_name_t& get_myname() const { return my_name; }
   const entity_addrvec_t& get_myaddrs() const { return my_addrs; }
index 9c2c9a67ba1d5ee597f9fb4a13a19d7b22ea9706..15c618a64a5d6e93eec4ec12b87e62a4c855e96e 100644 (file)
@@ -239,7 +239,8 @@ seastar::future<> Protocol::do_write_dispatch_sweep()
     }
   }).handle_exception_type([this] (const std::system_error& e) {
     if (e.code() != error::broken_pipe &&
-        e.code() != error::connection_reset) {
+        e.code() != error::connection_reset &&
+        e.code() != error::negotiation_failure) {
       logger().error("{} do_write_dispatch_sweep(): unexpected error {}",
                      conn, e);
       ceph_abort();
index 7f7c52a6c75adf4b28880646db68f07e2862451c..21a9441d3205b7eda9325a0bbef290a75aae42db 100644 (file)
 #include "SocketConnection.h"
 #include "SocketMessenger.h"
 
+#ifdef UNIT_TESTS_BUILT
+#include "Interceptor.h"
+#endif
+
 using namespace ceph::msgr::v2;
 
 namespace {
@@ -123,6 +127,20 @@ inline ostream& operator<<(
 
 namespace ceph::net {
 
+#ifdef UNIT_TESTS_BUILT
+
+#define INTERCEPT(...)                  \
+if (conn.interceptor) {                 \
+  if (conn.interceptor->intercept(      \
+      conn, Breakpoint(__VA_ARGS__))) { \
+    abort_in_fault();                   \
+  }                                     \
+}
+
+#else
+#define INTERCEPT(...)
+#endif
+
 seastar::future<> ProtocolV2::Timer::backoff(double seconds)
 {
   logger().warn("{} waiting {} seconds ...", conn, seconds);
@@ -299,6 +317,7 @@ seastar::future<Tag> ProtocolV2::read_main_preamble()
         rx_segments_desc.emplace_back(main_preamble.segments[idx]);
       }
 
+      INTERCEPT(static_cast<Tag>(main_preamble.tag), false);
       return static_cast<Tag>(main_preamble.tag);
     });
 }
@@ -381,6 +400,7 @@ seastar::future<> ProtocolV2::write_frame(F &frame, bool flush)
   logger().trace("{} SEND({}) frame: tag={}, num_segments={}, crc={}",
                  conn, bl.length(), (int)main_preamble->tag,
                  (int)main_preamble->num_segments, main_preamble->crc);
+  INTERCEPT(static_cast<Tag>(main_preamble->tag), true);
   if (flush) {
     return write_flush(std::move(bl));
   } else {
@@ -464,11 +484,13 @@ seastar::future<entity_type_t, entity_addr_t> ProtocolV2::banner_exchange()
                  conn, bl.length(), len_payload,
                  CEPH_MSGR2_SUPPORTED_FEATURES, CEPH_MSGR2_REQUIRED_FEATURES,
                  CEPH_BANNER_V2_PREFIX);
+  INTERCEPT(custom_bp_t::BANNER_WRITE);
   return write_flush(std::move(bl)).then([this] {
       // 2. read peer banner
       unsigned banner_len = strlen(CEPH_BANNER_V2_PREFIX) + sizeof(__le16);
       return read_exactly(banner_len); // or read exactly?
     }).then([this] (auto bl) {
+      INTERCEPT(custom_bp_t::BANNER_READ);
       // 3. process peer banner and read banner_payload
       unsigned banner_prefix_len = strlen(CEPH_BANNER_V2_PREFIX);
       logger().debug("{} RECV({}) banner: \"{}\"",
@@ -498,6 +520,7 @@ seastar::future<entity_type_t, entity_addr_t> ProtocolV2::banner_exchange()
       logger().debug("{} GOT banner: payload_len={}", conn, payload_len);
       return read(payload_len);
     }).then([this] (bufferlist bl) {
+      INTERCEPT(custom_bp_t::BANNER_PAYLOAD_READ);
       // 4. process peer banner_payload and send HelloFrame
       auto p = bl.cbegin();
       uint64_t peer_supported_features;
@@ -873,6 +896,7 @@ void ProtocolV2::execute_connecting()
               return sock->close().then([sock = std::move(sock)] {});
             });
           }
+          INTERCEPT(custom_bp_t::SOCKET_CONNECTING);
           return Socket::connect(conn.peer_addr);
         }).then([this](SocketFRef sock) {
           logger().debug("{} socket connected", conn);
@@ -1089,6 +1113,11 @@ ProtocolV2::reuse_connection(
                                     connection_features,
                                     conn_seq,
                                     msg_seq);
+#ifdef UNIT_TESTS_BUILT
+  if (conn.interceptor) {
+    conn.interceptor->register_conn_replaced(conn);
+  }
+#endif
   // close this connection because all the necessary information is delivered
   // to the exisiting connection, and jump to error handling code to abort the
   // current state.
@@ -1473,6 +1502,7 @@ void ProtocolV2::execute_accepting()
   trigger_state(state_t::ACCEPTING, write_state_t::delay, false);
   seastar::with_gate(pending_dispatch, [this] {
       return seastar::futurize_apply([this] {
+          INTERCEPT(custom_bp_t::SOCKET_ACCEPTED);
           auth_meta = seastar::make_lw_shared<AuthConnectionMeta>();
           session_stream_handlers = { nullptr, nullptr };
           enable_recording();
@@ -1741,16 +1771,19 @@ ceph::bufferlist ProtocolV2::do_sweep_messages(
   if (unlikely(require_keepalive)) {
     auto keepalive_frame = KeepAliveFrame::Encode();
     bl.append(keepalive_frame.get_buffer(session_stream_handlers));
+    INTERCEPT(ceph::msgr::v2::Tag::KEEPALIVE2, true);
   }
 
   if (unlikely(_keepalive_ack.has_value())) {
     auto keepalive_ack_frame = KeepAliveFrameAck::Encode(*_keepalive_ack);
     bl.append(keepalive_ack_frame.get_buffer(session_stream_handlers));
+    INTERCEPT(ceph::msgr::v2::Tag::KEEPALIVE2_ACK, true);
   }
 
   if (require_ack && !num_msgs) {
     auto ack_frame = AckFrame::Encode(conn.in_seq);
     bl.append(ack_frame.get_buffer(session_stream_handlers));
+    INTERCEPT(ceph::msgr::v2::Tag::ACK, true);
   }
 
   std::for_each(msgs.begin(), msgs.begin()+num_msgs, [this, &bl](const MessageRef& msg) {
@@ -1779,6 +1812,7 @@ ceph::bufferlist ProtocolV2::do_sweep_messages(
     logger().debug("{} --> #{} === {} ({})",
                   conn, msg->get_seq(), *msg, msg->get_type());
     bl.append(message.get_buffer(session_stream_handlers));
+    INTERCEPT(ceph::msgr::v2::Tag::MESSAGE, true);
   });
 
   return bl;
@@ -1876,6 +1910,11 @@ void ProtocolV2::execute_ready()
 {
   assert(conn.policy.lossy || (client_cookie != 0 && server_cookie != 0));
   trigger_state(state_t::READY, write_state_t::open, false);
+#ifdef UNIT_TESTS_BUILT
+  if (conn.interceptor) {
+    conn.interceptor->register_conn_ready(conn);
+  }
+#endif
   execution_done = seastar::with_gate(pending_dispatch, [this] {
     protocol_timer.cancel();
     return seastar::keep_doing([this] {
@@ -2039,6 +2078,11 @@ void ProtocolV2::trigger_close()
   protocol_timer.cancel();
 
   trigger_state(state_t::CLOSING, write_state_t::drop, false);
+#ifdef UNIT_TESTS_BUILT
+  if (conn.interceptor) {
+    conn.interceptor->register_conn_closed(conn);
+  }
+#endif
 }
 
 } // namespace ceph::net
index 4d78cb13dfd9d45989fef96da8ae1183e905e4f3..44735ca8528fde81bc55990f7c8b219d00468bc8 100644 (file)
 #include "ProtocolV2.h"
 #include "SocketMessenger.h"
 
+#ifdef UNIT_TESTS_BUILT
+#include "Interceptor.h"
+#endif
+
 using namespace ceph::net;
 
 SocketConnection::SocketConnection(SocketMessenger& messenger,
@@ -32,6 +36,12 @@ SocketConnection::SocketConnection(SocketMessenger& messenger,
   } else {
     protocol = std::make_unique<ProtocolV1>(dispatcher, *this, messenger);
   }
+#ifdef UNIT_TESTS_BUILT
+  if (messenger.interceptor) {
+    interceptor = messenger.interceptor;
+    interceptor->register_conn(*this);
+  }
+#endif
 }
 
 SocketConnection::~SocketConnection() {}