*/
if (pre_state == state_t::READY) {
- gate.dispatch_in_background("exit_io", *this, [this] {
+ gate.dispatch_in_background("exit_io", conn, [this] {
return wait_io_exit_dispatching(
).then([this](FrameAssemblerV2Ref fa) {
frame_assembler = std::move(fa);
{
ceph_assert_always(!is_socket_valid);
trigger_state(state_t::CONNECTING, out_state_t::delay, false);
- gated_execute("execute_connecting", [this] {
+ gated_execute("execute_connecting", conn, [this] {
global_seq = messenger.get_global_seq();
assert(client_cookie != 0);
if (!conn.policy.lossy && server_cookie != 0) {
} else {
gate.dispatch_in_background(
"replace_socket_connecting",
- *this,
+ conn,
[this, new_socket=std::move(new_socket)]() mutable {
return frame_assembler->replace_shutdown_socket(std::move(new_socket));
}
{
assert(is_socket_valid);
trigger_state(state_t::ACCEPTING, out_state_t::none, false);
- gate.dispatch_in_background("execute_accepting", *this, [this] {
+ gate.dispatch_in_background("execute_accepting", conn, [this] {
return seastar::futurize_invoke([this] {
INTERCEPT_N_RW(custom_bp_t::SOCKET_ACCEPTED);
auth_meta = seastar::make_lw_shared<AuthConnectionMeta>();
abort_protocol();
}
- gated_execute("execute_establishing", [this] {
+ gated_execute("execute_establishing", conn, [this] {
return seastar::futurize_invoke([this] {
return send_server_ident();
}).then([this] {
frame_assembler->shutdown_socket();
is_socket_valid = false;
}
- gate.dispatch_in_background("trigger_replacing", *this,
- [this,
- reconnect,
- do_reset,
- mover = std::move(mover),
- new_auth_meta = std::move(new_auth_meta),
- new_client_cookie, new_peer_name,
- new_conn_features, new_peer_supported_features,
- new_peer_global_seq,
- new_connect_seq, new_msg_seq] () mutable {
+ gate.dispatch_in_background(
+ "trigger_replacing",
+ conn,
+ [this,
+ reconnect,
+ do_reset,
+ mover = std::move(mover),
+ new_auth_meta = std::move(new_auth_meta),
+ new_client_cookie, new_peer_name,
+ new_conn_features, new_peer_supported_features,
+ new_peer_global_seq,
+ new_connect_seq, new_msg_seq] () mutable {
ceph_assert_always(state == state_t::REPLACING);
dispatch_accept();
// state may become CLOSING, close mover.socket and abort later
peer_global_seq = new_peer_global_seq;
gate.dispatch_in_background(
"replace_frame_assembler",
- *this,
+ conn,
[this, mover=std::move(mover)]() mutable {
return frame_assembler->replace_by(std::move(mover));
}
{
ceph_assert_always(!is_socket_valid);
trigger_state(state_t::WAIT, out_state_t::delay, false);
- gated_execute("execute_wait", [this, max_backoff] {
+ gated_execute("execute_wait", conn, [this, max_backoff] {
double backoff = protocol_timer.last_dur();
if (max_backoff) {
backoff = local_conf().get_val<double>("ms_max_backoff");
{
ceph_assert_always(is_socket_valid);
trigger_state(state_t::SERVER_WAIT, out_state_t::none, false);
- gated_execute("execute_server_wait", [this] {
+ gated_execute("execute_server_wait", conn, [this] {
return frame_assembler->read_exactly(1
).then([this](auto bl) {
logger().warn("{} SERVER_WAIT got read, abort", conn);
});
}
-void ProtocolV2::print_conn(std::ostream& out) const
-{
- out << conn;
-}
-
} // namespace crimson::net
void start_accept(SocketRef&& socket,
const entity_addr_t& peer_addr) override;
- void print_conn(std::ostream&) const final;
-
private:
void notify_out() override;
seastar::future<> execution_done = seastar::now();
- template <typename Func>
- void gated_execute(const char* what, Func&& func) {
- gate.dispatch_in_background(what, *this, [this, &func] {
+ template <typename Func, typename T>
+ void gated_execute(const char *what, T &who, Func &&func) {
+ gate.dispatch_in_background(what, who, [this, &who, &func] {
if (!execution_done.available()) {
// discard the unready future
gate.dispatch_in_background(
"gated_execute_abandon",
- *this,
+ who,
[fut=std::move(execution_done)]() mutable {
return std::move(fut);
}