--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include <variant>
+#include <seastar/core/sharded.hh>
+
+#include "Fwd.h"
+#include "msg/async/frames_v2.h"
+
+namespace ceph::net {
+
+enum class custom_bp_t {
+ BANNER_WRITE = 0,
+ BANNER_READ,
+ BANNER_PAYLOAD_READ,
+ SOCKET_CONNECTING,
+ SOCKET_ACCEPTED
+};
+inline const char* get_bp_name(custom_bp_t bp) {
+ static const char *const bp_names[] = {"BANNER_WRITE",
+ "BANNER_READ",
+ "BANNER_PAYLOAD_READ",
+ "SOCKET_CONNECTING",
+ "SOCKET_ACCEPTED"};
+ assert(static_cast<int>(bp) < std::size(bp_names));
+ return bp_names[static_cast<int>(bp)];
+}
+
+struct tag_bp_t {
+ ceph::msgr::v2::Tag tag;
+ bool is_write;
+ bool operator==(const tag_bp_t& x) const {
+ return tag == x.tag && is_write == x.is_write;
+ }
+ bool operator!=(const tag_bp_t& x) const { return !operator==(x); }
+ bool operator<(const tag_bp_t& x) const {
+ return std::tie(tag, is_write) < std::tie(x.tag, x.is_write);
+ }
+};
+
+struct Breakpoint {
+ using var_t = std::variant<custom_bp_t, tag_bp_t>;
+ var_t bp;
+ Breakpoint(custom_bp_t bp) : bp(bp) { }
+ Breakpoint(ceph::msgr::v2::Tag tag, bool is_write)
+ : bp(tag_bp_t{tag, is_write}) { }
+ bool operator==(const Breakpoint& x) const { return bp == x.bp; }
+ bool operator!=(const Breakpoint& x) const { return !operator==(x); }
+ bool operator==(const custom_bp_t& x) const { return bp == var_t(x); }
+ bool operator!=(const custom_bp_t& x) const { return !operator==(x); }
+ bool operator==(const tag_bp_t& x) const { return bp == var_t(x); }
+ bool operator!=(const tag_bp_t& x) const { return !operator==(x); }
+ bool operator<(const Breakpoint& x) const { return bp < x.bp; }
+};
+
+inline std::ostream& operator<<(std::ostream& out, const Breakpoint& bp) {
+ if (auto custom_bp = std::get_if<custom_bp_t>(&bp.bp)) {
+ return out << get_bp_name(*custom_bp);
+ } else {
+ auto tag_bp = std::get<tag_bp_t>(bp.bp);
+ static const char *const tag_names[] = {"NONE",
+ "HELLO",
+ "AUTH_REQUEST",
+ "AUTH_BAD_METHOD",
+ "AUTH_REPLY_MORE",
+ "AUTH_REQUEST_MORE",
+ "AUTH_DONE",
+ "AUTH_SIGNATURE",
+ "CLIENT_IDENT",
+ "SERVER_IDENT",
+ "IDENT_MISSING_FEATURES",
+ "SESSION_RECONNECT",
+ "SESSION_RESET",
+ "SESSION_RETRY",
+ "SESSION_RETRY_GLOBAL",
+ "SESSION_RECONNECT_OK",
+ "WAIT",
+ "MESSAGE",
+ "KEEPALIVE2",
+ "KEEPALIVE2_ACK",
+ "ACK"};
+ assert(static_cast<size_t>(tag_bp.tag) < std::size(tag_names));
+ return out << tag_names[static_cast<size_t>(tag_bp.tag)]
+ << (tag_bp.is_write ? "_WRITE" : "_READ");
+ }
+}
+
+struct Interceptor {
+ virtual ~Interceptor() {}
+ virtual void register_conn(Connection& conn) = 0;
+ virtual void register_conn_ready(Connection& conn) = 0;
+ virtual void register_conn_closed(Connection& conn) = 0;
+ virtual void register_conn_replaced(Connection& conn) = 0;
+ virtual bool intercept(Connection& conn, Breakpoint bp) = 0;
+};
+
+} // namespace ceph::net
#include "SocketConnection.h"
#include "SocketMessenger.h"
+#ifdef UNIT_TESTS_BUILT
+#include "Interceptor.h"
+#endif
+
using namespace ceph::msgr::v2;
namespace {
namespace ceph::net {
+#ifdef UNIT_TESTS_BUILT
+
+#define INTERCEPT(...) \
+if (conn.interceptor) { \
+ if (conn.interceptor->intercept( \
+ conn, Breakpoint(__VA_ARGS__))) { \
+ abort_in_fault(); \
+ } \
+}
+
+#else
+#define INTERCEPT(...)
+#endif
+
seastar::future<> ProtocolV2::Timer::backoff(double seconds)
{
logger().warn("{} waiting {} seconds ...", conn, seconds);
rx_segments_desc.emplace_back(main_preamble.segments[idx]);
}
+ INTERCEPT(static_cast<Tag>(main_preamble.tag), false);
return static_cast<Tag>(main_preamble.tag);
});
}
logger().trace("{} SEND({}) frame: tag={}, num_segments={}, crc={}",
conn, bl.length(), (int)main_preamble->tag,
(int)main_preamble->num_segments, main_preamble->crc);
+ INTERCEPT(static_cast<Tag>(main_preamble->tag), true);
if (flush) {
return write_flush(std::move(bl));
} else {
conn, bl.length(), len_payload,
CEPH_MSGR2_SUPPORTED_FEATURES, CEPH_MSGR2_REQUIRED_FEATURES,
CEPH_BANNER_V2_PREFIX);
+ INTERCEPT(custom_bp_t::BANNER_WRITE);
return write_flush(std::move(bl)).then([this] {
// 2. read peer banner
unsigned banner_len = strlen(CEPH_BANNER_V2_PREFIX) + sizeof(__le16);
return read_exactly(banner_len); // or read exactly?
}).then([this] (auto bl) {
+ INTERCEPT(custom_bp_t::BANNER_READ);
// 3. process peer banner and read banner_payload
unsigned banner_prefix_len = strlen(CEPH_BANNER_V2_PREFIX);
logger().debug("{} RECV({}) banner: \"{}\"",
logger().debug("{} GOT banner: payload_len={}", conn, payload_len);
return read(payload_len);
}).then([this] (bufferlist bl) {
+ INTERCEPT(custom_bp_t::BANNER_PAYLOAD_READ);
// 4. process peer banner_payload and send HelloFrame
auto p = bl.cbegin();
uint64_t peer_supported_features;
return sock->close().then([sock = std::move(sock)] {});
});
}
+ INTERCEPT(custom_bp_t::SOCKET_CONNECTING);
return Socket::connect(conn.peer_addr);
}).then([this](SocketFRef sock) {
logger().debug("{} socket connected", conn);
connection_features,
conn_seq,
msg_seq);
+#ifdef UNIT_TESTS_BUILT
+ if (conn.interceptor) {
+ conn.interceptor->register_conn_replaced(conn);
+ }
+#endif
// close this connection because all the necessary information is delivered
// to the exisiting connection, and jump to error handling code to abort the
// current state.
trigger_state(state_t::ACCEPTING, write_state_t::delay, false);
seastar::with_gate(pending_dispatch, [this] {
return seastar::futurize_apply([this] {
+ INTERCEPT(custom_bp_t::SOCKET_ACCEPTED);
auth_meta = seastar::make_lw_shared<AuthConnectionMeta>();
session_stream_handlers = { nullptr, nullptr };
enable_recording();
if (unlikely(require_keepalive)) {
auto keepalive_frame = KeepAliveFrame::Encode();
bl.append(keepalive_frame.get_buffer(session_stream_handlers));
+ INTERCEPT(ceph::msgr::v2::Tag::KEEPALIVE2, true);
}
if (unlikely(_keepalive_ack.has_value())) {
auto keepalive_ack_frame = KeepAliveFrameAck::Encode(*_keepalive_ack);
bl.append(keepalive_ack_frame.get_buffer(session_stream_handlers));
+ INTERCEPT(ceph::msgr::v2::Tag::KEEPALIVE2_ACK, true);
}
if (require_ack && !num_msgs) {
auto ack_frame = AckFrame::Encode(conn.in_seq);
bl.append(ack_frame.get_buffer(session_stream_handlers));
+ INTERCEPT(ceph::msgr::v2::Tag::ACK, true);
}
std::for_each(msgs.begin(), msgs.begin()+num_msgs, [this, &bl](const MessageRef& msg) {
logger().debug("{} --> #{} === {} ({})",
conn, msg->get_seq(), *msg, msg->get_type());
bl.append(message.get_buffer(session_stream_handlers));
+ INTERCEPT(ceph::msgr::v2::Tag::MESSAGE, true);
});
return bl;
{
assert(conn.policy.lossy || (client_cookie != 0 && server_cookie != 0));
trigger_state(state_t::READY, write_state_t::open, false);
+#ifdef UNIT_TESTS_BUILT
+ if (conn.interceptor) {
+ conn.interceptor->register_conn_ready(conn);
+ }
+#endif
execution_done = seastar::with_gate(pending_dispatch, [this] {
protocol_timer.cancel();
return seastar::keep_doing([this] {
protocol_timer.cancel();
trigger_state(state_t::CLOSING, write_state_t::drop, false);
+#ifdef UNIT_TESTS_BUILT
+ if (conn.interceptor) {
+ conn.interceptor->register_conn_closed(conn);
+ }
+#endif
}
} // namespace ceph::net