From 2d2ee4e5dbd4c8ede49dc03bd670f861664c6b75 Mon Sep 17 00:00:00 2001 From: Yingxin Cheng Date: Wed, 11 Sep 2019 09:39:27 +0800 Subject: [PATCH] test/crimson: implement STALL action for breakpoint Signed-off-by: Yingxin Cheng --- src/crimson/net/Interceptor.h | 11 +++--- src/crimson/net/ProtocolV2.cc | 14 +------- src/crimson/net/Socket.cc | 57 ++++++++++++++++++++++-------- src/crimson/net/Socket.h | 25 +++++++++---- src/test/crimson/test_messenger.cc | 5 +++ 5 files changed, 70 insertions(+), 42 deletions(-) diff --git a/src/crimson/net/Interceptor.h b/src/crimson/net/Interceptor.h index e76f990f098ad..5e61e1c0261fc 100644 --- a/src/crimson/net/Interceptor.h +++ b/src/crimson/net/Interceptor.h @@ -36,22 +36,19 @@ enum class bp_type_t { enum class bp_action_t { CONTINUE = 0, FAULT, - BLOCK + BLOCK, + STALL }; inline std::ostream& operator<<(std::ostream& out, const bp_action_t& action) { static const char *const action_names[] = {"CONTINUE", "FAULT", - "BLOCK"}; + "BLOCK", + "STALL"}; assert(static_cast(action) < std::size(action_names)); return out << action_names[static_cast(action)]; } -struct socket_trap_t { - bp_type_t type; - bp_action_t action; -}; - class socket_blocker { std::optional> p_blocked; std::optional> p_unblocked; diff --git a/src/crimson/net/ProtocolV2.cc b/src/crimson/net/ProtocolV2.cc index 2509271ef39c9..b936342b52831 100644 --- a/src/crimson/net/ProtocolV2.cc +++ b/src/crimson/net/ProtocolV2.cc @@ -132,19 +132,7 @@ void intercept(Breakpoint bp, bp_type_t type, SocketConnection& conn, SocketFRef& socket) { if (conn.interceptor) { auto action = conn.interceptor->intercept(conn, Breakpoint(bp)); - switch (action) { - case bp_action_t::CONTINUE: - break; - case bp_action_t::FAULT: - socket->set_trap({type, action}); - break; - case bp_action_t::BLOCK: - socket->set_trap({type, action}, &conn.interceptor->blocker); - logger().info("{} blocked at {}, waiting for unblock...", conn, bp); - break; - default: - ceph_abort("unexpected action from intercept"); - } + socket->set_trap(type, action, &conn.interceptor->blocker); } } diff --git a/src/crimson/net/Socket.cc b/src/crimson/net/Socket.cc index 8e00323ea4a19..4f7c09b94ef6c 100644 --- a/src/crimson/net/Socket.cc +++ b/src/crimson/net/Socket.cc @@ -59,7 +59,7 @@ struct bufferlist_consumer { seastar::future Socket::read(size_t bytes) { #ifdef UNIT_TESTS_BUILT - return try_trap(bp_type_t::READ).then([bytes, this] { + return try_trap_pre(next_trap_read).then([bytes, this] { #endif if (bytes == 0) { return seastar::make_ready_future(); @@ -73,6 +73,11 @@ seastar::future Socket::read(size_t bytes) return seastar::make_ready_future(std::move(r.buffer)); }); #ifdef UNIT_TESTS_BUILT + }).then([this] (auto buf) { + return try_trap_post(next_trap_read + ).then([buf = std::move(buf)] () mutable { + return std::move(buf); + }); }); #endif } @@ -80,7 +85,7 @@ seastar::future Socket::read(size_t bytes) seastar::future> Socket::read_exactly(size_t bytes) { #ifdef UNIT_TESTS_BUILT - return try_trap(bp_type_t::READ).then([bytes, this] { + return try_trap_pre(next_trap_read).then([bytes, this] { #endif if (bytes == 0) { return seastar::make_ready_future>(); @@ -92,6 +97,11 @@ Socket::read_exactly(size_t bytes) { return seastar::make_ready_future(std::move(buf)); }); #ifdef UNIT_TESTS_BUILT + }).then([this] (auto buf) { + return try_trap_post(next_trap_read + ).then([buf = std::move(buf)] () mutable { + return std::move(buf); + }); }); #endif } @@ -127,19 +137,36 @@ seastar::future<> Socket::close() { } #ifdef UNIT_TESTS_BUILT -seastar::future<> Socket::try_trap(bp_type_t type) { - if (next_trap && next_trap->type == type) { - auto action = next_trap->action; - next_trap = std::nullopt; - switch (action) { - case bp_action_t::FAULT: - throw std::system_error(make_error_code(ceph::net::error::negotiation_failure)); - case bp_action_t::BLOCK: - ceph_assert(blocker != nullptr); - return blocker->block(); - default: - ceph_abort("unexpected action from trap"); - } +seastar::future<> Socket::try_trap_pre(bp_action_t& trap) { + auto action = trap; + trap = bp_action_t::CONTINUE; + switch (action) { + case bp_action_t::CONTINUE: + break; + case bp_action_t::FAULT: + throw std::system_error(make_error_code(ceph::net::error::negotiation_failure)); + case bp_action_t::BLOCK: + return blocker->block(); + case bp_action_t::STALL: + trap = action; + break; + default: + ceph_abort("unexpected action from trap"); + } + return seastar::now(); +} + +seastar::future<> Socket::try_trap_post(bp_action_t& trap) { + auto action = trap; + trap = bp_action_t::CONTINUE; + switch (action) { + case bp_action_t::CONTINUE: + break; + case bp_action_t::STALL: + shutdown(); + return blocker->block(); + default: + ceph_abort("unexpected action from trap"); } return seastar::now(); } diff --git a/src/crimson/net/Socket.h b/src/crimson/net/Socket.h index 1882e54f5e737..819a842a20e60 100644 --- a/src/crimson/net/Socket.h +++ b/src/crimson/net/Socket.h @@ -86,10 +86,12 @@ class Socket seastar::future<> write(packet&& buf) { #ifdef UNIT_TESTS_BUILT - return try_trap(bp_type_t::WRITE).then([buf = std::move(buf), this] () mutable { + return try_trap_pre(next_trap_write).then([buf = std::move(buf), this] () mutable { #endif return out.write(std::move(buf)); #ifdef UNIT_TESTS_BUILT + }).then([this] { + return try_trap_post(next_trap_write); }); #endif } @@ -98,10 +100,12 @@ class Socket } seastar::future<> write_flush(packet&& buf) { #ifdef UNIT_TESTS_BUILT - return try_trap(bp_type_t::WRITE).then([buf = std::move(buf), this] () mutable { + return try_trap_pre(next_trap_write).then([buf = std::move(buf), this] () mutable { #endif return out.write(std::move(buf)).then([this] { return out.flush(); }); #ifdef UNIT_TESTS_BUILT + }).then([this] { + return try_trap_post(next_trap_write); }); #endif } @@ -124,15 +128,22 @@ class Socket #ifdef UNIT_TESTS_BUILT private: - std::optional next_trap = std::nullopt; + bp_action_t next_trap_read = bp_action_t::CONTINUE; + bp_action_t next_trap_write = bp_action_t::CONTINUE; socket_blocker* blocker = nullptr; - seastar::future<> try_trap(bp_type_t type); + seastar::future<> try_trap_pre(bp_action_t& trap); + seastar::future<> try_trap_post(bp_action_t& trap); public: - void set_trap(socket_trap_t trap, socket_blocker* blocker_ = nullptr) { - ceph_assert(!next_trap); - next_trap = trap; + void set_trap(bp_type_t type, bp_action_t action, socket_blocker* blocker_) { blocker = blocker_; + if (type == bp_type_t::READ) { + ceph_assert(next_trap_read == bp_action_t::CONTINUE); + next_trap_read = action; + } else { + ceph_assert(next_trap_write == bp_action_t::CONTINUE); + next_trap_write = action; + } } #endif }; diff --git a/src/test/crimson/test_messenger.cc b/src/test/crimson/test_messenger.cc index fd84e66dc4ed9..ebade08c90780 100644 --- a/src/test/crimson/test_messenger.cc +++ b/src/test/crimson/test_messenger.cc @@ -747,6 +747,11 @@ struct TestInterceptor : public Interceptor { breakpoints[bp][round] = bp_action_t::BLOCK; } + void make_stall(Breakpoint bp, unsigned round = 1) { + assert(round >= 1); + breakpoints[bp][round] = bp_action_t::STALL; + } + ConnResult* find_result(ConnectionRef conn) { auto it = conns.find(conn); if (it == conns.end()) { -- 2.39.5