From: Yingxin Cheng Date: Fri, 6 Sep 2019 02:56:41 +0000 (+0800) Subject: test/crimson: implement BLOCK action for breakpoint X-Git-Tag: v15.1.0~1515^2~16 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=21451320402aeee767bbabca06d45be6386bc908;p=ceph-ci.git test/crimson: implement BLOCK action for breakpoint Signed-off-by: Yingxin Cheng --- diff --git a/src/crimson/net/Interceptor.h b/src/crimson/net/Interceptor.h index 7a263448cbe..e76f990f098 100644 --- a/src/crimson/net/Interceptor.h +++ b/src/crimson/net/Interceptor.h @@ -33,6 +33,58 @@ enum class bp_type_t { WRITE }; +enum class bp_action_t { + CONTINUE = 0, + FAULT, + BLOCK +}; + +inline std::ostream& operator<<(std::ostream& out, const bp_action_t& action) { + static const char *const action_names[] = {"CONTINUE", + "FAULT", + "BLOCK"}; + assert(static_cast(action) < std::size(action_names)); + return out << action_names[static_cast(action)]; +} + +struct socket_trap_t { + bp_type_t type; + bp_action_t action; +}; + +class socket_blocker { + std::optional> p_blocked; + std::optional> p_unblocked; + + public: + seastar::future<> wait_blocked() { + ceph_assert(!p_blocked); + if (p_unblocked) { + return seastar::now(); + } else { + p_blocked = seastar::promise<>(); + return p_blocked->get_future(); + } + } + + seastar::future<> block() { + if (p_blocked) { + p_blocked->set_value(); + p_blocked = std::nullopt; + } + ceph_assert(!p_unblocked); + p_unblocked = seastar::promise<>(); + return p_unblocked->get_future(); + } + + void unblock() { + ceph_assert(!p_blocked); + ceph_assert(p_unblocked); + p_unblocked->set_value(); + p_unblocked = std::nullopt; + } +}; + struct tag_bp_t { ceph::msgr::v2::Tag tag; bp_type_t type; @@ -93,12 +145,13 @@ inline std::ostream& operator<<(std::ostream& out, const Breakpoint& bp) { } struct Interceptor { + socket_blocker blocker; virtual ~Interceptor() {} virtual void register_conn(Connection& conn) = 0; virtual void register_conn_ready(Connection& conn) = 0; virtual void register_conn_closed(Connection& conn) = 0; virtual void register_conn_replaced(Connection& conn) = 0; - virtual bool intercept(Connection& conn, Breakpoint bp) = 0; + virtual bp_action_t intercept(Connection& conn, Breakpoint bp) = 0; }; } // namespace ceph::net diff --git a/src/crimson/net/ProtocolV2.cc b/src/crimson/net/ProtocolV2.cc index 7931d1642b7..4c62f818276 100644 --- a/src/crimson/net/ProtocolV2.cc +++ b/src/crimson/net/ProtocolV2.cc @@ -128,17 +128,46 @@ inline ostream& operator<<( namespace ceph::net { #ifdef UNIT_TESTS_BUILT +void intercept(Breakpoint bp, bp_type_t type, + SocketConnection& conn, SocketFRef& socket) { + if (conn.interceptor) { + auto action = conn.interceptor->intercept(conn, Breakpoint(bp)); + switch (action) { + case bp_action_t::CONTINUE: + break; + case bp_action_t::FAULT: + socket->set_trap({type, action}); + break; + case bp_action_t::BLOCK: + socket->set_trap({type, action}, &conn.interceptor->blocker); + logger().info("{} blocked at {}, waiting for unblock...", conn, bp); + break; + default: + ceph_abort("unexpected action from intercept"); + } + } +} + +#define INTERCEPT_CUSTOM(bp, type) \ +intercept({bp}, type, conn, socket) + +#define INTERCEPT_FRAME(tag, type) \ +intercept({static_cast(tag), type}, \ + type, conn, socket) -#define INTERCEPT(...) \ -if (conn.interceptor) { \ - if (conn.interceptor->intercept( \ - conn, Breakpoint(__VA_ARGS__))) { \ - abort_in_fault(); \ - } \ +#define INTERCEPT_N_RW(bp) \ +if (conn.interceptor) { \ + auto action = conn.interceptor->intercept(conn, {bp}); \ + ceph_assert(action != bp_action_t::BLOCK); \ + if (action == bp_action_t::FAULT) { \ + abort_in_fault(); \ + } \ } #else -#define INTERCEPT(...) +#define INTERCEPT_CUSTOM(bp, type) +#define INTERCEPT_FRAME(tag, type) +#define INTERCEPT_N_RW(bp) #endif seastar::future<> ProtocolV2::Timer::backoff(double seconds) @@ -317,7 +346,7 @@ seastar::future ProtocolV2::read_main_preamble() rx_segments_desc.emplace_back(main_preamble.segments[idx]); } - INTERCEPT(static_cast(main_preamble.tag), bp_type_t::READ); + INTERCEPT_FRAME(main_preamble.tag, bp_type_t::READ); return static_cast(main_preamble.tag); }); } @@ -400,7 +429,7 @@ seastar::future<> ProtocolV2::write_frame(F &frame, bool flush) logger().trace("{} SEND({}) frame: tag={}, num_segments={}, crc={}", conn, bl.length(), (int)main_preamble->tag, (int)main_preamble->num_segments, main_preamble->crc); - INTERCEPT(static_cast(main_preamble->tag), bp_type_t::WRITE); + INTERCEPT_FRAME(main_preamble->tag, bp_type_t::WRITE); if (flush) { return write_flush(std::move(bl)); } else { @@ -492,13 +521,13 @@ seastar::future ProtocolV2::banner_exchange() conn, bl.length(), len_payload, CEPH_MSGR2_SUPPORTED_FEATURES, CEPH_MSGR2_REQUIRED_FEATURES, CEPH_BANNER_V2_PREFIX); - INTERCEPT(custom_bp_t::BANNER_WRITE); + INTERCEPT_CUSTOM(custom_bp_t::BANNER_WRITE, bp_type_t::WRITE); return write_flush(std::move(bl)).then([this] { // 2. read peer banner unsigned banner_len = strlen(CEPH_BANNER_V2_PREFIX) + sizeof(__le16); + INTERCEPT_CUSTOM(custom_bp_t::BANNER_READ, bp_type_t::READ); return read_exactly(banner_len); // or read exactly? }).then([this] (auto bl) { - INTERCEPT(custom_bp_t::BANNER_READ); // 3. process peer banner and read banner_payload unsigned banner_prefix_len = strlen(CEPH_BANNER_V2_PREFIX); logger().debug("{} RECV({}) banner: \"{}\"", @@ -526,9 +555,9 @@ seastar::future ProtocolV2::banner_exchange() abort_in_fault(); } logger().debug("{} GOT banner: payload_len={}", conn, payload_len); + INTERCEPT_CUSTOM(custom_bp_t::BANNER_PAYLOAD_READ, bp_type_t::READ); return read(payload_len); }).then([this] (bufferlist bl) { - INTERCEPT(custom_bp_t::BANNER_PAYLOAD_READ); // 4. process peer banner_payload and send HelloFrame auto p = bl.cbegin(); uint64_t peer_supported_features; @@ -904,7 +933,7 @@ void ProtocolV2::execute_connecting() return sock->close().then([sock = std::move(sock)] {}); }); } - INTERCEPT(custom_bp_t::SOCKET_CONNECTING); + INTERCEPT_N_RW(custom_bp_t::SOCKET_CONNECTING); return Socket::connect(conn.peer_addr); }).then([this](SocketFRef sock) { logger().debug("{} socket connected", conn); @@ -1515,7 +1544,7 @@ void ProtocolV2::execute_accepting() trigger_state(state_t::ACCEPTING, write_state_t::delay, false); seastar::with_gate(pending_dispatch, [this] { return seastar::futurize_apply([this] { - INTERCEPT(custom_bp_t::SOCKET_ACCEPTED); + INTERCEPT_N_RW(custom_bp_t::SOCKET_ACCEPTED); auth_meta = seastar::make_lw_shared(); session_stream_handlers = { nullptr, nullptr }; enable_recording(); @@ -1794,19 +1823,19 @@ ceph::bufferlist ProtocolV2::do_sweep_messages( if (unlikely(require_keepalive)) { auto keepalive_frame = KeepAliveFrame::Encode(); bl.append(keepalive_frame.get_buffer(session_stream_handlers)); - INTERCEPT(ceph::msgr::v2::Tag::KEEPALIVE2, bp_type_t::WRITE); + INTERCEPT_FRAME(ceph::msgr::v2::Tag::KEEPALIVE2, bp_type_t::WRITE); } if (unlikely(_keepalive_ack.has_value())) { auto keepalive_ack_frame = KeepAliveFrameAck::Encode(*_keepalive_ack); bl.append(keepalive_ack_frame.get_buffer(session_stream_handlers)); - INTERCEPT(ceph::msgr::v2::Tag::KEEPALIVE2_ACK, bp_type_t::WRITE); + INTERCEPT_FRAME(ceph::msgr::v2::Tag::KEEPALIVE2_ACK, bp_type_t::WRITE); } if (require_ack && !num_msgs) { auto ack_frame = AckFrame::Encode(conn.in_seq); bl.append(ack_frame.get_buffer(session_stream_handlers)); - INTERCEPT(ceph::msgr::v2::Tag::ACK, bp_type_t::WRITE); + INTERCEPT_FRAME(ceph::msgr::v2::Tag::ACK, bp_type_t::WRITE); } std::for_each(msgs.begin(), msgs.begin()+num_msgs, [this, &bl](const MessageRef& msg) { @@ -1835,7 +1864,7 @@ ceph::bufferlist ProtocolV2::do_sweep_messages( logger().debug("{} --> #{} === {} ({})", conn, msg->get_seq(), *msg, msg->get_type()); bl.append(message.get_buffer(session_stream_handlers)); - INTERCEPT(ceph::msgr::v2::Tag::MESSAGE, bp_type_t::WRITE); + INTERCEPT_FRAME(ceph::msgr::v2::Tag::MESSAGE, bp_type_t::WRITE); }); return bl; diff --git a/src/crimson/net/Socket.cc b/src/crimson/net/Socket.cc index 763b4e2129c..8e00323ea4a 100644 --- a/src/crimson/net/Socket.cc +++ b/src/crimson/net/Socket.cc @@ -58,32 +58,42 @@ struct bufferlist_consumer { seastar::future Socket::read(size_t bytes) { - if (bytes == 0) { - return seastar::make_ready_future(); - } - r.buffer.clear(); - r.remaining = bytes; - return in.consume(bufferlist_consumer{r.buffer, r.remaining}) - .then([this] { +#ifdef UNIT_TESTS_BUILT + return try_trap(bp_type_t::READ).then([bytes, this] { +#endif + if (bytes == 0) { + return seastar::make_ready_future(); + } + r.buffer.clear(); + r.remaining = bytes; + return in.consume(bufferlist_consumer{r.buffer, r.remaining}).then([this] { if (r.remaining) { // throw on short reads throw std::system_error(make_error_code(error::read_eof)); } return seastar::make_ready_future(std::move(r.buffer)); }); +#ifdef UNIT_TESTS_BUILT + }); +#endif } seastar::future> Socket::read_exactly(size_t bytes) { - if (bytes == 0) { - return seastar::make_ready_future>(); - } - return in.read_exactly(bytes) - .then([this](auto buf) { +#ifdef UNIT_TESTS_BUILT + return try_trap(bp_type_t::READ).then([bytes, this] { +#endif + if (bytes == 0) { + return seastar::make_ready_future>(); + } + return in.read_exactly(bytes).then([this](auto buf) { if (buf.empty()) { throw std::system_error(make_error_code(error::read_eof)); } return seastar::make_ready_future(std::move(buf)); }); +#ifdef UNIT_TESTS_BUILT + }); +#endif } void Socket::shutdown() { @@ -116,4 +126,23 @@ seastar::future<> Socket::close() { }); } +#ifdef UNIT_TESTS_BUILT +seastar::future<> Socket::try_trap(bp_type_t type) { + if (next_trap && next_trap->type == type) { + auto action = next_trap->action; + next_trap = std::nullopt; + switch (action) { + case bp_action_t::FAULT: + throw std::system_error(make_error_code(ceph::net::error::negotiation_failure)); + case bp_action_t::BLOCK: + ceph_assert(blocker != nullptr); + return blocker->block(); + default: + ceph_abort("unexpected action from trap"); + } + } + return seastar::now(); +} +#endif + } // namespace ceph::net diff --git a/src/crimson/net/Socket.h b/src/crimson/net/Socket.h index 0bc61516138..1882e54f5e7 100644 --- a/src/crimson/net/Socket.h +++ b/src/crimson/net/Socket.h @@ -10,6 +10,10 @@ #include "include/buffer.h" #include "msg/msg_types.h" +#ifdef UNIT_TESTS_BUILT +#include "Interceptor.h" +#endif + namespace ceph::net { class Socket; @@ -81,13 +85,25 @@ class Socket seastar::future read_exactly(size_t bytes); seastar::future<> write(packet&& buf) { - return out.write(std::move(buf)); +#ifdef UNIT_TESTS_BUILT + return try_trap(bp_type_t::WRITE).then([buf = std::move(buf), this] () mutable { +#endif + return out.write(std::move(buf)); +#ifdef UNIT_TESTS_BUILT + }); +#endif } seastar::future<> flush() { return out.flush(); } seastar::future<> write_flush(packet&& buf) { - return out.write(std::move(buf)).then([this] { return out.flush(); }); +#ifdef UNIT_TESTS_BUILT + return try_trap(bp_type_t::WRITE).then([buf = std::move(buf), this] () mutable { +#endif + return out.write(std::move(buf)).then([this] { return out.flush(); }); +#ifdef UNIT_TESTS_BUILT + }); +#endif } // preemptively disable further reads or writes, can only be shutdown once. @@ -105,6 +121,20 @@ class Socket void force_shutdown_out() { socket.shutdown_output(); } + +#ifdef UNIT_TESTS_BUILT + private: + std::optional next_trap = std::nullopt; + socket_blocker* blocker = nullptr; + seastar::future<> try_trap(bp_type_t type); + + public: + void set_trap(socket_trap_t trap, socket_blocker* blocker_ = nullptr) { + ceph_assert(!next_trap); + next_trap = trap; + blocker = blocker_; + } +#endif }; } // namespace ceph::net diff --git a/src/test/crimson/test_messenger.cc b/src/test/crimson/test_messenger.cc index 5a6dbae9ded..1bba72861ce 100644 --- a/src/test/crimson/test_messenger.cc +++ b/src/test/crimson/test_messenger.cc @@ -581,6 +581,7 @@ seastar::future<> test_preemptive_shutdown(bool v2) { } using ceph::msgr::v2::Tag; +using ceph::net::bp_action_t; using ceph::net::bp_type_t; using ceph::net::Breakpoint; using ceph::net::Connection; @@ -720,7 +721,7 @@ struct ConnResult { using ConnResults = std::vector; struct TestInterceptor : public Interceptor { - std::map> breakpoints; + std::map> breakpoints; std::map breakpoints_counter; std::map conns; ConnResults results; @@ -738,7 +739,12 @@ struct TestInterceptor : public Interceptor { void make_fault(Breakpoint bp, unsigned round = 1) { assert(round >= 1); - breakpoints[bp].insert(round); + breakpoints[bp][round] = bp_action_t::FAULT; + } + + void make_block(Breakpoint bp, unsigned round = 1) { + assert(round >= 1); + breakpoints[bp][round] = bp_action_t::BLOCK; } ConnResult* find_result(ConnectionRef conn) { @@ -809,7 +815,7 @@ struct TestInterceptor : public Interceptor { logger().info("[{}] {} {}", result->index, conn, result->state); } - bool intercept(Connection& conn, Breakpoint bp) override { + bp_action_t intercept(Connection& conn, Breakpoint bp) override { ++breakpoints_counter[bp].counter; auto result = find_result(conn.shared_from_this()); @@ -840,12 +846,12 @@ struct TestInterceptor : public Interceptor { logger().info("[{}] {} intercepted {}({}) => {}", result->index, conn, bp, breakpoints_counter[bp].counter, it_cnt->second); - return true; + return it_cnt->second; } } logger().info("[{}] {} intercepted {}({})", result->index, conn, bp, breakpoints_counter[bp].counter); - return false; + return bp_action_t::CONTINUE; } }; @@ -1146,7 +1152,7 @@ class FailoverSuite : public Dispatcher { if (it == interceptor.breakpoints_counter.end()) { throw std::runtime_error(fmt::format("{} was missed", kv.first)); } - auto expected = *std::max_element(kv.second.begin(), kv.second.end()); + auto expected = kv.second.rbegin()->first; if (expected > it->second.counter) { throw std::runtime_error(fmt::format( "{} only triggered {} times, not the expected {}", @@ -1202,6 +1208,16 @@ class FailoverSuite : public Dispatcher { } } + seastar::future<> wait_blocked() { + logger().info("Waiting for blocked..."); + return interceptor.blocker.wait_blocked(); + } + + void unblock() { + logger().info("Unblock breakpoint"); + return interceptor.blocker.unblock(); + } + seastar::future<> wait_replaced(unsigned count) { return wait_ready(0, count, false); }