#include "ProtocolV2.h"
#include "include/msgr.h"
+#include "include/random.h"
#include "crimson/auth/AuthClient.h"
#include "crimson/auth/AuthServer.h"
});
}
+void ProtocolV2::reset_session(bool full)
+{
+ if (full) {
+ server_cookie = 0;
+ connect_seq = 0;
+ conn.in_seq = 0;
+ } else {
+ conn.out_seq = 0;
+ conn.in_seq = 0;
+ client_cookie = 0;
+ server_cookie = 0;
+ connect_seq = 0;
+ peer_global_seq = 0;
+ // TODO:
+ // discard_out_queue();
+ // message_seq = 0;
+ // ack_left = 0;
+ seastar::with_gate(pending_dispatch, [this] {
+ return dispatcher.ms_handle_remote_reset(
+ seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()))
+ .handle_exception([this] (std::exception_ptr eptr) {
+ logger().error("{} ms_handle_remote_reset caust exception: {}", conn, eptr);
+ });
+ });
+ }
+}
+
seastar::future<entity_type_t, entity_addr_t> ProtocolV2::banner_exchange()
{
// 1. prepare and send banner
seastar::future<bool> ProtocolV2::process_wait()
{
-//return read_frame_payload()
-//.then([this] (bufferlist payload) {
-// handle_wait() logic
-// return false;
-//});
- return seastar::make_ready_future<bool>(false);
+ return read_frame_payload()
+ .then([this] {
+ // handle_wait() logic
+ logger().debug("{} received WAIT (connection race)", conn);
+ WaitFrame::Decode(rx_segments_data.back());
+ return false;
+ });
}
seastar::future<bool> ProtocolV2::client_connect()
{
// send_client_ident() logic
- // <prepare and send ClientIdentFrame>
- // return read_main_preamble()
- // .then([this] (Tag tag) {
- // switch (tag) {
- // case Tag::IDENT_MISSING_FEATURES:
- // abort_in_fault();
- // case Tag::WAIT:
- // return process_wait();
- // case Tag::SERVER_IDENT:
- // <handle ServerIdentFrame>
- return seastar::make_ready_future<bool>(true);
- // default: {
- // unexpected_tag(tag, conn, "post_client_connect");
- // }
- // }
- // });
+ if (!conn.policy.lossy && !client_cookie) {
+ client_cookie = ceph::util::generate_random_number<uint64_t>(1, -1ll);
+ }
+
+ // TODO: get socket address and learn(not supported by seastar)
+ entity_addr_t a;
+ a.u.sa.sa_family = AF_INET;
+ a.set_type(entity_addr_t::TYPE_MSGR2);
+ logger().debug("{} learn from addr {}", conn, a);
+ return messenger.learned_addr(a)
+ .then([this] {
+ uint64_t flags = 0;
+ if (conn.policy.lossy) {
+ flags |= CEPH_MSG_CONNECT_LOSSY;
+ }
+
+ auto client_ident = ClientIdentFrame::Encode(
+ messenger.get_myaddrs(),
+ conn.target_addr,
+ messenger.get_myname().num(),
+ global_seq,
+ conn.policy.features_supported,
+ conn.policy.features_required | msgr2_required, flags,
+ client_cookie);
+
+ logger().debug("{} sending identification: addrs={} target={} gid={}"
+ " global_seq={} features_supported={} features_required={}"
+ " flags={} cookie={}",
+ conn, messenger.get_myaddrs(), conn.target_addr,
+ messenger.get_myname().num(), global_seq,
+ conn.policy.features_supported,
+ conn.policy.features_required | msgr2_required,
+ flags, client_cookie);
+ return write_frame(client_ident);
+ }).then([this] {
+ return read_main_preamble();
+ }).then([this] (Tag tag) {
+ switch (tag) {
+ case Tag::IDENT_MISSING_FEATURES:
+ return read_frame_payload()
+ .then([this] {
+ // handle_ident_missing_features() logic
+ auto ident_missing = IdentMissingFeaturesFrame::Decode(rx_segments_data.back());
+ logger().error("{} client does not support all server features: {}",
+ conn, ident_missing.features());
+ abort_in_fault();
+ // won't be executed
+ return false;
+ });
+ case Tag::WAIT:
+ return process_wait();
+ case Tag::SERVER_IDENT:
+ return read_frame_payload()
+ .then([this] {
+ // handle_server_ident() logic
+ auto server_ident = ServerIdentFrame::Decode(rx_segments_data.back());
+ logger().debug("{} received server identification:"
+ " addrs={} gid={} global_seq={}"
+ " features_supported={} features_required={}"
+ " flags={} cookie={}",
+ conn,
+ server_ident.addrs(), server_ident.gid(),
+ server_ident.global_seq(),
+ server_ident.supported_features(),
+ server_ident.required_features(),
+ server_ident.flags(), server_ident.cookie());
+
+ // is this who we intended to talk to?
+ // be a bit forgiving here, since we may be connecting based on addresses parsed out
+ // of mon_host or something.
+ if (!server_ident.addrs().contains(conn.target_addr)) {
+ logger().warn("{} peer identifies as {}, does not include {}",
+ conn, server_ident.addrs(), conn.target_addr);
+ abort_in_fault();
+ }
+
+ server_cookie = server_ident.cookie();
+
+ // TODO: change peer_addr to entity_addrvec_t
+ ceph_assert(conn.peer_addr == server_ident.addrs().front());
+ peer_name = entity_name_t(conn.get_peer_type(), server_ident.gid());
+ conn.set_features(server_ident.supported_features() &
+ conn.policy.features_supported);
+ peer_global_seq = server_ident.global_seq();
+
+ // TODO: lossless policy
+ ceph_assert(server_ident.flags() & CEPH_MSG_CONNECT_LOSSY);
+ conn.policy.lossy = server_ident.flags() & CEPH_MSG_CONNECT_LOSSY;
+ // TODO: backoff = utime_t();
+ logger().debug("{} connect success {}, lossy={}, features={}",
+ conn, connect_seq, conn.policy.lossy, conn.features);
+
+ return dispatcher.ms_handle_connect(
+ seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()))
+ .handle_exception([this] (std::exception_ptr eptr) {
+ logger().error("{} ms_handle_connect caust exception: {}", conn, eptr);
+ });
+ }).then([this] {
+ return true;
+ });
+ default: {
+ unexpected_tag(tag, conn, "post_client_connect");
+ // won't be executed
+ return seastar::make_ready_future<bool>(false);
+ }
+ }
+ });
}
seastar::future<bool> ProtocolV2::client_reconnect()
{
// send_reconnect() logic
- // <prepare ReconnectFrame and send>
- // <then:>
- // return read_main_preamble()
- // .then([this] (Tag tag) {
- // switch (tag) {
- // case Tag::SESSION_RETRY_GLOBAL:
- // <handle RetryGlobalFrame>
- // return client_reconnect();
- // case Tag::SESSION_RETRY:
- // <handle RetryFrame>
- // return client_reconnect();
- // case Tag::SESSION_RESET:
- // <handle ResetFrame>
- // return client_connect();
- // case Tag::WAIT:
- // return process_wait();
- // case Tag::SESSION_RECONNECT_OK:
- // <handle ReconnectOkFrame>
- return seastar::make_ready_future<bool>(true);
- // default: {
- // unexpected_tag(tag, conn, "post_client_reconnect");
- // }
- // }
- // });
+ auto reconnect = ReconnectFrame::Encode(messenger.get_myaddrs(),
+ client_cookie,
+ server_cookie,
+ global_seq,
+ connect_seq,
+ conn.in_seq);
+ logger().debug("{} reconnect to session: client_cookie={}"
+ " server_cookie={} gs={} cs={} ms={}",
+ conn, client_cookie, server_cookie,
+ global_seq, connect_seq, conn.in_seq);
+ return write_frame(reconnect)
+ .then([this] {
+ return read_main_preamble();
+ }).then([this] (Tag tag) {
+ switch (tag) {
+ case Tag::SESSION_RETRY_GLOBAL:
+ return read_frame_payload()
+ .then([this] {
+ // handle_session_retry_global() logic
+ auto retry = RetryGlobalFrame::Decode(rx_segments_data.back());
+ global_seq = messenger.get_global_seq(retry.global_seq());
+ logger().warn("{} received session retry global "
+ "global_seq={}, choose new gs={}",
+ conn, retry.global_seq(), global_seq);
+ return client_reconnect();
+ });
+ case Tag::SESSION_RETRY:
+ return read_frame_payload()
+ .then([this] {
+ // handle_session_retry() logic
+ auto retry = RetryFrame::Decode(rx_segments_data.back());
+ connect_seq = retry.connect_seq() + 1;
+ logger().warn("{} received session retry connect_seq={}, inc to cs={}",
+ conn, retry.connect_seq(), connect_seq);
+ return client_reconnect();
+ });
+ case Tag::SESSION_RESET:
+ return read_frame_payload()
+ .then([this] {
+ // handle_session_reset() logic
+ auto reset = ResetFrame::Decode(rx_segments_data.back());
+ logger().warn("{} received session reset full={}", reset.full());
+ reset_session(reset.full());
+ return client_connect();
+ });
+ case Tag::WAIT:
+ return process_wait();
+ case Tag::SESSION_RECONNECT_OK:
+ return read_frame_payload()
+ .then([this] {
+ // handle_reconnect_ok() logic
+ auto reconnect_ok = ReconnectOkFrame::Decode(rx_segments_data.back());
+ logger().debug("{} received reconnect ok:"
+ "sms={}, lossy={}, features={}",
+ conn, reconnect_ok.msg_seq(),
+ connect_seq, conn.policy.lossy, conn.features);
+ // TODO
+ // discard_requeued_up_to()
+ // backoff = utime_t();
+ return dispatcher.ms_handle_connect(
+ seastar::static_pointer_cast<SocketConnection>(
+ conn.shared_from_this()))
+ .handle_exception([this] (std::exception_ptr eptr) {
+ logger().error("{} ms_handle_connect caust exception: {}", conn, eptr);
+ });
+ }).then([this] {
+ return true;
+ });
+ default: {
+ unexpected_tag(tag, conn, "post_client_reconnect");
+ // won't be executed
+ return seastar::make_ready_future<bool>(false);
+ }
+ }
+ });
}
void ProtocolV2::execute_connecting()
}).then([this] {
return client_auth();
}).then([this] {
- if (1) { // TODO check connect or reconnect
+ if (!server_cookie) {
+ ceph_assert(connect_seq == 0);
return client_connect();
} else {
+ ceph_assert(connect_seq > 0);
// TODO: lossless policy
ceph_assert(false);
return client_reconnect();
seastar::future<bool> ProtocolV2::send_wait()
{
- // <prepare and send WaitFrame>
- // <then:>
- return seastar::make_ready_future<bool>(false);
+ auto wait = WaitFrame::Encode();
+ return write_frame(wait)
+ .then([this] {
+ return false;
+ });
}
seastar::future<bool> ProtocolV2::handle_existing_connection(SocketConnectionRef existing)
{
+ // handle_existing_connection() logic
+ logger().debug("{} {}: {}", conn, __func__, *existing);
+
+ ProtocolV2 *exproto = dynamic_cast<ProtocolV2*>(existing->protocol.get());
+ ceph_assert(exproto);
+
+ if (exproto->state == state_t::CLOSING) {
+ logger().warn("{} existing {} already closed.", conn, *existing);
+ return send_server_ident()
+ .then([this] {
+ return true;
+ });
+ }
+
+ if (exproto->state == state_t::REPLACING) {
+ logger().warn("{} existing racing replace happened while replacing: {}",
+ conn, *existing);
+ return send_wait();
+ }
+
+ if (exproto->peer_global_seq > peer_global_seq) {
+ logger().warn("{} this is a stale connection, peer_global_seq={}"
+ "existing->peer_global_seq={} close this connection",
+ conn, peer_global_seq, exproto->peer_global_seq);
+ dispatch_reset();
+ abort_in_close();
+ }
+
+ if (existing->policy.lossy) {
+ // existing connection can be thrown out in favor of this one
+ logger().warn("{} existing={} is a lossy channel. Close existing in favor of"
+ " this connection", conn, *existing);
+ exproto->dispatch_reset();
+ exproto->close();
+ return send_server_ident()
+ .then([this] {
+ return true;
+ });
+ }
+
// TODO: lossless policy
ceph_assert(false);
}
seastar::future<bool> ProtocolV2::server_connect()
{
- // handle_client_ident() logic
- // <process ClientIdentFrame>
- // <case feature missing:>
- // <prepare and send IdentMissingFeaturesFrame>
- // <then: trigger SERVER_WAIT>
- // return seastar::make_ready_future<bool>(false);
- // <case existing:>
- // return handle_existing_connection(existing);
- // <case everything OK:>
- return send_server_ident()
+ return read_frame_payload()
+ .then([this] {
+ // handle_client_ident() logic
+ auto client_ident = ClientIdentFrame::Decode(rx_segments_data.back());
+ logger().debug("{} received client identification: addrs={} target={}"
+ " gid={} global_seq={} features_supported={}"
+ " features_required={} flags={} cookie={}",
+ conn, client_ident.addrs(), client_ident.target_addr(),
+ client_ident.gid(), client_ident.global_seq(),
+ client_ident.supported_features(),
+ client_ident.required_features(),
+ client_ident.flags(), client_ident.cookie());
+
+ if (client_ident.addrs().empty() ||
+ client_ident.addrs().front() == entity_addr_t()) {
+ logger().error("{} oops, client_ident.addrs() is empty", conn);
+ abort_in_fault();
+ }
+ if (!messenger.get_myaddrs().contains(client_ident.target_addr())) {
+ logger().error("{} peer is trying to reach {} which is not us ({})",
+ conn, client_ident.target_addr(), messenger.get_myaddrs());
+ abort_in_fault();
+ }
+ // TODO: change peer_addr to entity_addrvec_t
+ entity_addr_t paddr = client_ident.addrs().front();
+ conn.peer_addr = conn.target_addr;
+ conn.peer_addr.set_type(paddr.get_type());
+ conn.peer_addr.set_port(paddr.get_port());
+ conn.peer_addr.set_nonce(paddr.get_nonce());
+ logger().debug("{} got paddr={}, conn.peer_addr={}", conn, paddr, conn.peer_addr);
+ conn.target_addr = conn.peer_addr;
+
+ peer_name = entity_name_t(conn.get_peer_type(), client_ident.gid());
+ conn.peer_id = client_ident.gid();
+ client_cookie = client_ident.cookie();
+
+ uint64_t feat_missing =
+ (conn.policy.features_required | msgr2_required) &
+ ~(uint64_t)client_ident.supported_features();
+ if (feat_missing) {
+ logger().warn("{} peer missing required features {}", conn, feat_missing);
+ auto ident_missing_features = IdentMissingFeaturesFrame::Encode(feat_missing);
+ return write_frame(ident_missing_features)
.then([this] {
- // goto ready
- return true;
+ return false;
});
+ }
+ connection_features =
+ client_ident.supported_features() & conn.policy.features_supported;
+
+ peer_global_seq = client_ident.global_seq();
+
+ // Looks good so far, let's check if there is already an existing connection
+ // to this peer.
+
+ SocketConnectionRef existing = messenger.lookup_conn(conn.peer_addr);
+
+ if (existing) {
+ if (existing->protocol->proto_type != 2) {
+ logger().warn("{} existing {} proto version is {}, close",
+ conn, *existing, existing->protocol->proto_type);
+ // should unregister the existing from msgr atomically
+ existing->close();
+ } else {
+ return handle_existing_connection(existing);
+ }
+ }
+
+ // if everything is OK reply with server identification
+ return send_server_ident()
+ .then([this] {
+ // goto ready
+ return true;
+ });
+ });
}
seastar::future<bool> ProtocolV2::read_reconnect()
{
- // return read_main_preamble()
- // .then([this] (Tag tag) {
- // expect_tag(Tag::SESSION_RECONNECT, tag, conn, "read_reconnect");
- return server_reconnect();
- // });
+ return read_main_preamble()
+ .then([this] (Tag tag) {
+ expect_tag(Tag::SESSION_RECONNECT, tag, conn, "read_reconnect");
+ return server_reconnect();
+ });
}
seastar::future<bool> ProtocolV2::send_retry(uint64_t connect_seq)
{
- // <prepare and send RetryFrame>
- // <then:>
- return read_reconnect();
+ auto retry = RetryFrame::Encode(connect_seq);
+ return write_frame(retry)
+ .then([this] {
+ return read_reconnect();
+ });
}
seastar::future<bool> ProtocolV2::send_retry_global(uint64_t global_seq)
{
- // <prepare and send RetryGlobalFrame>
- // <then:>
- return read_reconnect();
+ auto retry = RetryGlobalFrame::Encode(global_seq);
+ return write_frame(retry)
+ .then([this] {
+ return read_reconnect();
+ });
}
seastar::future<bool> ProtocolV2::send_reset(bool full)
{
- // <prepare and send ResetFrame>
- // <then:>
- // return read_main_preamble()
- // .then([this] (Tag tag) {
- // expect_tag(Tag::CLIENT_IDENT, tag, conn, "post_send_reset");
- return server_connect();
- // });
+ auto reset = ResetFrame::Encode(full);
+ return write_frame(reset)
+ .then([this] {
+ return read_main_preamble();
+ }).then([this] (Tag tag) {
+ expect_tag(Tag::CLIENT_IDENT, tag, conn, "post_send_reset");
+ return server_connect();
+ });
}
seastar::future<bool> ProtocolV2::server_reconnect()
{
- // handle_reconnect() logic
- // <process ReconnectFrame>
- // <case no existing:>
- return send_reset(0);
- // <retry global cases:>
- // return send_retry_global();
- // <case wait:>
- // return send_wait();
- // <case retry:>
- // return send_retry();
- // <other reset cases:>
- // return send_reset();
- // TODO: lossless policy
- // return reuse_connection(existing, exproto);
+ return read_frame_payload()
+ .then([this] {
+ // handle_reconnect() logic
+ auto reconnect = ReconnectFrame::Decode(rx_segments_data.back());
+
+ logger().debug("{} received reconnect: client_cookie={} server_cookie={}"
+ " gs={} cs={} ms={}",
+ conn, reconnect.client_cookie(), reconnect.server_cookie(),
+ reconnect.global_seq(), reconnect.connect_seq(),
+ reconnect.msg_seq());
+
+ // can peer_addrs be changed on-the-fly?
+ if (conn.peer_addr != reconnect.addrs().front()) {
+ logger().error("{} peer identifies as {}, while conn.peer_addr={}",
+ conn, reconnect.addrs().front(), conn.peer_addr);
+ ceph_assert(false);
+ }
+ // TODO: change peer_addr to entity_addrvec_t
+ ceph_assert(conn.peer_addr == conn.target_addr);
+ peer_global_seq = reconnect.global_seq();
+
+ SocketConnectionRef existing = messenger.lookup_conn(conn.peer_addr);
+
+ if (!existing) {
+ // there is no existing connection therefore cannot reconnect to previous
+ // session
+ logger().error("{} server_reconnect: no existing connection,"
+ " reseting client", conn);
+ return send_reset(true);
+ }
+
+ if (existing->protocol->proto_type != 2) {
+ logger().warn("{} server_reconnect: existing {} proto version is {},"
+ "close existing and resetting client.", conn, *existing);
+ existing->close();
+ return send_reset(true);
+ }
+
+ ProtocolV2 *exproto = dynamic_cast<ProtocolV2*>(existing->protocol.get());
+ ceph_assert(exproto);
+
+ if (exproto->state == state_t::REPLACING) {
+ logger().warn("{} server_reconnect: existing racing replace happened while "
+ " replacing, retry_global. existing={}", conn, *existing);
+ return send_retry_global(exproto->peer_global_seq);
+ }
+
+ if (exproto->client_cookie != reconnect.client_cookie()) {
+ logger().warn("{} server_reconnect: existing={} client cookie mismatch,"
+ " I must have reseted: cc={} rcc={}, reseting client.",
+ conn, *existing, exproto->client_cookie, reconnect.client_cookie());
+ return send_reset(conn.policy.resetcheck);
+ } else if (exproto->server_cookie == 0) {
+ // this happens when:
+ // - a connects to b
+ // - a sends client_ident
+ // - b gets client_ident, sends server_ident and sets cookie X
+ // - connection fault
+ // - b reconnects to a with cookie X, connect_seq=1
+ // - a has cookie==0
+ logger().warn("{} server_reconnect: I was a client and didn't received the"
+ " server_ident. Asking peer to resume session establishment",
+ conn);
+ return send_reset(false);
+ }
+
+ if (exproto->peer_global_seq > reconnect.global_seq()) {
+ logger().debug("{} server_reconnect: stale global_seq: sgs={} cgs={},"
+ " ask client to retry global",
+ conn, exproto->peer_global_seq, reconnect.global_seq());
+ return send_retry_global(exproto->peer_global_seq);
+ }
+
+ if (exproto->connect_seq > reconnect.connect_seq()) {
+ logger().debug("{} server_reconnect: stale connect_seq scs={} ccs={},"
+ " ask client to retry",
+ conn, exproto->connect_seq, reconnect.connect_seq());
+ return send_retry(exproto->connect_seq);
+ }
+
+ if (exproto->connect_seq == reconnect.connect_seq()) {
+ // reconnect race: both peers are sending reconnect messages
+ if (existing->peer_addr > messenger.get_myaddrs().msgr2_addr() &&
+ !existing->policy.server) {
+ // the existing connection wins
+ logger().warn("{} server_reconnect: reconnect race detected,"
+ " this connection loses to existing={},"
+ " ask client to wait", conn, *existing);
+ return send_wait();
+ } else {
+ // this connection wins
+ logger().warn("{} server_reconnect: reconnect race detected,"
+ " replacing existing={} socket by this connection's socket",
+ conn, *existing);
+ }
+ }
+
+ logger().warn("{} server_reconnect: reconnect to exsiting={}", conn, *existing);
+
+ // everything looks good
+ exproto->connect_seq = reconnect.connect_seq();
+ //exproto->message_seq = reconnect.msg_seq();
+
+ // TODO: lossless policy
+ // return reuse_connection(existing, exproto);
+ ceph_assert(false);
+ });
}
void ProtocolV2::execute_accepting()
conn.policy.standby, conn.policy.resetcheck);
return server_auth();
}).then([this] {
- // return read_main_preamble()
- //}).then([this] (Tag tag) {
- // switch (tag) {
- // case Tag::CLIENT_IDENT:
+ return read_main_preamble();
+ }).then([this] (Tag tag) {
+ switch (tag) {
+ case Tag::CLIENT_IDENT:
return server_connect();
- // case Tag::SESSION_RECONNECT:
- // return server_reconnect();
- // default: {
- // unexpected_tag(tag, conn, "post_server_auth");
- // }
- // }
+ case Tag::SESSION_RECONNECT:
+ // TODO: lossless policy
+ ceph_assert(false);
+ return server_reconnect();
+ default: {
+ unexpected_tag(tag, conn, "post_server_auth");
+ // won't be executed
+ return seastar::make_ready_future<bool>(false);
+ }
+ }
}).then([this] (bool proceed_or_wait) {
if (proceed_or_wait) {
messenger.register_conn(
seastar::future<> ProtocolV2::send_server_ident()
{
// send_server_ident() logic
- // <prepare and send ServerIdentFrame>
- return seastar::now();
+ // this is required for the case when this connection is being replaced
+ // TODO
+ // out_seq = discard_requeued_up_to(out_seq, 0);
+ conn.in_seq = 0;
+
+ if (!conn.policy.lossy) {
+ server_cookie = ceph::util::generate_random_number<uint64_t>(1, -1ll);
+ }
+
+ uint64_t flags = 0;
+ if (conn.policy.lossy) {
+ flags = flags | CEPH_MSG_CONNECT_LOSSY;
+ }
+
+ // refered to async-conn v2: not assign gs to global_seq
+ uint64_t gs = messenger.get_global_seq();
+ auto server_ident = ServerIdentFrame::Encode(
+ messenger.get_myaddrs(),
+ messenger.get_myname().num(),
+ gs,
+ conn.policy.features_supported,
+ conn.policy.features_required | msgr2_required,
+ flags,
+ server_cookie);
+
+ logger().debug("{} sending server identification: addrs={} gid={}"
+ " global_seq={} features_supported={} features_required={}"
+ " flags={} cookie={}",
+ conn, messenger.get_myaddrs(), messenger.get_myname().num(),
+ gs, conn.policy.features_supported,
+ conn.policy.features_required | msgr2_required,
+ flags, server_cookie);
+
+ conn.set_features(connection_features);
+
+ // notify
+ seastar::with_gate(pending_dispatch, [this] {
+ return dispatcher.ms_handle_accept(
+ seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()))
+ .handle_exception([this] (std::exception_ptr eptr) {
+ logger().error("{} ms_handle_accept caust exception: {}", conn, eptr);
+ });
+ });
+
+ return write_frame(server_ident);
}
// REPLACING state