std::map<Connection*, unsigned> conns;
ConnResults results;
std::optional<seastar::abort_source> signal;
+ const seastar::shard_id primary_sid;
+
+ TestInterceptor() : primary_sid{seastar::this_shard_id()} {}
- TestInterceptor() = default;
// only used for copy breakpoint configurations
- TestInterceptor(const TestInterceptor& other) {
+ TestInterceptor(const TestInterceptor& other) : primary_sid{other.primary_sid} {
assert(other.breakpoints_counter.empty());
assert(other.conns.empty());
assert(other.results.empty());
breakpoints = other.breakpoints;
assert(!other.signal);
+ assert(seastar::this_shard_id() == primary_sid);
}
void make_fault(Breakpoint bp, unsigned round = 1) {
}
ConnResult* find_result(Connection *conn) {
+ assert(seastar::this_shard_id() == primary_sid);
auto it = conns.find(conn);
if (it == conns.end()) {
return nullptr;
}
seastar::future<> wait() {
+ assert(seastar::this_shard_id() == primary_sid);
assert(!signal);
signal = seastar::abort_source();
return seastar::sleep_abortable(10s, *signal).then([] {
}
void notify() {
+ assert(seastar::this_shard_id() == primary_sid);
if (signal) {
signal->request_abort();
signal = std::nullopt;
seastar::future<bp_action_t>
intercept(Connection &_conn, std::vector<Breakpoint> bps) override {
assert(bps.size() >= 1);
+ Connection *conn = &_conn;
- std::vector<bp_action_t> actions;
- for (const Breakpoint &bp : bps) {
- ++breakpoints_counter[bp].counter;
+ return seastar::smp::submit_to(primary_sid, [conn, bps, this] {
+ std::vector<bp_action_t> actions;
+ for (const Breakpoint &bp : bps) {
+ ++breakpoints_counter[bp].counter;
- auto result = find_result(&*conn);
- if (result == nullptr) {
- logger().error("Untracked intercepted connection: {}, at breakpoint {}({})",
- *conn, bp, breakpoints_counter[bp].counter);
- ceph_abort();
- }
+ auto result = find_result(&*conn);
+ if (result == nullptr) {
+ logger().error("Untracked intercepted connection: {}, at breakpoint {}({})",
+ *conn, bp, breakpoints_counter[bp].counter);
+ ceph_abort();
+ }
- if (bp == custom_bp_t::SOCKET_CONNECTING) {
- ++result->connect_attempts;
- logger().info("[Test] connect_attempts={}", result->connect_attempts);
- } else if (bp == tag_bp_t{Tag::CLIENT_IDENT, bp_type_t::WRITE}) {
- ++result->client_connect_attempts;
- logger().info("[Test] client_connect_attempts={}", result->client_connect_attempts);
- } else if (bp == tag_bp_t{Tag::SESSION_RECONNECT, bp_type_t::WRITE}) {
- ++result->client_reconnect_attempts;
- logger().info("[Test] client_reconnect_attempts={}", result->client_reconnect_attempts);
- } else if (bp == custom_bp_t::SOCKET_ACCEPTED) {
- ++result->accept_attempts;
- logger().info("[Test] accept_attempts={}", result->accept_attempts);
- } else if (bp == tag_bp_t{Tag::CLIENT_IDENT, bp_type_t::READ}) {
- ++result->server_connect_attempts;
- logger().info("[Test] server_connect_attemps={}", result->server_connect_attempts);
- } else if (bp == tag_bp_t{Tag::SESSION_RECONNECT, bp_type_t::READ}) {
- ++result->server_reconnect_attempts;
- logger().info("[Test] server_reconnect_attempts={}", result->server_reconnect_attempts);
- }
+ if (bp == custom_bp_t::SOCKET_CONNECTING) {
+ ++result->connect_attempts;
+ logger().info("[Test] connect_attempts={}", result->connect_attempts);
+ } else if (bp == tag_bp_t{Tag::CLIENT_IDENT, bp_type_t::WRITE}) {
+ ++result->client_connect_attempts;
+ logger().info("[Test] client_connect_attempts={}", result->client_connect_attempts);
+ } else if (bp == tag_bp_t{Tag::SESSION_RECONNECT, bp_type_t::WRITE}) {
+ ++result->client_reconnect_attempts;
+ logger().info("[Test] client_reconnect_attempts={}", result->client_reconnect_attempts);
+ } else if (bp == custom_bp_t::SOCKET_ACCEPTED) {
+ ++result->accept_attempts;
+ logger().info("[Test] accept_attempts={}", result->accept_attempts);
+ } else if (bp == tag_bp_t{Tag::CLIENT_IDENT, bp_type_t::READ}) {
+ ++result->server_connect_attempts;
+ logger().info("[Test] server_connect_attemps={}", result->server_connect_attempts);
+ } else if (bp == tag_bp_t{Tag::SESSION_RECONNECT, bp_type_t::READ}) {
+ ++result->server_reconnect_attempts;
+ logger().info("[Test] server_reconnect_attempts={}", result->server_reconnect_attempts);
+ }
- auto it_bp = breakpoints.find(bp);
- if (it_bp != breakpoints.end()) {
- auto it_cnt = it_bp->second.find(breakpoints_counter[bp].counter);
- if (it_cnt != it_bp->second.end()) {
- logger().info("[{}] {} intercepted {}({}) => {}",
- result->index, *conn, bp,
- breakpoints_counter[bp].counter, it_cnt->second);
- actions.emplace_back(it_cnt->second);
- continue;
+ auto it_bp = breakpoints.find(bp);
+ if (it_bp != breakpoints.end()) {
+ auto it_cnt = it_bp->second.find(breakpoints_counter[bp].counter);
+ if (it_cnt != it_bp->second.end()) {
+ logger().info("[{}] {} intercepted {}({}) => {}",
+ result->index, *conn, bp,
+ breakpoints_counter[bp].counter, it_cnt->second);
+ actions.emplace_back(it_cnt->second);
+ continue;
+ }
}
+ logger().info("[{}] {} intercepted {}({})",
+ result->index, *conn, bp, breakpoints_counter[bp].counter);
+ actions.emplace_back(bp_action_t::CONTINUE);
}
- logger().info("[{}] {} intercepted {}({})",
- result->index, *conn, bp, breakpoints_counter[bp].counter);
- actions.emplace_back(bp_action_t::CONTINUE);
- }
- bp_action_t action = bp_action_t::CONTINUE;
- for (bp_action_t &a : actions) {
- if (a != bp_action_t::CONTINUE) {
- if (action == bp_action_t::CONTINUE) {
- action = a;
- } else {
- ceph_abort("got multiple incompatible actions");
+ bp_action_t action = bp_action_t::CONTINUE;
+ for (bp_action_t &a : actions) {
+ if (a != bp_action_t::CONTINUE) {
+ if (action == bp_action_t::CONTINUE) {
+ action = a;
+ } else {
+ ceph_abort("got multiple incompatible actions");
+ }
}
}
- }
- return seastar::make_ready_future<bp_action_t>(action);
+ return seastar::make_ready_future<bp_action_t>(action);
+ });
}
};
unsigned pending_peer_receive = 0;
unsigned pending_receive = 0;
- std::optional<seastar::future<>> ms_dispatch(ConnectionRef conn, MessageRef m) override {
- auto result = interceptor.find_result(&*conn);
- if (result == nullptr) {
- logger().error("Untracked ms dispatched connection: {}", *conn);
- ceph_abort();
- }
-
- if (tracked_conn != &*conn) {
- logger().warn("[{}] {} got op, but doesn't match tracked_conn [{}] {}",
- result->index, *conn, tracked_index, *tracked_conn);
- } else {
- ceph_assert(result->index == tracked_index);
- }
+ ShardedGates &gates;
+ const seastar::shard_id primary_sid;
+ std::optional<seastar::future<>> ms_dispatch(
+ ConnectionRef conn_ref, MessageRef m) override {
ceph_assert(m->get_type() == CEPH_MSG_OSD_OP);
- ceph_assert(pending_receive > 0);
- --pending_receive;
- if (pending_receive == 0) {
- interceptor.notify();
- }
- logger().info("[Test] got op, left {} ops -- [{}] {}",
- pending_receive, result->index, *conn);
+ Connection *conn = &*conn_ref;
+ gates.dispatch_in_background("TestSuite_ms_dispatch",
+ [this, conn, conn_ref] {
+ return seastar::smp::submit_to(primary_sid, [this, conn] {
+ auto result = interceptor.find_result(&*conn);
+ if (result == nullptr) {
+ logger().error("Untracked ms dispatched connection: {}", *conn);
+ ceph_abort();
+ }
+
+ if (tracked_conn != &*conn) {
+ logger().warn("[{}] {} got op, but doesn't match tracked_conn [{}] {}",
+ result->index, *conn, tracked_index, *tracked_conn);
+ } else {
+ ceph_assert(result->index == tracked_index);
+ }
+
+ ceph_assert(pending_receive > 0);
+ --pending_receive;
+ if (pending_receive == 0) {
+ interceptor.notify();
+ }
+ logger().info("[Test] got op, left {} ops -- [{}] {}",
+ pending_receive, result->index, *conn);
+ }).then([conn_ref] {});
+ });
return {seastar::now()};
}
void ms_handle_accept(
- ConnectionRef conn,
+ ConnectionRef conn_ref,
seastar::shard_id prv_shard,
bool is_replace) override {
- assert(prv_shard == seastar::this_shard_id());
- auto result = interceptor.find_result(&*conn);
- if (result == nullptr) {
- logger().error("Untracked accepted connection: {}", *conn);
- ceph_abort();
- }
+ Connection *conn = &*conn_ref;
+ gates.dispatch_in_background("TestSuite_ms_dispatch",
+ [this, conn, conn_ref] {
+ return seastar::smp::submit_to(primary_sid, [this, conn] {
+ auto result = interceptor.find_result(&*conn);
+ if (result == nullptr) {
+ logger().error("Untracked accepted connection: {}", *conn);
+ ceph_abort();
+ }
- if (tracked_conn &&
- !tracked_conn->is_protocol_closed() &&
- tracked_conn != &*conn) {
- logger().error("[{}] {} got accepted, but there's already a valid traced_conn [{}] {}",
- result->index, *conn, tracked_index, *tracked_conn);
- ceph_abort();
- }
+ if (tracked_conn &&
+ !tracked_conn->is_protocol_closed() &&
+ tracked_conn != &*conn) {
+ logger().error("[{}] {} got accepted, but there's already a valid traced_conn [{}] {}",
+ result->index, *conn, tracked_index, *tracked_conn);
+ ceph_abort();
+ }
- tracked_index = result->index;
- tracked_conn = &*conn;
- ++result->cnt_accept_dispatched;
- logger().info("[Test] got accept (cnt_accept_dispatched={}), track [{}] {}",
- result->cnt_accept_dispatched, result->index, *conn);
- std::ignore = flush_pending_send();
+ tracked_index = result->index;
+ tracked_conn = &*conn;
+ ++result->cnt_accept_dispatched;
+ logger().info("[Test] got accept (cnt_accept_dispatched={}), track [{}] {}",
+ result->cnt_accept_dispatched, result->index, *conn);
+ return flush_pending_send();
+ }).then([conn_ref] {});
+ });
}
void ms_handle_connect(
- ConnectionRef conn,
+ ConnectionRef conn_ref,
seastar::shard_id prv_shard) override {
- assert(prv_shard == seastar::this_shard_id());
- auto result = interceptor.find_result(&*conn);
- if (result == nullptr) {
- logger().error("Untracked connected connection: {}", *conn);
- ceph_abort();
- }
+ Connection *conn = &*conn_ref;
+ gates.dispatch_in_background("TestSuite_ms_dispatch",
+ [this, conn, conn_ref] {
+ return seastar::smp::submit_to(primary_sid, [this, conn] {
+ auto result = interceptor.find_result(&*conn);
+ if (result == nullptr) {
+ logger().error("Untracked connected connection: {}", *conn);
+ ceph_abort();
+ }
- if (tracked_conn &&
- !tracked_conn->is_protocol_closed() &&
- tracked_conn != &*conn) {
- logger().error("[{}] {} got connected, but there's already a avlid tracked_conn [{}] {}",
- result->index, *conn, tracked_index, *tracked_conn);
- ceph_abort();
- }
+ if (tracked_conn &&
+ !tracked_conn->is_protocol_closed() &&
+ tracked_conn != &*conn) {
+ logger().error("[{}] {} got connected, but there's already a avlid tracked_conn [{}] {}",
+ result->index, *conn, tracked_index, *tracked_conn);
+ ceph_abort();
+ }
- if (tracked_conn == &*conn) {
- ceph_assert(result->index == tracked_index);
- }
+ if (tracked_conn == &*conn) {
+ ceph_assert(result->index == tracked_index);
+ }
- ++result->cnt_connect_dispatched;
- logger().info("[Test] got connected (cnt_connect_dispatched={}) -- [{}] {}",
- result->cnt_connect_dispatched, result->index, *conn);
+ ++result->cnt_connect_dispatched;
+ logger().info("[Test] got connected (cnt_connect_dispatched={}) -- [{}] {}",
+ result->cnt_connect_dispatched, result->index, *conn);
+ }).then([conn_ref] {});
+ });
}
- void ms_handle_reset(ConnectionRef conn, bool is_replace) override {
- auto result = interceptor.find_result(&*conn);
- if (result == nullptr) {
- logger().error("Untracked reset connection: {}", *conn);
- ceph_abort();
- }
+ void ms_handle_reset(
+ ConnectionRef conn_ref,
+ bool is_replace) override {
+ Connection *conn = &*conn_ref;
+ gates.dispatch_in_background("TestSuite_ms_dispatch",
+ [this, conn, conn_ref] {
+ return seastar::smp::submit_to(primary_sid, [this, conn] {
+ auto result = interceptor.find_result(&*conn);
+ if (result == nullptr) {
+ logger().error("Untracked reset connection: {}", *conn);
+ ceph_abort();
+ }
- if (tracked_conn != &*conn) {
- logger().warn("[{}] {} got reset, but doesn't match tracked_conn [{}] {}",
- result->index, *conn, tracked_index, *tracked_conn);
- } else {
- ceph_assert(result->index == tracked_index);
- tracked_index = 0;
- tracked_conn = nullptr;
- }
+ if (tracked_conn != &*conn) {
+ logger().warn("[{}] {} got reset, but doesn't match tracked_conn [{}] {}",
+ result->index, *conn, tracked_index, *tracked_conn);
+ } else {
+ ceph_assert(result->index == tracked_index);
+ tracked_index = 0;
+ tracked_conn = nullptr;
+ }
- ++result->cnt_reset_dispatched;
- logger().info("[Test] got reset (cnt_reset_dispatched={}), untrack [{}] {}",
- result->cnt_reset_dispatched, result->index, *conn);
+ ++result->cnt_reset_dispatched;
+ logger().info("[Test] got reset (cnt_reset_dispatched={}), untrack [{}] {}",
+ result->cnt_reset_dispatched, result->index, *conn);
+ }).then([conn_ref] {});
+ });
}
- void ms_handle_remote_reset(ConnectionRef conn) override {
- auto result = interceptor.find_result(&*conn);
- if (result == nullptr) {
- logger().error("Untracked remotely reset connection: {}", *conn);
- ceph_abort();
- }
+ void ms_handle_remote_reset(
+ ConnectionRef conn_ref) override {
+ Connection *conn = &*conn_ref;
+ gates.dispatch_in_background("TestSuite_ms_dispatch",
+ [this, conn, conn_ref] {
+ return seastar::smp::submit_to(primary_sid, [this, conn] {
+ auto result = interceptor.find_result(&*conn);
+ if (result == nullptr) {
+ logger().error("Untracked remotely reset connection: {}", *conn);
+ ceph_abort();
+ }
- if (tracked_conn != &*conn) {
- logger().warn("[{}] {} got remotely reset, but doesn't match tracked_conn [{}] {}",
- result->index, *conn, tracked_index, *tracked_conn);
- } else {
- ceph_assert(result->index == tracked_index);
- }
+ if (tracked_conn != &*conn) {
+ logger().warn("[{}] {} got remotely reset, but doesn't match tracked_conn [{}] {}",
+ result->index, *conn, tracked_index, *tracked_conn);
+ } else {
+ ceph_assert(result->index == tracked_index);
+ }
- ++result->cnt_remote_reset_dispatched;
- logger().info("[Test] got remote reset (cnt_remote_reset_dispatched={}) -- [{}] {}",
- result->cnt_remote_reset_dispatched, result->index, *conn);
+ ++result->cnt_remote_reset_dispatched;
+ logger().info("[Test] got remote reset (cnt_remote_reset_dispatched={}) -- [{}] {}",
+ result->cnt_remote_reset_dispatched, result->index, *conn);
+ }).then([conn_ref] {});
+ });
}
private:
seastar::future<> wait_ready(unsigned num_ready_conns,
unsigned num_replaced,
bool wait_received) {
+ assert(seastar::this_shard_id() == primary_sid);
unsigned pending_conns = 0;
unsigned pending_establish = 0;
unsigned replaced_conns = 0;
public:
FailoverSuite(MessengerRef test_msgr,
entity_addr_t test_peer_addr,
- const TestInterceptor& interceptor)
+ const TestInterceptor& interceptor,
+ ShardedGates &gates)
: test_msgr(test_msgr),
test_peer_addr(test_peer_addr),
- interceptor(interceptor) { }
+ interceptor(interceptor),
+ gates{gates},
+ primary_sid{seastar::this_shard_id()} { }
entity_addr_t get_addr() const {
return test_msgr->get_myaddr();
create(entity_addr_t test_addr,
SocketPolicy test_policy,
entity_addr_t test_peer_addr,
- const TestInterceptor& interceptor) {
+ const TestInterceptor& interceptor,
+ ShardedGates &gates) {
auto suite = std::make_unique<FailoverSuite>(
Messenger::create(
entity_name_t::OSD(TEST_OSD),
"Test",
TEST_NONCE,
true),
- test_peer_addr, interceptor);
+ test_peer_addr,
+ interceptor,
+ gates);
return suite->init(test_addr, test_policy
).then([suite = std::move(suite)] () mutable {
return std::move(suite);
public:
seastar::future<> connect_peer() {
logger().info("[Test] connect_peer({})", test_peer_addr);
+ assert(seastar::this_shard_id() == primary_sid);
auto conn = test_msgr->connect(test_peer_addr, entity_name_t::TYPE_OSD);
auto result = interceptor.find_result(&*conn);
ceph_assert(result != nullptr);
}
seastar::future<> send_peer() {
+ assert(seastar::this_shard_id() == primary_sid);
if (tracked_conn) {
logger().info("[Test] send_peer()");
ceph_assert(!tracked_conn->is_protocol_closed());
seastar::future<> keepalive_peer() {
logger().info("[Test] keepalive_peer()");
+ assert(seastar::this_shard_id() == primary_sid);
ceph_assert(tracked_conn);
ceph_assert(!tracked_conn->is_protocol_closed());
return tracked_conn->send_keepalive();
seastar::future<> try_send_peer() {
logger().info("[Test] try_send_peer()");
+ assert(seastar::this_shard_id() == primary_sid);
ceph_assert(tracked_conn);
ceph_assert(!tracked_conn->is_protocol_closed());
return send_op(false);
seastar::future<> markdown() {
logger().info("[Test] markdown() in 100ms ...");
+ assert(seastar::this_shard_id() == primary_sid);
ceph_assert(tracked_conn);
// sleep to propagate potential remaining acks
return seastar::sleep(100ms
).then([this] {
- tracked_conn->mark_down();
+ return seastar::smp::submit_to(
+ tracked_conn->get_shard_id(), [tracked_conn=tracked_conn] {
+ assert(tracked_conn->get_shard_id() == seastar::this_shard_id());
+ tracked_conn->mark_down();
+ });
});
}
seastar::future<> wait_blocked() {
logger().info("[Test] wait_blocked() ...");
+ assert(seastar::this_shard_id() == primary_sid);
return interceptor.blocker.wait_blocked();
}
void unblock() {
logger().info("[Test] unblock()");
+ assert(seastar::this_shard_id() == primary_sid);
return interceptor.blocker.unblock();
}
}
bool is_standby() {
+ assert(seastar::this_shard_id() == primary_sid);
ceph_assert(tracked_conn);
return tracked_conn->is_protocol_standby();
}
logger().info("\n\n[{}]", name);
ceph_assert(!test_suite);
SocketPolicy test_policy_ = to_socket_policy(test_policy);
- return FailoverSuite::create(
- test_addr, test_policy_, test_peer_addr, interceptor
- ).then([this, peer_policy, f = std::move(f)] (auto suite) mutable {
- ceph_assert(suite->get_addr() == test_addr);
- test_suite.swap(suite);
- return start_peer(peer_policy).then([this, f = std::move(f)] {
- return f(*test_suite);
- }).then([this] {
- test_suite->post_check();
- logger().info("\n[SUCCESS]");
- }).handle_exception([this] (auto eptr) {
- logger().info("\n[FAIL: {}]", eptr);
- test_suite->dump_results();
- throw;
- }).then([this] {
- return stop_peer();
- }).then([this] {
- return test_suite->shutdown().then([this] {
- test_suite.reset();
+ return ShardedGates::create(
+ ).then([this, test_policy_, peer_policy, interceptor,
+ f=std::move(f)](auto *gates) mutable {
+ return FailoverSuite::create(
+ test_addr, test_policy_, test_peer_addr, interceptor, *gates
+ ).then([this, peer_policy, f = std::move(f)](auto suite) mutable {
+ ceph_assert(suite->get_addr() == test_addr);
+ test_suite.swap(suite);
+ return start_peer(peer_policy
+ ).then([this, f = std::move(f)] {
+ return f(*test_suite);
+ }).then([this] {
+ test_suite->post_check();
+ logger().info("\n[SUCCESS]");
+ }).handle_exception([this](auto eptr) {
+ logger().info("\n[FAIL: {}]", eptr);
+ test_suite->dump_results();
+ throw;
+ }).then([this] {
+ return stop_peer();
+ }).then([this] {
+ return test_suite->shutdown(
+ ).then([this] {
+ test_suite.reset();
+ });
});
+ }).then([gates] {
+ return gates->close();
});
});
}