reset_session();
return seastar::make_ready_future<stop_t>(stop_t::no);
case CEPH_MSGR_TAG_RETRY_GLOBAL:
- h.global_seq = messenger.get_global_seq(h.reply.global_seq);
- return seastar::make_ready_future<stop_t>(stop_t::no);
+ return messenger.get_global_seq(h.reply.global_seq).then([this] (auto gs) {
+ h.global_seq = gs;
+ return seastar::make_ready_future<stop_t>(stop_t::no);
+ });
case CEPH_MSGR_TAG_RETRY_SESSION:
ceph_assert(h.reply.connect_seq > h.connect_seq);
h.connect_seq = h.reply.connect_seq;
}
return seastar::now();
}).then([this] {
+ return messenger.get_global_seq();
+ }).then([this] (auto gs) {
+ h.global_seq = gs;
// read server's handshake header
return socket->read(server_header_size);
}).then([this] (bufferlist headerbl) {
bufferlist bl;
bl.append(buffer::create_static(banner_size, banner));
::encode(messenger.get_myaddr(), bl, 0);
- h.global_seq = messenger.get_global_seq();
return socket->write_flush(std::move(bl));
}).then([=] {
return seastar::repeat([this] {
seastar::future<stop_t> ProtocolV1::send_connect_reply_ready(
msgr_tag_t tag, bufferlist&& authorizer_reply)
{
- h.global_seq = messenger.get_global_seq();
- h.reply.tag = tag;
- h.reply.features = conn.policy.features_supported;
- h.reply.global_seq = h.global_seq;
- h.reply.connect_seq = h.connect_seq;
- h.reply.flags = 0;
- if (conn.policy.lossy) {
- h.reply.flags = h.reply.flags | CEPH_MSG_CONNECT_LOSSY;
- }
- h.reply.authorizer_len = authorizer_reply.length();
+ return messenger.get_global_seq(
+ ).then([this, tag, auth_len = authorizer_reply.length()] (auto gs) {
+ h.global_seq = gs;
+ h.reply.tag = tag;
+ h.reply.features = conn.policy.features_supported;
+ h.reply.global_seq = h.global_seq;
+ h.reply.connect_seq = h.connect_seq;
+ h.reply.flags = 0;
+ if (conn.policy.lossy) {
+ h.reply.flags = h.reply.flags | CEPH_MSG_CONNECT_LOSSY;
+ }
+ h.reply.authorizer_len = auth_len;
- session_security.reset(
- get_auth_session_handler(nullptr,
- auth_meta->auth_method,
- auth_meta->session_key,
- conn.features));
+ session_security.reset(
+ get_auth_session_handler(nullptr,
+ auth_meta->auth_method,
+ auth_meta->session_key,
+ conn.features));
- return socket->write(make_static_packet(h.reply))
- .then([this, reply=std::move(authorizer_reply)]() mutable {
+ return socket->write(make_static_packet(h.reply));
+ }).then([this, reply=std::move(authorizer_reply)]() mutable {
if (reply.length()) {
return socket->write(std::move(reply));
} else {
auto retry = RetryGlobalFrame::Decode(rx_segments_data.back());
logger().warn("{} GOT RetryGlobalFrame: gs={}",
conn, retry.global_seq());
- global_seq = messenger.get_global_seq(retry.global_seq());
- logger().warn("{} UPDATE: gs={}", conn, global_seq);
- return client_reconnect();
+ return messenger.get_global_seq(retry.global_seq()).then([this] (auto gs) {
+ global_seq = gs;
+ logger().warn("{} UPDATE: gs={}", conn, global_seq);
+ return client_reconnect();
+ });
});
case Tag::SESSION_RETRY:
return read_frame_payload().then([this] {
seastar::with_gate(pending_dispatch, [this] {
// we don't know my socket_port yet
conn.set_ephemeral_port(0, SocketConnection::side_t::none);
- enable_recording();
- global_seq = messenger.get_global_seq();
- logger().debug("{} UPDATE: gs={}", conn, global_seq);
- return Socket::connect(conn.peer_addr)
- .then([this](SocketFRef sock) {
+ return messenger.get_global_seq().then([this] (auto gs) {
+ global_seq = gs;
+ enable_recording();
+ logger().debug("{} UPDATE: gs={}", conn, global_seq);
+ return Socket::connect(conn.peer_addr);
+ }).then([this](SocketFRef sock) {
logger().debug("{} socket connected", conn);
socket = std::move(sock);
if (state == state_t::CLOSING) {
{
// send_server_ident() logic
- // this is required for the case when this connection is being replaced
- // TODO
- // out_seq = discard_requeued_up_to(out_seq, 0);
- conn.in_seq = 0;
-
- if (!conn.policy.lossy) {
- server_cookie = ceph::util::generate_random_number<uint64_t>(1, -1ll);
- }
+ // refered to async-conn v2: not assign gs to global_seq
+ return messenger.get_global_seq().then([this] (auto gs) {
+ logger().debug("{} UPDATE: gs={} for server ident", conn, global_seq);
- uint64_t flags = 0;
- if (conn.policy.lossy) {
- flags = flags | CEPH_MSG_CONNECT_LOSSY;
- }
+ // this is required for the case when this connection is being replaced
+ // TODO
+ // out_seq = discard_requeued_up_to(out_seq, 0);
+ conn.in_seq = 0;
- // refered to async-conn v2: not assign gs to global_seq
- uint64_t gs = messenger.get_global_seq();
- auto server_ident = ServerIdentFrame::Encode(
- messenger.get_myaddrs(),
- messenger.get_myname().num(),
- gs,
- conn.policy.features_supported,
- conn.policy.features_required | msgr2_required,
- flags,
- server_cookie);
-
- logger().debug("{} WRITE ServerIdentFrame: addrs={}, gid={},"
- " gs={}, features_supported={}, features_required={},"
- " flags={}, cookie={}",
- conn, messenger.get_myaddrs(), messenger.get_myname().num(),
- gs, conn.policy.features_supported,
- conn.policy.features_required | msgr2_required,
- flags, server_cookie);
+ if (!conn.policy.lossy) {
+ server_cookie = ceph::util::generate_random_number<uint64_t>(1, -1ll);
+ }
- conn.set_features(connection_features);
+ uint64_t flags = 0;
+ if (conn.policy.lossy) {
+ flags = flags | CEPH_MSG_CONNECT_LOSSY;
+ }
- // notify
- seastar::with_gate(pending_dispatch, [this] {
- return dispatcher.ms_handle_accept(
- seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()))
- .handle_exception([this] (std::exception_ptr eptr) {
- logger().error("{} ms_handle_accept caught exception: {}", conn, eptr);
- ceph_abort("unecpected exception from ms_handle_accept()");
+ auto server_ident = ServerIdentFrame::Encode(
+ messenger.get_myaddrs(),
+ messenger.get_myname().num(),
+ gs,
+ conn.policy.features_supported,
+ conn.policy.features_required | msgr2_required,
+ flags,
+ server_cookie);
+
+ logger().debug("{} WRITE ServerIdentFrame: addrs={}, gid={},"
+ " gs={}, features_supported={}, features_required={},"
+ " flags={}, cookie={}",
+ conn, messenger.get_myaddrs(), messenger.get_myname().num(),
+ gs, conn.policy.features_supported,
+ conn.policy.features_required | msgr2_required,
+ flags, server_cookie);
+
+ conn.set_features(connection_features);
+
+ // notify
+ seastar::with_gate(pending_dispatch, [this] {
+ return dispatcher.ms_handle_accept(
+ seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()))
+ .handle_exception([this] (std::exception_ptr eptr) {
+ logger().error("{} ms_handle_accept caught exception: {}", conn, eptr);
+ ceph_abort("unecpected exception from ms_handle_accept()");
+ });
});
- });
- return write_frame(server_ident);
+ return write_frame(server_ident);
+ });
}
// REPLACING state