logger().debug("{} UPDATE: gs={} for server ident", conn, global_seq);
// this is required for the case when this connection is being replaced
- io_handler.requeue_out_sent_up_to(0);
- io_handler.reset_session(false);
+ io_handler.reset_peer_state();
if (!conn.policy.lossy) {
server_cookie = ceph::util::generate_random_number<uint64_t>(1, -1ll);
void IOHandler::reset_session(bool full)
{
- // reset in
- in_seq = 0;
+ assert(io_state != io_state_t::open);
+ reset_in();
if (full) {
reset_out();
dispatch_remote_reset();
}
}
+void IOHandler::reset_peer_state()
+{
+ assert(io_state != io_state_t::open);
+ reset_in();
+ requeue_out_sent_up_to(0);
+ discard_out_sent();
+}
+
void IOHandler::requeue_out_sent()
{
assert(io_state != io_state_t::open);
requeue_out_sent();
}
+void IOHandler::reset_in()
+{
+ assert(io_state != io_state_t::open);
+ in_seq = 0;
+}
+
void IOHandler::reset_out()
{
assert(io_state != io_state_t::open);
- out_seq = 0;
+ discard_out_sent();
out_pending_msgs.clear();
- out_sent_msgs.clear();
need_keepalive = false;
next_keepalive_ack = std::nullopt;
ack_left = 0;
}
+void IOHandler::discard_out_sent()
+{
+ assert(io_state != io_state_t::open);
+ out_seq = 0;
+ out_sent_msgs.clear();
+}
+
void IOHandler::dispatch_accept()
{
if (io_state == io_state_t::drop) {
void reset_session(bool full);
+ void reset_peer_state();
+
void requeue_out_sent_up_to(seq_num_t seq);
void requeue_out_sent();
next_keepalive_ack.has_value());
}
+ void reset_in();
+
void reset_out();
+ void discard_out_sent();
+
seastar::future<stop_t> try_exit_out_dispatch();
seastar::future<> do_out_dispatch();