ack_left = 0;
}
+void Protocol::dispatch_accept()
+{
+ dispatchers.ms_handle_accept(
+ seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
+}
+
+void Protocol::dispatch_connect()
+{
+ dispatchers.ms_handle_connect(
+ seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
+}
+
+void Protocol::dispatch_reset(bool is_replace)
+{
+ dispatchers.ms_handle_reset(
+ seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()),
+ is_replace);
+}
+
+void Protocol::dispatch_remote_reset()
+{
+ dispatchers.ms_handle_remote_reset(
+ seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
+}
+
void Protocol::ack_out_sent(seq_num_t seq)
{
if (conn.policy.lossy) { // lossy connections don't keep sent messages
#include "crimson/auth/AuthServer.h"
#include "crimson/common/formatter.h"
-#include "chained_dispatchers.h"
#include "Errors.h"
#include "SocketConnection.h"
#include "SocketMessenger.h"
throw std::system_error(make_error_code(crimson::net::error::protocol_aborted));
}
-#define ABORT_IN_CLOSE(dispatch_reset) { \
- do_close(dispatch_reset); \
- abort_protocol(); \
+#define ABORT_IN_CLOSE(is_dispatch_reset) { \
+ do_close(is_dispatch_reset); \
+ abort_protocol(); \
}
inline void expect_tag(const Tag& expected,
client_cookie = generate_client_cookie();
peer_global_seq = 0;
reset_out();
- dispatchers.ms_handle_remote_reset(
- seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
+ dispatch_remote_reset();
}
}
conn, global_seq, peer_global_seq, connect_seq,
client_cookie, server_cookie,
io_stat_printer{*this});
- dispatchers.ms_handle_connect(
- seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
+ dispatch_connect();
if (unlikely(state != state_t::CONNECTING)) {
logger().debug("{} triggered {} after ms_handle_connect(), abort",
conn, get_state_name(state));
trigger_state(state_t::ESTABLISHING, out_state_t::delay, false);
if (existing_conn) {
static_cast<ProtocolV2*>(existing_conn->protocol.get())->do_close(
- true /* dispatch_reset */, std::move(accept_me));
+ true /* is_dispatch_reset */, std::move(accept_me));
if (unlikely(state != state_t::ESTABLISHING)) {
logger().warn("{} triggered {} during execute_establishing(), "
"the accept event will not be delivered!",
accept_me();
}
- dispatchers.ms_handle_accept(
- seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
+ dispatch_accept();
if (unlikely(state != state_t::ESTABLISHING)) {
logger().debug("{} triggered {} after ms_handle_accept() during execute_establishing()",
conn, get_state_name(state));
new_peer_global_seq,
new_connect_seq, new_msg_seq] () mutable {
ceph_assert_always(state == state_t::REPLACING);
- dispatchers.ms_handle_accept(
- seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
+ dispatch_accept();
// state may become CLOSING, close mover.socket and abort later
return wait_exit_io(
).then([this] {
}
void ProtocolV2::do_close(
- bool dispatch_reset,
+ bool is_dispatch_reset,
std::optional<std::function<void()>> f_accept_new)
{
if (closed) {
bool is_replace = f_accept_new ? true : false;
logger().info("{} closing: reset {}, replace {}", conn,
- dispatch_reset ? "yes" : "no",
+ is_dispatch_reset ? "yes" : "no",
is_replace ? "yes" : "no");
/*
auto gate_closed = gate.close();
auto out_closed = close_out();
- if (dispatch_reset) {
- dispatchers.ms_handle_reset(
- seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()),
- is_replace);
+ if (is_dispatch_reset) {
+ dispatch_reset(is_replace);
}
// asynchronous operations