TestInterceptor interceptor;
unsigned tracked_index = 0;
- Connection *tracked_conn;
+ Connection *tracked_conn = nullptr;
unsigned pending_send = 0;
unsigned pending_peer_receive = 0;
unsigned pending_receive = 0;
- std::optional<seastar::future<>> ms_dispatch(ConnectionRef c, MessageRef m) override {
- auto result = interceptor.find_result(&*c);
+ std::optional<seastar::future<>> ms_dispatch(ConnectionRef conn, MessageRef m) override {
+ auto result = interceptor.find_result(&*conn);
if (result == nullptr) {
- logger().error("Untracked ms dispatched connection: {}", *c);
+ logger().error("Untracked ms dispatched connection: {}", *conn);
ceph_abort();
}
- if (tracked_conn != &*c) {
- logger().error("[{}] {} got op, but doesn't match tracked_conn [{}] {}",
- result->index, *c, tracked_index, *tracked_conn);
- ceph_abort();
+ if (tracked_conn != &*conn) {
+ logger().warn("[{}] {} got op, but doesn't match tracked_conn [{}] {}",
+ result->index, *conn, tracked_index, *tracked_conn);
+ } else {
+ ceph_assert(result->index == tracked_index);
}
- ceph_assert(result->index == tracked_index);
ceph_assert(m->get_type() == CEPH_MSG_OSD_OP);
ceph_assert(pending_receive > 0);
interceptor.notify();
}
logger().info("[Test] got op, left {} ops -- [{}] {}",
- pending_receive, result->index, *c);
+ pending_receive, result->index, *conn);
return {seastar::now()};
}
if (tracked_conn &&
!tracked_conn->is_protocol_closed() &&
- tracked_conn != conn) {
- logger().error("[{}] {} got accepted, but there's already traced_conn [{}] {}",
+ tracked_conn != &*conn) {
+ logger().error("[{}] {} got accepted, but there's already a valid traced_conn [{}] {}",
result->index, *conn, tracked_index, *tracked_conn);
ceph_abort();
}
tracked_index = result->index;
- tracked_conn = conn;
+ tracked_conn = &*conn;
++result->cnt_accept_dispatched;
logger().info("[Test] got accept (cnt_accept_dispatched={}), track [{}] {}",
result->cnt_accept_dispatched, result->index, *conn);
ceph_abort();
}
- if (tracked_conn != conn) {
- logger().error("[{}] {} got connected, but doesn't match tracked_conn [{}] {}",
+ if (tracked_conn &&
+ !tracked_conn->is_protocol_closed() &&
+ tracked_conn != &*conn) {
+ logger().error("[{}] {} got connected, but there's already a avlid tracked_conn [{}] {}",
result->index, *conn, tracked_index, *tracked_conn);
ceph_abort();
}
- ceph_assert(result->index == tracked_index);
+
+ if (tracked_conn == &*conn) {
+ ceph_assert(result->index == tracked_index);
+ }
++result->cnt_connect_dispatched;
logger().info("[Test] got connected (cnt_connect_dispatched={}) -- [{}] {}",
ceph_abort();
}
- if (tracked_conn != conn) {
- logger().error("[{}] {} got reset, but doesn't match tracked_conn [{}] {}",
- result->index, *conn, tracked_index, *tracked_conn);
- ceph_abort();
+ if (tracked_conn != &*conn) {
+ logger().warn("[{}] {} got reset, but doesn't match tracked_conn [{}] {}",
+ result->index, *conn, tracked_index, *tracked_conn);
+ } else {
+ ceph_assert(result->index == tracked_index);
+ tracked_index = 0;
+ tracked_conn = nullptr;
}
- ceph_assert(result->index == tracked_index);
- tracked_index = 0;
- tracked_conn = nullptr;
++result->cnt_reset_dispatched;
logger().info("[Test] got reset (cnt_reset_dispatched={}), untrack [{}] {}",
result->cnt_reset_dispatched, result->index, *conn);
ceph_abort();
}
- if (tracked_conn != conn) {
- logger().error("[{}] {} got remotely reset, but doesn't match tracked_conn [{}] {}",
- result->index, *conn, tracked_index, *tracked_conn);
- ceph_abort();
+ if (tracked_conn != &*conn) {
+ logger().warn("[{}] {} got remotely reset, but doesn't match tracked_conn [{}] {}",
+ result->index, *conn, tracked_index, *tracked_conn);
+ } else {
+ ceph_assert(result->index == tracked_index);
}
- ceph_assert(result->index == tracked_index);
++result->cnt_remote_reset_dispatched;
logger().info("[Test] got remote reset (cnt_remote_reset_dispatched={}) -- [{}] {}",
seastar::future<> send_op(bool expect_reply=true) {
ceph_assert(tracked_conn);
+ ceph_assert(!tracked_conn->is_protocol_closed());
if (expect_reply) {
++pending_peer_receive;
}
logger().info("[Test] flush sending {} ops", pending_send);
}
ceph_assert(tracked_conn);
+ ceph_assert(!tracked_conn->is_protocol_closed());
return seastar::do_until(
[this] { return pending_send == 0; },
[this] {
++replaced_conns;
}
} else if (result.conn->is_protocol_ready()) {
- if (tracked_conn != result.conn || tracked_index != result.index) {
- throw std::runtime_error(fmt::format(
- "The connected connection [{}] {} doesn't"
- " match the tracked connection [{}] {}",
- result.index, *result.conn, tracked_index, *tracked_conn));
- }
if (pending_send == 0 && pending_peer_receive == 0 && pending_receive == 0) {
result.state = conn_state_t::established;
} else {
if (tracked_conn) {
if (tracked_conn->is_protocol_closed()) {
- ceph_assert(tracked_conn != conn);
- logger().info("[Test] this is a new session replacing an closed one");
+ logger().info("[Test] this is a new session"
+ " replacing an closed one");
+ ceph_assert(tracked_conn != &*conn);
} else {
- ceph_assert(tracked_index == result->index);
- ceph_assert(tracked_conn == conn);
logger().info("[Test] this is not a new session");
+ ceph_assert(tracked_index == result->index);
+ ceph_assert(tracked_conn == &*conn);
}
} else {
logger().info("[Test] this is a new session");
}
tracked_index = result->index;
- tracked_conn = conn;
+ tracked_conn = &*conn;
return flush_pending_send();
}
seastar::future<> send_peer() {
if (tracked_conn) {
logger().info("[Test] send_peer()");
+ ceph_assert(!tracked_conn->is_protocol_closed());
ceph_assert(!pending_send);
return send_op();
} else {
seastar::future<> keepalive_peer() {
logger().info("[Test] keepalive_peer()");
ceph_assert(tracked_conn);
+ ceph_assert(!tracked_conn->is_protocol_closed());
return tracked_conn->send_keepalive();
}
seastar::future<> try_send_peer() {
logger().info("[Test] try_send_peer()");
ceph_assert(tracked_conn);
+ ceph_assert(!tracked_conn->is_protocol_closed());
return send_op(false);
}
ConnectionRef tracked_conn;
unsigned pending_send = 0;
- std::optional<seastar::future<>> ms_dispatch(ConnectionRef c, MessageRef m) override {
+ std::optional<seastar::future<>> ms_dispatch(ConnectionRef conn, MessageRef m) override {
logger().info("[TestPeer] got op from Test");
ceph_assert(m->get_type() == CEPH_MSG_OSD_OP);
- ceph_assert(tracked_conn == c);
std::ignore = op_callback();
return {seastar::now()};
}
bool is_replace) override {
assert(prv_shard == seastar::this_shard_id());
logger().info("[TestPeer] got accept from Test");
- ceph_assert(!tracked_conn ||
- tracked_conn->is_protocol_closed() ||
- tracked_conn == conn);
+
+ if (tracked_conn &&
+ !tracked_conn->is_protocol_closed() &&
+ tracked_conn != conn) {
+ logger().error("[TestPeer] {} got accepted, but there's already a valid traced_conn {}",
+ *conn, *tracked_conn);
+ }
tracked_conn = conn;
std::ignore = flush_pending_send();
}
void ms_handle_reset(ConnectionRef conn, bool is_replace) override {
logger().info("[TestPeer] got reset from Test");
- ceph_assert(tracked_conn == conn);
- tracked_conn = nullptr;
}
private:
seastar::future<> send_op() {
ceph_assert(tracked_conn);
+ if (tracked_conn->is_protocol_closed()) {
+ logger().error("[TestPeer] send op but the connection is closed -- {}",
+ *tracked_conn);
+ }
+
pg_t pgid;
object_locator_t oloc;
hobject_t hobj(object_t(), oloc.key, CEPH_NOSNAP, pgid.ps(),
public:
FailoverSuitePeer(MessengerRef peer_msgr, cb_t op_callback)
- : peer_msgr(peer_msgr), op_callback(op_callback) { }
+ : peer_msgr(peer_msgr),
+ op_callback(op_callback) { }
seastar::future<> shutdown() {
peer_msgr->stop();
seastar::future<> connect_peer(entity_addr_t test_addr_decoded) {
logger().info("[TestPeer] connect_peer({})", test_addr_decoded);
- auto new_tracked_conn = peer_msgr->connect(test_addr_decoded, entity_name_t::TYPE_OSD);
+ auto conn = peer_msgr->connect(test_addr_decoded, entity_name_t::TYPE_OSD);
+
if (tracked_conn) {
if (tracked_conn->is_protocol_closed()) {
- ceph_assert(tracked_conn != new_tracked_conn);
logger().info("[TestPeer] this is a new session"
" replacing an closed one");
+ ceph_assert(tracked_conn != conn);
} else {
- ceph_assert(tracked_conn == new_tracked_conn);
logger().info("[TestPeer] this is not a new session");
+ ceph_assert(tracked_conn == conn);
}
} else {
logger().info("[TestPeer] this is a new session");
}
- tracked_conn = new_tracked_conn;
+ tracked_conn = conn;
+
return flush_pending_send();
}
seastar::future<> send_peer() {
if (tracked_conn) {
logger().info("[TestPeer] send_peer()");
+ ceph_assert(!pending_send);
return send_op();
} else {
++pending_send;