From: Yingxin Cheng Date: Thu, 5 Sep 2019 03:02:02 +0000 (+0800) Subject: crimson/test: intercept V2 Protocol with Interceptor X-Git-Tag: v15.1.0~1621^2~2 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=389bd93f65d40cc1a76a005fba29b712fe9fac2d;p=ceph.git crimson/test: intercept V2 Protocol with Interceptor Signed-off-by: Yingxin Cheng --- diff --git a/src/crimson/net/Connection.h b/src/crimson/net/Connection.h index 66dbb1dd6690..3a179ee15fec 100644 --- a/src/crimson/net/Connection.h +++ b/src/crimson/net/Connection.h @@ -22,6 +22,10 @@ namespace ceph::net { +#ifdef UNIT_TESTS_BUILT +class Interceptor; +#endif + using seq_num_t = uint64_t; class Connection : public seastar::enable_shared_from_this { @@ -44,6 +48,10 @@ class Connection : public seastar::enable_shared_from_this { Connection() {} virtual ~Connection() {} +#ifdef UNIT_TESTS_BUILT + Interceptor *interceptor = nullptr; +#endif + virtual Messenger* get_messenger() const = 0; const entity_addr_t& get_peer_addr() const { return peer_addr; } const entity_name_t& get_peer_name() const { return peer_name; } diff --git a/src/crimson/net/Interceptor.h b/src/crimson/net/Interceptor.h new file mode 100644 index 000000000000..4bfab745e793 --- /dev/null +++ b/src/crimson/net/Interceptor.h @@ -0,0 +1,99 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include +#include + +#include "Fwd.h" +#include "msg/async/frames_v2.h" + +namespace ceph::net { + +enum class custom_bp_t { + BANNER_WRITE = 0, + BANNER_READ, + BANNER_PAYLOAD_READ, + SOCKET_CONNECTING, + SOCKET_ACCEPTED +}; +inline const char* get_bp_name(custom_bp_t bp) { + static const char *const bp_names[] = {"BANNER_WRITE", + "BANNER_READ", + "BANNER_PAYLOAD_READ", + "SOCKET_CONNECTING", + "SOCKET_ACCEPTED"}; + assert(static_cast(bp) < std::size(bp_names)); + return bp_names[static_cast(bp)]; +} + +struct tag_bp_t { + ceph::msgr::v2::Tag tag; + bool is_write; + bool operator==(const tag_bp_t& x) const { + return tag == x.tag && is_write == x.is_write; + } + bool operator!=(const tag_bp_t& x) const { return !operator==(x); } + bool operator<(const tag_bp_t& x) const { + return std::tie(tag, is_write) < std::tie(x.tag, x.is_write); + } +}; + +struct Breakpoint { + using var_t = std::variant; + var_t bp; + Breakpoint(custom_bp_t bp) : bp(bp) { } + Breakpoint(ceph::msgr::v2::Tag tag, bool is_write) + : bp(tag_bp_t{tag, is_write}) { } + bool operator==(const Breakpoint& x) const { return bp == x.bp; } + bool operator!=(const Breakpoint& x) const { return !operator==(x); } + bool operator==(const custom_bp_t& x) const { return bp == var_t(x); } + bool operator!=(const custom_bp_t& x) const { return !operator==(x); } + bool operator==(const tag_bp_t& x) const { return bp == var_t(x); } + bool operator!=(const tag_bp_t& x) const { return !operator==(x); } + bool operator<(const Breakpoint& x) const { return bp < x.bp; } +}; + +inline std::ostream& operator<<(std::ostream& out, const Breakpoint& bp) { + if (auto custom_bp = std::get_if(&bp.bp)) { + return out << get_bp_name(*custom_bp); + } else { + auto tag_bp = std::get(bp.bp); + static const char *const tag_names[] = {"NONE", + "HELLO", + "AUTH_REQUEST", + "AUTH_BAD_METHOD", + "AUTH_REPLY_MORE", + "AUTH_REQUEST_MORE", + "AUTH_DONE", + "AUTH_SIGNATURE", + "CLIENT_IDENT", + "SERVER_IDENT", + "IDENT_MISSING_FEATURES", + "SESSION_RECONNECT", + "SESSION_RESET", + "SESSION_RETRY", + "SESSION_RETRY_GLOBAL", + "SESSION_RECONNECT_OK", + "WAIT", + "MESSAGE", + "KEEPALIVE2", + "KEEPALIVE2_ACK", + "ACK"}; + assert(static_cast(tag_bp.tag) < std::size(tag_names)); + return out << tag_names[static_cast(tag_bp.tag)] + << (tag_bp.is_write ? "_WRITE" : "_READ"); + } +} + +struct Interceptor { + virtual ~Interceptor() {} + virtual void register_conn(Connection& conn) = 0; + virtual void register_conn_ready(Connection& conn) = 0; + virtual void register_conn_closed(Connection& conn) = 0; + virtual void register_conn_replaced(Connection& conn) = 0; + virtual bool intercept(Connection& conn, Breakpoint bp) = 0; +}; + +} // namespace ceph::net diff --git a/src/crimson/net/Messenger.h b/src/crimson/net/Messenger.h index d0c5e5aa71d4..3574036799aa 100644 --- a/src/crimson/net/Messenger.h +++ b/src/crimson/net/Messenger.h @@ -29,6 +29,10 @@ class AuthServer; namespace ceph::net { +#ifdef UNIT_TESTS_BUILT +class Interceptor; +#endif + using Throttle = ceph::thread::Throttle; using SocketPolicy = ceph::net::Policy; @@ -46,6 +50,10 @@ public: {} virtual ~Messenger() {} +#ifdef UNIT_TESTS_BUILT + Interceptor *interceptor = nullptr; +#endif + entity_type_t get_mytype() const { return my_name.type(); } const entity_name_t& get_myname() const { return my_name; } const entity_addrvec_t& get_myaddrs() const { return my_addrs; } diff --git a/src/crimson/net/Protocol.cc b/src/crimson/net/Protocol.cc index 9c2c9a67ba1d..15c618a64a5d 100644 --- a/src/crimson/net/Protocol.cc +++ b/src/crimson/net/Protocol.cc @@ -239,7 +239,8 @@ seastar::future<> Protocol::do_write_dispatch_sweep() } }).handle_exception_type([this] (const std::system_error& e) { if (e.code() != error::broken_pipe && - e.code() != error::connection_reset) { + e.code() != error::connection_reset && + e.code() != error::negotiation_failure) { logger().error("{} do_write_dispatch_sweep(): unexpected error {}", conn, e); ceph_abort(); diff --git a/src/crimson/net/ProtocolV2.cc b/src/crimson/net/ProtocolV2.cc index 7f7c52a6c75a..21a9441d3205 100644 --- a/src/crimson/net/ProtocolV2.cc +++ b/src/crimson/net/ProtocolV2.cc @@ -23,6 +23,10 @@ #include "SocketConnection.h" #include "SocketMessenger.h" +#ifdef UNIT_TESTS_BUILT +#include "Interceptor.h" +#endif + using namespace ceph::msgr::v2; namespace { @@ -123,6 +127,20 @@ inline ostream& operator<<( namespace ceph::net { +#ifdef UNIT_TESTS_BUILT + +#define INTERCEPT(...) \ +if (conn.interceptor) { \ + if (conn.interceptor->intercept( \ + conn, Breakpoint(__VA_ARGS__))) { \ + abort_in_fault(); \ + } \ +} + +#else +#define INTERCEPT(...) +#endif + seastar::future<> ProtocolV2::Timer::backoff(double seconds) { logger().warn("{} waiting {} seconds ...", conn, seconds); @@ -299,6 +317,7 @@ seastar::future ProtocolV2::read_main_preamble() rx_segments_desc.emplace_back(main_preamble.segments[idx]); } + INTERCEPT(static_cast(main_preamble.tag), false); return static_cast(main_preamble.tag); }); } @@ -381,6 +400,7 @@ seastar::future<> ProtocolV2::write_frame(F &frame, bool flush) logger().trace("{} SEND({}) frame: tag={}, num_segments={}, crc={}", conn, bl.length(), (int)main_preamble->tag, (int)main_preamble->num_segments, main_preamble->crc); + INTERCEPT(static_cast(main_preamble->tag), true); if (flush) { return write_flush(std::move(bl)); } else { @@ -464,11 +484,13 @@ seastar::future ProtocolV2::banner_exchange() conn, bl.length(), len_payload, CEPH_MSGR2_SUPPORTED_FEATURES, CEPH_MSGR2_REQUIRED_FEATURES, CEPH_BANNER_V2_PREFIX); + INTERCEPT(custom_bp_t::BANNER_WRITE); return write_flush(std::move(bl)).then([this] { // 2. read peer banner unsigned banner_len = strlen(CEPH_BANNER_V2_PREFIX) + sizeof(__le16); return read_exactly(banner_len); // or read exactly? }).then([this] (auto bl) { + INTERCEPT(custom_bp_t::BANNER_READ); // 3. process peer banner and read banner_payload unsigned banner_prefix_len = strlen(CEPH_BANNER_V2_PREFIX); logger().debug("{} RECV({}) banner: \"{}\"", @@ -498,6 +520,7 @@ seastar::future ProtocolV2::banner_exchange() logger().debug("{} GOT banner: payload_len={}", conn, payload_len); return read(payload_len); }).then([this] (bufferlist bl) { + INTERCEPT(custom_bp_t::BANNER_PAYLOAD_READ); // 4. process peer banner_payload and send HelloFrame auto p = bl.cbegin(); uint64_t peer_supported_features; @@ -873,6 +896,7 @@ void ProtocolV2::execute_connecting() return sock->close().then([sock = std::move(sock)] {}); }); } + INTERCEPT(custom_bp_t::SOCKET_CONNECTING); return Socket::connect(conn.peer_addr); }).then([this](SocketFRef sock) { logger().debug("{} socket connected", conn); @@ -1089,6 +1113,11 @@ ProtocolV2::reuse_connection( connection_features, conn_seq, msg_seq); +#ifdef UNIT_TESTS_BUILT + if (conn.interceptor) { + conn.interceptor->register_conn_replaced(conn); + } +#endif // close this connection because all the necessary information is delivered // to the exisiting connection, and jump to error handling code to abort the // current state. @@ -1473,6 +1502,7 @@ void ProtocolV2::execute_accepting() trigger_state(state_t::ACCEPTING, write_state_t::delay, false); seastar::with_gate(pending_dispatch, [this] { return seastar::futurize_apply([this] { + INTERCEPT(custom_bp_t::SOCKET_ACCEPTED); auth_meta = seastar::make_lw_shared(); session_stream_handlers = { nullptr, nullptr }; enable_recording(); @@ -1741,16 +1771,19 @@ ceph::bufferlist ProtocolV2::do_sweep_messages( if (unlikely(require_keepalive)) { auto keepalive_frame = KeepAliveFrame::Encode(); bl.append(keepalive_frame.get_buffer(session_stream_handlers)); + INTERCEPT(ceph::msgr::v2::Tag::KEEPALIVE2, true); } if (unlikely(_keepalive_ack.has_value())) { auto keepalive_ack_frame = KeepAliveFrameAck::Encode(*_keepalive_ack); bl.append(keepalive_ack_frame.get_buffer(session_stream_handlers)); + INTERCEPT(ceph::msgr::v2::Tag::KEEPALIVE2_ACK, true); } if (require_ack && !num_msgs) { auto ack_frame = AckFrame::Encode(conn.in_seq); bl.append(ack_frame.get_buffer(session_stream_handlers)); + INTERCEPT(ceph::msgr::v2::Tag::ACK, true); } std::for_each(msgs.begin(), msgs.begin()+num_msgs, [this, &bl](const MessageRef& msg) { @@ -1779,6 +1812,7 @@ ceph::bufferlist ProtocolV2::do_sweep_messages( logger().debug("{} --> #{} === {} ({})", conn, msg->get_seq(), *msg, msg->get_type()); bl.append(message.get_buffer(session_stream_handlers)); + INTERCEPT(ceph::msgr::v2::Tag::MESSAGE, true); }); return bl; @@ -1876,6 +1910,11 @@ void ProtocolV2::execute_ready() { assert(conn.policy.lossy || (client_cookie != 0 && server_cookie != 0)); trigger_state(state_t::READY, write_state_t::open, false); +#ifdef UNIT_TESTS_BUILT + if (conn.interceptor) { + conn.interceptor->register_conn_ready(conn); + } +#endif execution_done = seastar::with_gate(pending_dispatch, [this] { protocol_timer.cancel(); return seastar::keep_doing([this] { @@ -2039,6 +2078,11 @@ void ProtocolV2::trigger_close() protocol_timer.cancel(); trigger_state(state_t::CLOSING, write_state_t::drop, false); +#ifdef UNIT_TESTS_BUILT + if (conn.interceptor) { + conn.interceptor->register_conn_closed(conn); + } +#endif } } // namespace ceph::net diff --git a/src/crimson/net/SocketConnection.cc b/src/crimson/net/SocketConnection.cc index 4d78cb13dfd9..44735ca8528f 100644 --- a/src/crimson/net/SocketConnection.cc +++ b/src/crimson/net/SocketConnection.cc @@ -19,6 +19,10 @@ #include "ProtocolV2.h" #include "SocketMessenger.h" +#ifdef UNIT_TESTS_BUILT +#include "Interceptor.h" +#endif + using namespace ceph::net; SocketConnection::SocketConnection(SocketMessenger& messenger, @@ -32,6 +36,12 @@ SocketConnection::SocketConnection(SocketMessenger& messenger, } else { protocol = std::make_unique(dispatcher, *this, messenger); } +#ifdef UNIT_TESTS_BUILT + if (messenger.interceptor) { + interceptor = messenger.interceptor; + interceptor->register_conn(*this); + } +#endif } SocketConnection::~SocketConnection() {}