WRITE
};
+enum class bp_action_t {
+ CONTINUE = 0,
+ FAULT,
+ BLOCK
+};
+
+inline std::ostream& operator<<(std::ostream& out, const bp_action_t& action) {
+ static const char *const action_names[] = {"CONTINUE",
+ "FAULT",
+ "BLOCK"};
+ assert(static_cast<size_t>(action) < std::size(action_names));
+ return out << action_names[static_cast<size_t>(action)];
+}
+
+struct socket_trap_t {
+ bp_type_t type;
+ bp_action_t action;
+};
+
+class socket_blocker {
+ std::optional<seastar::promise<>> p_blocked;
+ std::optional<seastar::promise<>> p_unblocked;
+
+ public:
+ seastar::future<> wait_blocked() {
+ ceph_assert(!p_blocked);
+ if (p_unblocked) {
+ return seastar::now();
+ } else {
+ p_blocked = seastar::promise<>();
+ return p_blocked->get_future();
+ }
+ }
+
+ seastar::future<> block() {
+ if (p_blocked) {
+ p_blocked->set_value();
+ p_blocked = std::nullopt;
+ }
+ ceph_assert(!p_unblocked);
+ p_unblocked = seastar::promise<>();
+ return p_unblocked->get_future();
+ }
+
+ void unblock() {
+ ceph_assert(!p_blocked);
+ ceph_assert(p_unblocked);
+ p_unblocked->set_value();
+ p_unblocked = std::nullopt;
+ }
+};
+
struct tag_bp_t {
ceph::msgr::v2::Tag tag;
bp_type_t type;
}
struct Interceptor {
+ socket_blocker blocker;
virtual ~Interceptor() {}
virtual void register_conn(Connection& conn) = 0;
virtual void register_conn_ready(Connection& conn) = 0;
virtual void register_conn_closed(Connection& conn) = 0;
virtual void register_conn_replaced(Connection& conn) = 0;
- virtual bool intercept(Connection& conn, Breakpoint bp) = 0;
+ virtual bp_action_t intercept(Connection& conn, Breakpoint bp) = 0;
};
} // namespace ceph::net
namespace ceph::net {
#ifdef UNIT_TESTS_BUILT
+void intercept(Breakpoint bp, bp_type_t type,
+ SocketConnection& conn, SocketFRef& socket) {
+ if (conn.interceptor) {
+ auto action = conn.interceptor->intercept(conn, Breakpoint(bp));
+ switch (action) {
+ case bp_action_t::CONTINUE:
+ break;
+ case bp_action_t::FAULT:
+ socket->set_trap({type, action});
+ break;
+ case bp_action_t::BLOCK:
+ socket->set_trap({type, action}, &conn.interceptor->blocker);
+ logger().info("{} blocked at {}, waiting for unblock...", conn, bp);
+ break;
+ default:
+ ceph_abort("unexpected action from intercept");
+ }
+ }
+}
+
+#define INTERCEPT_CUSTOM(bp, type) \
+intercept({bp}, type, conn, socket)
+
+#define INTERCEPT_FRAME(tag, type) \
+intercept({static_cast<Tag>(tag), type}, \
+ type, conn, socket)
-#define INTERCEPT(...) \
-if (conn.interceptor) { \
- if (conn.interceptor->intercept( \
- conn, Breakpoint(__VA_ARGS__))) { \
- abort_in_fault(); \
- } \
+#define INTERCEPT_N_RW(bp) \
+if (conn.interceptor) { \
+ auto action = conn.interceptor->intercept(conn, {bp}); \
+ ceph_assert(action != bp_action_t::BLOCK); \
+ if (action == bp_action_t::FAULT) { \
+ abort_in_fault(); \
+ } \
}
#else
-#define INTERCEPT(...)
+#define INTERCEPT_CUSTOM(bp, type)
+#define INTERCEPT_FRAME(tag, type)
+#define INTERCEPT_N_RW(bp)
#endif
seastar::future<> ProtocolV2::Timer::backoff(double seconds)
rx_segments_desc.emplace_back(main_preamble.segments[idx]);
}
- INTERCEPT(static_cast<Tag>(main_preamble.tag), bp_type_t::READ);
+ INTERCEPT_FRAME(main_preamble.tag, bp_type_t::READ);
return static_cast<Tag>(main_preamble.tag);
});
}
logger().trace("{} SEND({}) frame: tag={}, num_segments={}, crc={}",
conn, bl.length(), (int)main_preamble->tag,
(int)main_preamble->num_segments, main_preamble->crc);
- INTERCEPT(static_cast<Tag>(main_preamble->tag), bp_type_t::WRITE);
+ INTERCEPT_FRAME(main_preamble->tag, bp_type_t::WRITE);
if (flush) {
return write_flush(std::move(bl));
} else {
conn, bl.length(), len_payload,
CEPH_MSGR2_SUPPORTED_FEATURES, CEPH_MSGR2_REQUIRED_FEATURES,
CEPH_BANNER_V2_PREFIX);
- INTERCEPT(custom_bp_t::BANNER_WRITE);
+ INTERCEPT_CUSTOM(custom_bp_t::BANNER_WRITE, bp_type_t::WRITE);
return write_flush(std::move(bl)).then([this] {
// 2. read peer banner
unsigned banner_len = strlen(CEPH_BANNER_V2_PREFIX) + sizeof(__le16);
+ INTERCEPT_CUSTOM(custom_bp_t::BANNER_READ, bp_type_t::READ);
return read_exactly(banner_len); // or read exactly?
}).then([this] (auto bl) {
- INTERCEPT(custom_bp_t::BANNER_READ);
// 3. process peer banner and read banner_payload
unsigned banner_prefix_len = strlen(CEPH_BANNER_V2_PREFIX);
logger().debug("{} RECV({}) banner: \"{}\"",
abort_in_fault();
}
logger().debug("{} GOT banner: payload_len={}", conn, payload_len);
+ INTERCEPT_CUSTOM(custom_bp_t::BANNER_PAYLOAD_READ, bp_type_t::READ);
return read(payload_len);
}).then([this] (bufferlist bl) {
- INTERCEPT(custom_bp_t::BANNER_PAYLOAD_READ);
// 4. process peer banner_payload and send HelloFrame
auto p = bl.cbegin();
uint64_t peer_supported_features;
return sock->close().then([sock = std::move(sock)] {});
});
}
- INTERCEPT(custom_bp_t::SOCKET_CONNECTING);
+ INTERCEPT_N_RW(custom_bp_t::SOCKET_CONNECTING);
return Socket::connect(conn.peer_addr);
}).then([this](SocketFRef sock) {
logger().debug("{} socket connected", conn);
trigger_state(state_t::ACCEPTING, write_state_t::delay, false);
seastar::with_gate(pending_dispatch, [this] {
return seastar::futurize_apply([this] {
- INTERCEPT(custom_bp_t::SOCKET_ACCEPTED);
+ INTERCEPT_N_RW(custom_bp_t::SOCKET_ACCEPTED);
auth_meta = seastar::make_lw_shared<AuthConnectionMeta>();
session_stream_handlers = { nullptr, nullptr };
enable_recording();
if (unlikely(require_keepalive)) {
auto keepalive_frame = KeepAliveFrame::Encode();
bl.append(keepalive_frame.get_buffer(session_stream_handlers));
- INTERCEPT(ceph::msgr::v2::Tag::KEEPALIVE2, bp_type_t::WRITE);
+ INTERCEPT_FRAME(ceph::msgr::v2::Tag::KEEPALIVE2, bp_type_t::WRITE);
}
if (unlikely(_keepalive_ack.has_value())) {
auto keepalive_ack_frame = KeepAliveFrameAck::Encode(*_keepalive_ack);
bl.append(keepalive_ack_frame.get_buffer(session_stream_handlers));
- INTERCEPT(ceph::msgr::v2::Tag::KEEPALIVE2_ACK, bp_type_t::WRITE);
+ INTERCEPT_FRAME(ceph::msgr::v2::Tag::KEEPALIVE2_ACK, bp_type_t::WRITE);
}
if (require_ack && !num_msgs) {
auto ack_frame = AckFrame::Encode(conn.in_seq);
bl.append(ack_frame.get_buffer(session_stream_handlers));
- INTERCEPT(ceph::msgr::v2::Tag::ACK, bp_type_t::WRITE);
+ INTERCEPT_FRAME(ceph::msgr::v2::Tag::ACK, bp_type_t::WRITE);
}
std::for_each(msgs.begin(), msgs.begin()+num_msgs, [this, &bl](const MessageRef& msg) {
logger().debug("{} --> #{} === {} ({})",
conn, msg->get_seq(), *msg, msg->get_type());
bl.append(message.get_buffer(session_stream_handlers));
- INTERCEPT(ceph::msgr::v2::Tag::MESSAGE, bp_type_t::WRITE);
+ INTERCEPT_FRAME(ceph::msgr::v2::Tag::MESSAGE, bp_type_t::WRITE);
});
return bl;
seastar::future<bufferlist> Socket::read(size_t bytes)
{
- if (bytes == 0) {
- return seastar::make_ready_future<bufferlist>();
- }
- r.buffer.clear();
- r.remaining = bytes;
- return in.consume(bufferlist_consumer{r.buffer, r.remaining})
- .then([this] {
+#ifdef UNIT_TESTS_BUILT
+ return try_trap(bp_type_t::READ).then([bytes, this] {
+#endif
+ if (bytes == 0) {
+ return seastar::make_ready_future<bufferlist>();
+ }
+ r.buffer.clear();
+ r.remaining = bytes;
+ return in.consume(bufferlist_consumer{r.buffer, r.remaining}).then([this] {
if (r.remaining) { // throw on short reads
throw std::system_error(make_error_code(error::read_eof));
}
return seastar::make_ready_future<bufferlist>(std::move(r.buffer));
});
+#ifdef UNIT_TESTS_BUILT
+ });
+#endif
}
seastar::future<seastar::temporary_buffer<char>>
Socket::read_exactly(size_t bytes) {
- if (bytes == 0) {
- return seastar::make_ready_future<seastar::temporary_buffer<char>>();
- }
- return in.read_exactly(bytes)
- .then([this](auto buf) {
+#ifdef UNIT_TESTS_BUILT
+ return try_trap(bp_type_t::READ).then([bytes, this] {
+#endif
+ if (bytes == 0) {
+ return seastar::make_ready_future<seastar::temporary_buffer<char>>();
+ }
+ return in.read_exactly(bytes).then([this](auto buf) {
if (buf.empty()) {
throw std::system_error(make_error_code(error::read_eof));
}
return seastar::make_ready_future<tmp_buf>(std::move(buf));
});
+#ifdef UNIT_TESTS_BUILT
+ });
+#endif
}
void Socket::shutdown() {
});
}
+#ifdef UNIT_TESTS_BUILT
+seastar::future<> Socket::try_trap(bp_type_t type) {
+ if (next_trap && next_trap->type == type) {
+ auto action = next_trap->action;
+ next_trap = std::nullopt;
+ switch (action) {
+ case bp_action_t::FAULT:
+ throw std::system_error(make_error_code(ceph::net::error::negotiation_failure));
+ case bp_action_t::BLOCK:
+ ceph_assert(blocker != nullptr);
+ return blocker->block();
+ default:
+ ceph_abort("unexpected action from trap");
+ }
+ }
+ return seastar::now();
+}
+#endif
+
} // namespace ceph::net
#include "include/buffer.h"
#include "msg/msg_types.h"
+#ifdef UNIT_TESTS_BUILT
+#include "Interceptor.h"
+#endif
+
namespace ceph::net {
class Socket;
seastar::future<tmp_buf> read_exactly(size_t bytes);
seastar::future<> write(packet&& buf) {
- return out.write(std::move(buf));
+#ifdef UNIT_TESTS_BUILT
+ return try_trap(bp_type_t::WRITE).then([buf = std::move(buf), this] () mutable {
+#endif
+ return out.write(std::move(buf));
+#ifdef UNIT_TESTS_BUILT
+ });
+#endif
}
seastar::future<> flush() {
return out.flush();
}
seastar::future<> write_flush(packet&& buf) {
- return out.write(std::move(buf)).then([this] { return out.flush(); });
+#ifdef UNIT_TESTS_BUILT
+ return try_trap(bp_type_t::WRITE).then([buf = std::move(buf), this] () mutable {
+#endif
+ return out.write(std::move(buf)).then([this] { return out.flush(); });
+#ifdef UNIT_TESTS_BUILT
+ });
+#endif
}
// preemptively disable further reads or writes, can only be shutdown once.
void force_shutdown_out() {
socket.shutdown_output();
}
+
+#ifdef UNIT_TESTS_BUILT
+ private:
+ std::optional<socket_trap_t> next_trap = std::nullopt;
+ socket_blocker* blocker = nullptr;
+ seastar::future<> try_trap(bp_type_t type);
+
+ public:
+ void set_trap(socket_trap_t trap, socket_blocker* blocker_ = nullptr) {
+ ceph_assert(!next_trap);
+ next_trap = trap;
+ blocker = blocker_;
+ }
+#endif
};
} // namespace ceph::net
}
using ceph::msgr::v2::Tag;
+using ceph::net::bp_action_t;
using ceph::net::bp_type_t;
using ceph::net::Breakpoint;
using ceph::net::Connection;
using ConnResults = std::vector<ConnResult>;
struct TestInterceptor : public Interceptor {
- std::map<Breakpoint, std::set<unsigned>> breakpoints;
+ std::map<Breakpoint, std::map<unsigned, bp_action_t>> breakpoints;
std::map<Breakpoint, counter_t> breakpoints_counter;
std::map<ConnectionRef, unsigned> conns;
ConnResults results;
void make_fault(Breakpoint bp, unsigned round = 1) {
assert(round >= 1);
- breakpoints[bp].insert(round);
+ breakpoints[bp][round] = bp_action_t::FAULT;
+ }
+
+ void make_block(Breakpoint bp, unsigned round = 1) {
+ assert(round >= 1);
+ breakpoints[bp][round] = bp_action_t::BLOCK;
}
ConnResult* find_result(ConnectionRef conn) {
logger().info("[{}] {} {}", result->index, conn, result->state);
}
- bool intercept(Connection& conn, Breakpoint bp) override {
+ bp_action_t intercept(Connection& conn, Breakpoint bp) override {
++breakpoints_counter[bp].counter;
auto result = find_result(conn.shared_from_this());
logger().info("[{}] {} intercepted {}({}) => {}",
result->index, conn, bp,
breakpoints_counter[bp].counter, it_cnt->second);
- return true;
+ return it_cnt->second;
}
}
logger().info("[{}] {} intercepted {}({})",
result->index, conn, bp, breakpoints_counter[bp].counter);
- return false;
+ return bp_action_t::CONTINUE;
}
};
if (it == interceptor.breakpoints_counter.end()) {
throw std::runtime_error(fmt::format("{} was missed", kv.first));
}
- auto expected = *std::max_element(kv.second.begin(), kv.second.end());
+ auto expected = kv.second.rbegin()->first;
if (expected > it->second.counter) {
throw std::runtime_error(fmt::format(
"{} only triggered {} times, not the expected {}",
}
}
+ seastar::future<> wait_blocked() {
+ logger().info("Waiting for blocked...");
+ return interceptor.blocker.wait_blocked();
+ }
+
+ void unblock() {
+ logger().info("Unblock breakpoint");
+ return interceptor.blocker.unblock();
+ }
+
seastar::future<> wait_replaced(unsigned count) {
return wait_ready(0, count, false);
}