struct TestInterceptor : public Interceptor {
std::map<Breakpoint, std::map<unsigned, bp_action_t>> breakpoints;
std::map<Breakpoint, counter_t> breakpoints_counter;
- std::map<ConnectionRef, unsigned> conns;
+ std::map<Connection*, unsigned> conns;
ConnResults results;
std::optional<seastar::abort_source> signal;
breakpoints[bp][round] = bp_action_t::STALL;
}
- ConnResult* find_result(ConnectionRef conn) {
+ ConnResult* find_result(Connection *conn) {
auto it = conns.find(conn);
if (it == conns.end()) {
return nullptr;
private:
void register_conn(ConnectionRef conn) override {
- auto result = find_result(conn);
+ auto result = find_result(&*conn);
if (result != nullptr) {
logger().error("The connection [{}] {} already exists when register {}",
result->index, *result->conn, *conn);
}
unsigned index = results.size();
results.emplace_back(conn, index);
- conns[conn] = index;
+ conns[&*conn] = index;
notify();
logger().info("[{}] {} new connection registered", index, *conn);
}
void register_conn_closed(ConnectionRef conn) override {
- auto result = find_result(conn);
+ auto result = find_result(&*conn);
if (result == nullptr) {
logger().error("Untracked closed connection: {}", *conn);
ceph_abort();
}
void register_conn_ready(ConnectionRef conn) override {
- auto result = find_result(conn);
+ auto result = find_result(&*conn);
if (result == nullptr) {
logger().error("Untracked ready connection: {}", *conn);
ceph_abort();
}
- ceph_assert(conn->is_connected());
+ ceph_assert(conn->is_protocol_ready());
notify();
logger().info("[{}] {} ready", result->index, *conn);
}
void register_conn_replaced(ConnectionRef conn) override {
- auto result = find_result(conn);
+ auto result = find_result(&*conn);
if (result == nullptr) {
logger().error("Untracked replaced connection: {}", *conn);
ceph_abort();
bp_action_t intercept(ConnectionRef conn, Breakpoint bp) override {
++breakpoints_counter[bp].counter;
- auto result = find_result(conn);
+ auto result = find_result(&*conn);
if (result == nullptr) {
logger().error("Untracked intercepted connection: {}, at breakpoint {}({})",
*conn, bp, breakpoints_counter[bp].counter);
TestInterceptor interceptor;
unsigned tracked_index = 0;
- ConnectionRef tracked_conn;
+ Connection *tracked_conn;
unsigned pending_send = 0;
unsigned pending_peer_receive = 0;
unsigned pending_receive = 0;
std::optional<seastar::future<>> ms_dispatch(ConnectionRef c, MessageRef m) override {
- auto result = interceptor.find_result(c);
+ auto result = interceptor.find_result(&*c);
if (result == nullptr) {
logger().error("Untracked ms dispatched connection: {}", *c);
ceph_abort();
}
- if (tracked_conn != c) {
+ if (tracked_conn != &*c) {
logger().error("[{}] {} got op, but doesn't match tracked_conn [{}] {}",
result->index, *c, tracked_index, *tracked_conn);
ceph_abort();
seastar::shard_id prv_shard,
bool is_replace) override {
assert(prv_shard == seastar::this_shard_id());
- auto result = interceptor.find_result(conn);
+ auto result = interceptor.find_result(&*conn);
if (result == nullptr) {
logger().error("Untracked accepted connection: {}", *conn);
ceph_abort();
}
if (tracked_conn &&
- !tracked_conn->is_closed() &&
+ !tracked_conn->is_protocol_closed() &&
tracked_conn != conn) {
logger().error("[{}] {} got accepted, but there's already traced_conn [{}] {}",
result->index, *conn, tracked_index, *tracked_conn);
ConnectionRef conn,
seastar::shard_id prv_shard) override {
assert(prv_shard == seastar::this_shard_id());
- auto result = interceptor.find_result(conn);
+ auto result = interceptor.find_result(&*conn);
if (result == nullptr) {
logger().error("Untracked connected connection: {}", *conn);
ceph_abort();
}
void ms_handle_reset(ConnectionRef conn, bool is_replace) override {
- auto result = interceptor.find_result(conn);
+ auto result = interceptor.find_result(&*conn);
if (result == nullptr) {
logger().error("Untracked reset connection: {}", *conn);
ceph_abort();
}
void ms_handle_remote_reset(ConnectionRef conn) override {
- auto result = interceptor.find_result(conn);
+ auto result = interceptor.find_result(&*conn);
if (result == nullptr) {
logger().error("Untracked remotely reset connection: {}", *conn);
ceph_abort();
unsigned pending_establish = 0;
unsigned replaced_conns = 0;
for (auto& result : interceptor.results) {
- if (result.conn->is_closed_clean()) {
+ if (result.conn->is_protocol_closed_clean()) {
if (result.state == conn_state_t::replaced) {
++replaced_conns;
}
- } else if (result.conn->is_connected()) {
+ } else if (result.conn->is_protocol_ready()) {
if (tracked_conn != result.conn || tracked_index != result.index) {
throw std::runtime_error(fmt::format(
"The connected connection [{}] {} doesn't"
do_wait = true;
}
}
- if (wait_received &&
- (pending_send || pending_peer_receive || pending_receive)) {
- if (pending_conns || pending_establish) {
- logger().info("[Test] wait_ready(): wait for pending_send={},"
- " pending_peer_receive={}, pending_receive={},"
- " pending {}/{} ready/establish connections ...",
- pending_send, pending_peer_receive, pending_receive,
- pending_conns, pending_establish);
- do_wait = true;
+ if (wait_received) {
+ if (pending_send || pending_peer_receive || pending_receive) {
+ if (pending_conns || pending_establish) {
+ logger().info("[Test] wait_ready(): wait for pending_send={},"
+ " pending_peer_receive={}, pending_receive={},"
+ " pending {}/{} ready/establish connections ...",
+ pending_send, pending_peer_receive, pending_receive,
+ pending_conns, pending_establish);
+ do_wait = true;
+ } else {
+ // If there are pending messages, stop waiting if there are
+ // no longer pending connections.
+ }
+ } else {
+ // Stop waiting if there are no pending messages. Pending connections
+ // should not be important.
}
}
if (num_replaced > 0) {
seastar::future<> connect_peer() {
logger().info("[Test] connect_peer({})", test_peer_addr);
auto conn = test_msgr->connect(test_peer_addr, entity_name_t::TYPE_OSD);
- auto result = interceptor.find_result(conn);
+ auto result = interceptor.find_result(&*conn);
ceph_assert(result != nullptr);
if (tracked_conn) {
- if (tracked_conn->is_closed()) {
+ if (tracked_conn->is_protocol_closed()) {
ceph_assert(tracked_conn != conn);
logger().info("[Test] this is a new session replacing an closed one");
} else {
bool is_standby() {
ceph_assert(tracked_conn);
- return !(tracked_conn->is_connected() || tracked_conn->is_closed());
+ return tracked_conn->is_protocol_standby();
}
};
assert(prv_shard == seastar::this_shard_id());
logger().info("[TestPeer] got accept from Test");
ceph_assert(!tracked_conn ||
- tracked_conn->is_closed() ||
+ tracked_conn->is_protocol_closed() ||
tracked_conn == conn);
tracked_conn = conn;
std::ignore = flush_pending_send();
logger().info("[TestPeer] connect_peer({})", test_addr_decoded);
auto new_tracked_conn = peer_msgr->connect(test_addr_decoded, entity_name_t::TYPE_OSD);
if (tracked_conn) {
- if (tracked_conn->is_closed()) {
+ if (tracked_conn->is_protocol_closed()) {
ceph_assert(tracked_conn != new_tracked_conn);
logger().info("[TestPeer] this is a new session"
" replacing an closed one");