enum class bp_action_t {
CONTINUE = 0,
FAULT,
- BLOCK
+ BLOCK,
+ STALL
};
inline std::ostream& operator<<(std::ostream& out, const bp_action_t& action) {
static const char *const action_names[] = {"CONTINUE",
"FAULT",
- "BLOCK"};
+ "BLOCK",
+ "STALL"};
assert(static_cast<size_t>(action) < std::size(action_names));
return out << action_names[static_cast<size_t>(action)];
}
-struct socket_trap_t {
- bp_type_t type;
- bp_action_t action;
-};
-
class socket_blocker {
std::optional<seastar::promise<>> p_blocked;
std::optional<seastar::promise<>> p_unblocked;
SocketConnection& conn, SocketFRef& socket) {
if (conn.interceptor) {
auto action = conn.interceptor->intercept(conn, Breakpoint(bp));
- switch (action) {
- case bp_action_t::CONTINUE:
- break;
- case bp_action_t::FAULT:
- socket->set_trap({type, action});
- break;
- case bp_action_t::BLOCK:
- socket->set_trap({type, action}, &conn.interceptor->blocker);
- logger().info("{} blocked at {}, waiting for unblock...", conn, bp);
- break;
- default:
- ceph_abort("unexpected action from intercept");
- }
+ socket->set_trap(type, action, &conn.interceptor->blocker);
}
}
seastar::future<bufferlist> Socket::read(size_t bytes)
{
#ifdef UNIT_TESTS_BUILT
- return try_trap(bp_type_t::READ).then([bytes, this] {
+ return try_trap_pre(next_trap_read).then([bytes, this] {
#endif
if (bytes == 0) {
return seastar::make_ready_future<bufferlist>();
return seastar::make_ready_future<bufferlist>(std::move(r.buffer));
});
#ifdef UNIT_TESTS_BUILT
+ }).then([this] (auto buf) {
+ return try_trap_post(next_trap_read
+ ).then([buf = std::move(buf)] () mutable {
+ return std::move(buf);
+ });
});
#endif
}
seastar::future<seastar::temporary_buffer<char>>
Socket::read_exactly(size_t bytes) {
#ifdef UNIT_TESTS_BUILT
- return try_trap(bp_type_t::READ).then([bytes, this] {
+ return try_trap_pre(next_trap_read).then([bytes, this] {
#endif
if (bytes == 0) {
return seastar::make_ready_future<seastar::temporary_buffer<char>>();
return seastar::make_ready_future<tmp_buf>(std::move(buf));
});
#ifdef UNIT_TESTS_BUILT
+ }).then([this] (auto buf) {
+ return try_trap_post(next_trap_read
+ ).then([buf = std::move(buf)] () mutable {
+ return std::move(buf);
+ });
});
#endif
}
}
#ifdef UNIT_TESTS_BUILT
-seastar::future<> Socket::try_trap(bp_type_t type) {
- if (next_trap && next_trap->type == type) {
- auto action = next_trap->action;
- next_trap = std::nullopt;
- switch (action) {
- case bp_action_t::FAULT:
- throw std::system_error(make_error_code(ceph::net::error::negotiation_failure));
- case bp_action_t::BLOCK:
- ceph_assert(blocker != nullptr);
- return blocker->block();
- default:
- ceph_abort("unexpected action from trap");
- }
+seastar::future<> Socket::try_trap_pre(bp_action_t& trap) {
+ auto action = trap;
+ trap = bp_action_t::CONTINUE;
+ switch (action) {
+ case bp_action_t::CONTINUE:
+ break;
+ case bp_action_t::FAULT:
+ throw std::system_error(make_error_code(ceph::net::error::negotiation_failure));
+ case bp_action_t::BLOCK:
+ return blocker->block();
+ case bp_action_t::STALL:
+ trap = action;
+ break;
+ default:
+ ceph_abort("unexpected action from trap");
+ }
+ return seastar::now();
+}
+
+seastar::future<> Socket::try_trap_post(bp_action_t& trap) {
+ auto action = trap;
+ trap = bp_action_t::CONTINUE;
+ switch (action) {
+ case bp_action_t::CONTINUE:
+ break;
+ case bp_action_t::STALL:
+ shutdown();
+ return blocker->block();
+ default:
+ ceph_abort("unexpected action from trap");
}
return seastar::now();
}
seastar::future<> write(packet&& buf) {
#ifdef UNIT_TESTS_BUILT
- return try_trap(bp_type_t::WRITE).then([buf = std::move(buf), this] () mutable {
+ return try_trap_pre(next_trap_write).then([buf = std::move(buf), this] () mutable {
#endif
return out.write(std::move(buf));
#ifdef UNIT_TESTS_BUILT
+ }).then([this] {
+ return try_trap_post(next_trap_write);
});
#endif
}
}
seastar::future<> write_flush(packet&& buf) {
#ifdef UNIT_TESTS_BUILT
- return try_trap(bp_type_t::WRITE).then([buf = std::move(buf), this] () mutable {
+ return try_trap_pre(next_trap_write).then([buf = std::move(buf), this] () mutable {
#endif
return out.write(std::move(buf)).then([this] { return out.flush(); });
#ifdef UNIT_TESTS_BUILT
+ }).then([this] {
+ return try_trap_post(next_trap_write);
});
#endif
}
#ifdef UNIT_TESTS_BUILT
private:
- std::optional<socket_trap_t> next_trap = std::nullopt;
+ bp_action_t next_trap_read = bp_action_t::CONTINUE;
+ bp_action_t next_trap_write = bp_action_t::CONTINUE;
socket_blocker* blocker = nullptr;
- seastar::future<> try_trap(bp_type_t type);
+ seastar::future<> try_trap_pre(bp_action_t& trap);
+ seastar::future<> try_trap_post(bp_action_t& trap);
public:
- void set_trap(socket_trap_t trap, socket_blocker* blocker_ = nullptr) {
- ceph_assert(!next_trap);
- next_trap = trap;
+ void set_trap(bp_type_t type, bp_action_t action, socket_blocker* blocker_) {
blocker = blocker_;
+ if (type == bp_type_t::READ) {
+ ceph_assert(next_trap_read == bp_action_t::CONTINUE);
+ next_trap_read = action;
+ } else {
+ ceph_assert(next_trap_write == bp_action_t::CONTINUE);
+ next_trap_write = action;
+ }
}
#endif
};
breakpoints[bp][round] = bp_action_t::BLOCK;
}
+ void make_stall(Breakpoint bp, unsigned round = 1) {
+ assert(round >= 1);
+ breakpoints[bp][round] = bp_action_t::STALL;
+ }
+
ConnResult* find_result(ConnectionRef conn) {
auto it = conns.find(conn);
if (it == conns.end()) {