1, std::numeric_limits<uint64_t>::max());
}
+std::size_t get_msg_size(const FrameAssembler &rx_frame_asm)
+{
+ ceph_assert(rx_frame_asm.get_num_segments() > 0);
+ size_t sum = 0;
+ // we don't include SegmentIndex::Msg::HEADER.
+ for (size_t idx = 1; idx < rx_frame_asm.get_num_segments(); idx++) {
+ sum += rx_frame_asm.get_segment_logical_len(idx);
+ }
+ return sum;
+}
+
} // namespace anonymous
namespace fmt {
const entity_name_t& _peer_name)
{
ceph_assert(state == state_t::NONE);
- ceph_assert(!conn.socket);
+ ceph_assert(!frame_assembler.has_socket());
ceph_assert(!gate.is_closed());
conn.peer_addr = _peer_addr;
conn.target_addr = _peer_addr;
const entity_addr_t& _peer_addr)
{
ceph_assert(state == state_t::NONE);
- ceph_assert(!conn.socket);
+ ceph_assert(!frame_assembler.has_socket());
// until we know better
conn.target_addr = _peer_addr;
- conn.socket = std::move(sock);
+ frame_assembler.set_socket(std::move(sock));
logger().info("{} ProtocolV2::start_accept(): target_addr={}", conn, _peer_addr);
messenger.accept_conn(
seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
// TODO: Frame related implementations, probably to a separate class.
-void ProtocolV2::enable_recording()
-{
- rxbuf.clear();
- txbuf.clear();
- record_io = true;
-}
-
-seastar::future<Socket::tmp_buf> ProtocolV2::read_exactly(size_t bytes)
-{
- if (unlikely(record_io)) {
- return conn.socket->read_exactly(bytes
- ).then([this] (auto bl) {
- rxbuf.append(buffer::create(bl.share()));
- return bl;
- });
- } else {
- return conn.socket->read_exactly(bytes);
- };
-}
-
-seastar::future<bufferlist> ProtocolV2::read(size_t bytes)
-{
- if (unlikely(record_io)) {
- return conn.socket->read(bytes
- ).then([this] (auto buf) {
- rxbuf.append(buf);
- return buf;
- });
- } else {
- return conn.socket->read(bytes);
- }
-}
-
-seastar::future<> ProtocolV2::write(bufferlist&& buf)
-{
- if (unlikely(record_io)) {
- txbuf.append(buf);
- }
- return conn.socket->write(std::move(buf));
-}
-
-seastar::future<> ProtocolV2::write_flush(bufferlist&& buf)
-{
- if (unlikely(record_io)) {
- txbuf.append(buf);
- }
- return conn.socket->write_flush(std::move(buf));
-}
-
-size_t ProtocolV2::get_current_msg_size() const
+seastar::future<FrameAssemblerV2::read_main_t>
+ProtocolV2::read_main_preamble()
{
- ceph_assert(rx_frame_asm.get_num_segments() > 0);
- size_t sum = 0;
- // we don't include SegmentIndex::Msg::HEADER.
- for (size_t idx = 1; idx < rx_frame_asm.get_num_segments(); idx++) {
- sum += rx_frame_asm.get_segment_logical_len(idx);
- }
- return sum;
-}
-
-seastar::future<Tag> ProtocolV2::read_main_preamble()
-{
- rx_preamble.clear();
- return read_exactly(rx_frame_asm.get_preamble_onwire_len())
- .then([this] (auto bl) {
- rx_segments_data.clear();
- try {
- rx_preamble.append(buffer::create(std::move(bl)));
- const Tag tag = rx_frame_asm.disassemble_preamble(rx_preamble);
- INTERCEPT_FRAME(tag, bp_type_t::READ);
- return tag;
- } catch (FrameError& e) {
- logger().warn("{} read_main_preamble: {}", conn, e.what());
- abort_in_fault();
- }
- });
+ return frame_assembler.read_main_preamble(
+#ifdef UNIT_TESTS_BUILT
+ ).then([this](auto ret) {
+ INTERCEPT_FRAME(ret.tag, bp_type_t::READ);
+ return ret;
+ });
+#else
+ );
+#endif
}
-seastar::future<> ProtocolV2::read_frame_payload()
+template <class F>
+ceph::bufferlist ProtocolV2::get_buffer(F &tx_frame)
{
- ceph_assert(rx_segments_data.empty());
-
- return seastar::do_until(
- [this] { return rx_frame_asm.get_num_segments() == rx_segments_data.size(); },
- [this] {
- // TODO: create aligned and contiguous buffer from socket
- const size_t seg_idx = rx_segments_data.size();
- if (uint16_t alignment = rx_frame_asm.get_segment_align(seg_idx);
- alignment != segment_t::DEFAULT_ALIGNMENT) {
- logger().trace("{} cannot allocate {} aligned buffer at segment desc index {}",
- conn, alignment, rx_segments_data.size());
- }
- uint32_t onwire_len = rx_frame_asm.get_segment_onwire_len(seg_idx);
- // TODO: create aligned and contiguous buffer from socket
- return read_exactly(onwire_len).then([this] (auto tmp_bl) {
- logger().trace("{} RECV({}) frame segment[{}]",
- conn, tmp_bl.size(), rx_segments_data.size());
- bufferlist segment;
- segment.append(buffer::create(std::move(tmp_bl)));
- rx_segments_data.emplace_back(std::move(segment));
- });
- }
- ).then([this] {
- return read_exactly(rx_frame_asm.get_epilogue_onwire_len());
- }).then([this] (auto bl) {
- logger().trace("{} RECV({}) frame epilogue", conn, bl.size());
- bool ok = false;
- try {
- bufferlist rx_epilogue;
- rx_epilogue.append(buffer::create(std::move(bl)));
- ok = rx_frame_asm.disassemble_segments(rx_preamble, rx_segments_data.data(), rx_epilogue);
- } catch (FrameError& e) {
- logger().error("read_frame_payload: {} {}", conn, e.what());
- abort_in_fault();
- } catch (ceph::crypto::onwire::MsgAuthError&) {
- logger().error("read_frame_payload: {} bad auth tag", conn);
- abort_in_fault();
- }
- // we do have a mechanism that allows transmitter to start sending message
- // and abort after putting entire data field on wire. This will be used by
- // the kernel client to avoid unnecessary buffering.
- if (!ok) {
- // TODO
- ceph_assert(false);
- }
- });
+ INTERCEPT_FRAME(F::tag, bp_type_t::WRITE);
+ return frame_assembler.get_buffer(tx_frame);
}
template <class F>
-seastar::future<> ProtocolV2::write_frame(F &frame, bool flush)
+seastar::future<> ProtocolV2::write_flush_frame(F &tx_frame)
{
- auto bl = frame.get_buffer(tx_frame_asm);
- const auto main_preamble = reinterpret_cast<const preamble_block_t*>(bl.front().c_str());
- logger().trace("{} SEND({}) frame: tag={}, num_segments={}, crc={}",
- conn, bl.length(), (int)main_preamble->tag,
- (int)main_preamble->num_segments, main_preamble->crc);
- INTERCEPT_FRAME(main_preamble->tag, bp_type_t::WRITE);
- if (flush) {
- return write_flush(std::move(bl));
- } else {
- return write(std::move(bl));
- }
+ INTERCEPT_FRAME(F::tag, bp_type_t::WRITE);
+ return frame_assembler.write_flush_frame(tx_frame);
}
void ProtocolV2::trigger_state(state_t _state, out_state_t _out_state, bool reentrant)
CEPH_MSGR2_REQUIRED_FEATURES,
CEPH_BANNER_V2_PREFIX);
INTERCEPT_CUSTOM(custom_bp_t::BANNER_WRITE, bp_type_t::WRITE);
- return write_flush(std::move(bl)).then([this] {
+ return frame_assembler.write_flush(std::move(bl)).then([this] {
// 2. read peer banner
unsigned banner_len = strlen(CEPH_BANNER_V2_PREFIX) + sizeof(ceph_le16);
INTERCEPT_CUSTOM(custom_bp_t::BANNER_READ, bp_type_t::READ);
- return read_exactly(banner_len); // or read exactly?
+ return frame_assembler.read_exactly(banner_len); // or read exactly?
}).then([this] (auto bl) {
// 3. process peer banner and read banner_payload
unsigned banner_prefix_len = strlen(CEPH_BANNER_V2_PREFIX);
}
logger().debug("{} GOT banner: payload_len={}", conn, payload_len);
INTERCEPT_CUSTOM(custom_bp_t::BANNER_PAYLOAD_READ, bp_type_t::READ);
- return read(payload_len);
+ return frame_assembler.read(payload_len);
}).then([this, is_connect] (bufferlist bl) {
// 4. process peer banner_payload and send HelloFrame
auto p = bl.cbegin();
}
peer_supported_features = _peer_supported_features;
bool is_rev1 = HAVE_MSGR2_FEATURE(peer_supported_features, REVISION_1);
- tx_frame_asm.set_is_rev1(is_rev1);
- rx_frame_asm.set_is_rev1(is_rev1);
+ frame_assembler.set_is_rev1(is_rev1);
auto hello = HelloFrame::Encode(messenger.get_mytype(),
conn.target_addr);
logger().debug("{} WRITE HelloFrame: my_type={}, peer_addr={}",
conn, ceph_entity_type_name(messenger.get_mytype()),
conn.target_addr);
- return write_frame(hello);
+ return write_flush_frame(hello);
}).then([this] {
//5. read peer HelloFrame
return read_main_preamble();
- }).then([this] (Tag tag) {
- expect_tag(Tag::HELLO, tag, conn, __func__);
- return read_frame_payload();
- }).then([this] {
+ }).then([this](auto ret) {
+ expect_tag(Tag::HELLO, ret.tag, conn, __func__);
+ return frame_assembler.read_frame_payload();
+ }).then([this](auto payload) {
// 6. process peer HelloFrame
- auto hello = HelloFrame::Decode(rx_segments_data.back());
+ auto hello = HelloFrame::Decode(payload->back());
logger().debug("{} GOT HelloFrame: my_type={} peer_addr={}",
conn, ceph_entity_type_name(hello.entity_type()),
hello.peer_addr());
seastar::future<> ProtocolV2::handle_auth_reply()
{
- return read_main_preamble()
- .then([this] (Tag tag) {
- switch (tag) {
+ return read_main_preamble(
+ ).then([this](auto ret) {
+ switch (ret.tag) {
case Tag::AUTH_BAD_METHOD:
- return read_frame_payload().then([this] {
+ return frame_assembler.read_frame_payload(
+ ).then([this](auto payload) {
// handle_auth_bad_method() logic
- auto bad_method = AuthBadMethodFrame::Decode(rx_segments_data.back());
+ auto bad_method = AuthBadMethodFrame::Decode(payload->back());
logger().warn("{} GOT AuthBadMethodFrame: method={} result={}, "
"allowed_methods={}, allowed_modes={}",
conn, bad_method.method(), cpp_strerror(bad_method.result()),
return client_auth(bad_method.allowed_methods());
});
case Tag::AUTH_REPLY_MORE:
- return read_frame_payload().then([this] {
+ return frame_assembler.read_frame_payload(
+ ).then([this](auto payload) {
// handle_auth_reply_more() logic
- auto auth_more = AuthReplyMoreFrame::Decode(rx_segments_data.back());
+ auto auth_more = AuthReplyMoreFrame::Decode(payload->back());
logger().debug("{} GOT AuthReplyMoreFrame: payload_len={}",
conn, auth_more.auth_payload().length());
ceph_assert(messenger.get_auth_client());
auto more_reply = AuthRequestMoreFrame::Encode(reply);
logger().debug("{} WRITE AuthRequestMoreFrame: payload_len={}",
conn, reply.length());
- return write_frame(more_reply);
+ return write_flush_frame(more_reply);
}).then([this] {
return handle_auth_reply();
});
case Tag::AUTH_DONE:
- return read_frame_payload().then([this] {
+ return frame_assembler.read_frame_payload(
+ ).then([this](auto payload) {
// handle_auth_done() logic
- auto auth_done = AuthDoneFrame::Decode(rx_segments_data.back());
+ auto auth_done = AuthDoneFrame::Decode(payload->back());
logger().debug("{} GOT AuthDoneFrame: gid={}, con_mode={}, payload_len={}",
conn, auth_done.global_id(),
ceph_con_mode_name(auth_done.con_mode()),
abort_in_fault();
}
auth_meta->con_mode = auth_done.con_mode();
- bool is_rev1 = HAVE_MSGR2_FEATURE(peer_supported_features, REVISION_1);
- session_stream_handlers = ceph::crypto::onwire::rxtx_t::create_handler_pair(
- nullptr, *auth_meta, is_rev1, false);
+ frame_assembler.create_session_stream_handlers(*auth_meta, false);
return finish_auth();
});
default: {
- unexpected_tag(tag, conn, __func__);
+ unexpected_tag(ret.tag, conn, __func__);
return seastar::now();
}
}
logger().debug("{} WRITE AuthRequestFrame: method={},"
" preferred_modes={}, payload_len={}",
conn, auth_method, preferred_modes, bl.length());
- return write_frame(frame).then([this] {
+ return write_flush_frame(frame).then([this] {
return handle_auth_reply();
});
} catch (const crimson::auth::error& e) {
seastar::future<ProtocolV2::next_step_t>
ProtocolV2::process_wait()
{
- return read_frame_payload().then([this] {
+ return frame_assembler.read_frame_payload(
+ ).then([this](auto payload) {
// handle_wait() logic
logger().debug("{} GOT WaitFrame", conn);
- WaitFrame::Decode(rx_segments_data.back());
+ WaitFrame::Decode(payload->back());
return next_step_t::wait;
});
}
conn.policy.features_supported,
conn.policy.features_required | msgr2_required,
flags, client_cookie);
- return write_frame(client_ident).then([this] {
+ return write_flush_frame(client_ident).then([this] {
return read_main_preamble();
- }).then([this] (Tag tag) {
- switch (tag) {
+ }).then([this](auto ret) {
+ switch (ret.tag) {
case Tag::IDENT_MISSING_FEATURES:
- return read_frame_payload().then([this] {
+ return frame_assembler.read_frame_payload(
+ ).then([this](auto payload) {
// handle_ident_missing_features() logic
- auto ident_missing = IdentMissingFeaturesFrame::Decode(rx_segments_data.back());
+ auto ident_missing = IdentMissingFeaturesFrame::Decode(payload->back());
logger().warn("{} GOT IdentMissingFeaturesFrame: features={}"
" (client does not support all server features)",
conn, ident_missing.features());
case Tag::WAIT:
return process_wait();
case Tag::SERVER_IDENT:
- return read_frame_payload().then([this] {
+ return frame_assembler.read_frame_payload(
+ ).then([this](auto payload) {
// handle_server_ident() logic
requeue_out_sent();
- auto server_ident = ServerIdentFrame::Decode(rx_segments_data.back());
+ auto server_ident = ServerIdentFrame::Decode(payload->back());
logger().debug("{} GOT ServerIdentFrame:"
" addrs={}, gid={}, gs={},"
" features_supported={}, features_required={},"
return seastar::make_ready_future<next_step_t>(next_step_t::ready);
});
default: {
- unexpected_tag(tag, conn, "post_client_connect");
+ unexpected_tag(ret.tag, conn, "post_client_connect");
return seastar::make_ready_future<next_step_t>(next_step_t::none);
}
}
conn, messenger.get_myaddrs(),
client_cookie, server_cookie,
global_seq, connect_seq, get_in_seq());
- return write_frame(reconnect).then([this] {
+ return write_flush_frame(reconnect).then([this] {
return read_main_preamble();
- }).then([this] (Tag tag) {
- switch (tag) {
+ }).then([this](auto ret) {
+ switch (ret.tag) {
case Tag::SESSION_RETRY_GLOBAL:
- return read_frame_payload().then([this] {
+ return frame_assembler.read_frame_payload(
+ ).then([this](auto payload) {
// handle_session_retry_global() logic
- auto retry = RetryGlobalFrame::Decode(rx_segments_data.back());
+ auto retry = RetryGlobalFrame::Decode(payload->back());
logger().warn("{} GOT RetryGlobalFrame: gs={}",
conn, retry.global_seq());
global_seq = messenger.get_global_seq(retry.global_seq());
return client_reconnect();
});
case Tag::SESSION_RETRY:
- return read_frame_payload().then([this] {
+ return frame_assembler.read_frame_payload(
+ ).then([this](auto payload) {
// handle_session_retry() logic
- auto retry = RetryFrame::Decode(rx_segments_data.back());
+ auto retry = RetryFrame::Decode(payload->back());
logger().warn("{} GOT RetryFrame: cs={}",
conn, retry.connect_seq());
connect_seq = retry.connect_seq() + 1;
return client_reconnect();
});
case Tag::SESSION_RESET:
- return read_frame_payload().then([this] {
+ return frame_assembler.read_frame_payload(
+ ).then([this](auto payload) {
// handle_session_reset() logic
- auto reset = ResetFrame::Decode(rx_segments_data.back());
+ auto reset = ResetFrame::Decode(payload->back());
logger().warn("{} GOT ResetFrame: full={}", conn, reset.full());
reset_session(reset.full());
return client_connect();
case Tag::WAIT:
return process_wait();
case Tag::SESSION_RECONNECT_OK:
- return read_frame_payload().then([this] {
+ return frame_assembler.read_frame_payload(
+ ).then([this](auto payload) {
// handle_reconnect_ok() logic
- auto reconnect_ok = ReconnectOkFrame::Decode(rx_segments_data.back());
+ auto reconnect_ok = ReconnectOkFrame::Decode(payload->back());
logger().debug("{} GOT ReconnectOkFrame: msg_seq={}",
conn, reconnect_ok.msg_seq());
requeue_out_sent_up_to(reconnect_ok.msg_seq());
return seastar::make_ready_future<next_step_t>(next_step_t::ready);
});
default: {
- unexpected_tag(tag, conn, "post_client_reconnect");
+ unexpected_tag(ret.tag, conn, "post_client_reconnect");
return seastar::make_ready_future<next_step_t>(next_step_t::none);
}
}
void ProtocolV2::execute_connecting()
{
trigger_state(state_t::CONNECTING, out_state_t::delay, false);
- if (conn.socket) {
- conn.socket->shutdown();
- }
+ frame_assembler.shutdown_socket();
gated_execute("execute_connecting", [this] {
global_seq = messenger.get_global_seq();
assert(client_cookie != 0);
conn, get_state_name(state));
abort_protocol();
}
- if (conn.socket) {
- gate.dispatch_in_background("close_sockect_connecting", *this,
- [sock = std::move(conn.socket)] () mutable {
- return sock->close().then([sock = std::move(sock)] {});
- });
- }
+ gate.dispatch_in_background(
+ "reset_close_socket_connecting",
+ *this,
+ [this] { return frame_assembler.reset_and_close_socket(); });
INTERCEPT_N_RW(custom_bp_t::SOCKET_CONNECTING);
return Socket::connect(conn.peer_addr);
}).then([this](SocketRef sock) {
abort_protocol();
});
}
- conn.socket = std::move(sock);
+ frame_assembler.set_socket(std::move(sock));
return seastar::now();
}).then([this] {
auth_meta = seastar::make_lw_shared<AuthConnectionMeta>();
- session_stream_handlers = { nullptr, nullptr };
- enable_recording();
+ frame_assembler.reset_handlers();
+ frame_assembler.start_recording();
return banner_exchange(true);
}).then([this] (auto&& ret) {
auto [_peer_type, _my_addr_from_peer] = std::move(ret);
conn, get_state_name(state));
abort_protocol();
}
- conn.socket->learn_ephemeral_port_as_connector(_my_addr_from_peer.get_port());
+ frame_assembler.learn_socket_ephemeral_port_as_connector(
+ _my_addr_from_peer.get_port());
if (unlikely(_my_addr_from_peer.is_legacy())) {
logger().warn("{} peer sent a legacy address for me: {}",
conn, _my_addr_from_peer);
"allowed_methods={}, allowed_modes={})",
conn, auth_meta->auth_method, cpp_strerror(r),
allowed_methods, allowed_modes);
- return write_frame(bad_method).then([this] {
+ return write_flush_frame(bad_method).then([this] {
return server_auth();
});
}
logger().debug("{} WRITE AuthDoneFrame: gid={}, con_mode={}, payload_len={}",
conn, conn.peer_global_id,
ceph_con_mode_name(auth_meta->con_mode), reply.length());
- return write_frame(auth_done).then([this] {
+ return write_flush_frame(auth_done).then([this] {
ceph_assert(auth_meta);
- bool is_rev1 = HAVE_MSGR2_FEATURE(peer_supported_features, REVISION_1);
- session_stream_handlers = ceph::crypto::onwire::rxtx_t::create_handler_pair(
- nullptr, *auth_meta, is_rev1, true);
+ frame_assembler.create_session_stream_handlers(*auth_meta, true);
return finish_auth();
});
}
auto more = AuthReplyMoreFrame::Encode(reply);
logger().debug("{} WRITE AuthReplyMoreFrame: payload_len={}",
conn, reply.length());
- return write_frame(more).then([this] {
+ return write_flush_frame(more).then([this] {
return read_main_preamble();
- }).then([this] (Tag tag) {
- expect_tag(Tag::AUTH_REQUEST_MORE, tag, conn, __func__);
- return read_frame_payload();
- }).then([this] {
- auto auth_more = AuthRequestMoreFrame::Decode(rx_segments_data.back());
+ }).then([this](auto ret) {
+ expect_tag(Tag::AUTH_REQUEST_MORE, ret.tag, conn, __func__);
+ return frame_assembler.read_frame_payload();
+ }).then([this](auto payload) {
+ auto auth_more = AuthRequestMoreFrame::Decode(payload->back());
logger().debug("{} GOT AuthRequestMoreFrame: payload_len={}",
conn, auth_more.auth_payload().length());
return _handle_auth_request(auth_more.auth_payload(), true);
seastar::future<> ProtocolV2::server_auth()
{
- return read_main_preamble()
- .then([this] (Tag tag) {
- expect_tag(Tag::AUTH_REQUEST, tag, conn, __func__);
- return read_frame_payload();
- }).then([this] {
+ return read_main_preamble(
+ ).then([this](auto ret) {
+ expect_tag(Tag::AUTH_REQUEST, ret.tag, conn, __func__);
+ return frame_assembler.read_frame_payload();
+ }).then([this](auto payload) {
// handle_auth_request() logic
- auto request = AuthRequestFrame::Decode(rx_segments_data.back());
+ auto request = AuthRequestFrame::Decode(payload->back());
logger().debug("{} GOT AuthRequestFrame: method={}, preferred_modes={},"
" payload_len={}",
conn, request.method(), request.preferred_modes(),
{
auto wait = WaitFrame::Encode();
logger().debug("{} WRITE WaitFrame", conn);
- return write_frame(wait).then([] {
+ return write_flush_frame(wait).then([] {
return next_step_t::wait;
});
}
{
existing_proto->trigger_replacing(reconnect,
do_reset,
- std::move(conn.socket),
+ frame_assembler.to_replace(),
std::move(auth_meta),
- std::move(session_stream_handlers),
peer_global_seq,
client_cookie,
conn.get_peer_name(),
seastar::future<ProtocolV2::next_step_t>
ProtocolV2::server_connect()
{
- return read_frame_payload().then([this] {
+ return frame_assembler.read_frame_payload(
+ ).then([this](auto payload) {
// handle_client_ident() logic
- auto client_ident = ClientIdentFrame::Decode(rx_segments_data.back());
+ auto client_ident = ClientIdentFrame::Decode(payload->back());
logger().debug("{} GOT ClientIdentFrame: addrs={}, target={},"
" gid={}, gs={}, features_supported={},"
" features_required={}, flags={}, cookie={}",
auto ident_missing_features = IdentMissingFeaturesFrame::Encode(feat_missing);
logger().warn("{} WRITE IdentMissingFeaturesFrame: features={} (peer missing)",
conn, feat_missing);
- return write_frame(ident_missing_features).then([] {
+ return write_flush_frame(ident_missing_features).then([] {
return next_step_t::wait;
});
}
seastar::future<ProtocolV2::next_step_t>
ProtocolV2::read_reconnect()
{
- return read_main_preamble()
- .then([this] (Tag tag) {
- expect_tag(Tag::SESSION_RECONNECT, tag, conn, "read_reconnect");
+ return read_main_preamble(
+ ).then([this](auto ret) {
+ expect_tag(Tag::SESSION_RECONNECT, ret.tag, conn, "read_reconnect");
return server_reconnect();
});
}
{
auto retry = RetryFrame::Encode(connect_seq);
logger().warn("{} WRITE RetryFrame: cs={}", conn, connect_seq);
- return write_frame(retry).then([this] {
+ return write_flush_frame(retry).then([this] {
return read_reconnect();
});
}
{
auto retry = RetryGlobalFrame::Encode(global_seq);
logger().warn("{} WRITE RetryGlobalFrame: gs={}", conn, global_seq);
- return write_frame(retry).then([this] {
+ return write_flush_frame(retry).then([this] {
return read_reconnect();
});
}
{
auto reset = ResetFrame::Encode(full);
logger().warn("{} WRITE ResetFrame: full={}", conn, full);
- return write_frame(reset).then([this] {
+ return write_flush_frame(reset).then([this] {
return read_main_preamble();
- }).then([this] (Tag tag) {
- expect_tag(Tag::CLIENT_IDENT, tag, conn, "post_send_reset");
+ }).then([this](auto ret) {
+ expect_tag(Tag::CLIENT_IDENT, ret.tag, conn, "post_send_reset");
return server_connect();
});
}
seastar::future<ProtocolV2::next_step_t>
ProtocolV2::server_reconnect()
{
- return read_frame_payload().then([this] {
+ return frame_assembler.read_frame_payload(
+ ).then([this](auto payload) {
// handle_reconnect() logic
- auto reconnect = ReconnectFrame::Decode(rx_segments_data.back());
+ auto reconnect = ReconnectFrame::Decode(payload->back());
logger().debug("{} GOT ReconnectFrame: addrs={}, client_cookie={},"
" server_cookie={}, gs={}, cs={}, msg_seq={}",
return seastar::futurize_invoke([this] {
INTERCEPT_N_RW(custom_bp_t::SOCKET_ACCEPTED);
auth_meta = seastar::make_lw_shared<AuthConnectionMeta>();
- session_stream_handlers = { nullptr, nullptr };
- session_comp_handlers = { nullptr, nullptr };
- enable_recording();
+ frame_assembler.reset_handlers();
+ frame_assembler.start_recording();
return banner_exchange(false);
}).then([this] (auto&& ret) {
auto [_peer_type, _my_addr_from_peer] = std::move(ret);
return server_auth();
}).then([this] {
return read_main_preamble();
- }).then([this] (Tag tag) {
- switch (tag) {
+ }).then([this](auto ret) {
+ switch (ret.tag) {
case Tag::CLIENT_IDENT:
return server_connect();
case Tag::SESSION_RECONNECT:
return server_reconnect();
default: {
- unexpected_tag(tag, conn, "post_server_auth");
+ unexpected_tag(ret.tag, conn, "post_server_auth");
return seastar::make_ready_future<next_step_t>(next_step_t::none);
}
}
{
ceph_assert(auth_meta);
+ auto records = frame_assembler.stop_recording();
const auto sig = auth_meta->session_key.empty() ? sha256_digest_t() :
- auth_meta->session_key.hmac_sha256(nullptr, rxbuf);
+ auth_meta->session_key.hmac_sha256(nullptr, records.rxbuf);
auto sig_frame = AuthSignatureFrame::Encode(sig);
- ceph_assert(record_io);
- record_io = false;
- rxbuf.clear();
logger().debug("{} WRITE AuthSignatureFrame: signature={}", conn, sig);
- return write_frame(sig_frame).then([this] {
+ return write_flush_frame(sig_frame).then([this] {
return read_main_preamble();
- }).then([this] (Tag tag) {
- expect_tag(Tag::AUTH_SIGNATURE, tag, conn, "post_finish_auth");
- return read_frame_payload();
- }).then([this] {
+ }).then([this](auto ret) {
+ expect_tag(Tag::AUTH_SIGNATURE, ret.tag, conn, "post_finish_auth");
+ return frame_assembler.read_frame_payload();
+ }).then([this, txbuf=std::move(records.txbuf)](auto payload) {
// handle_auth_signature() logic
- auto sig_frame = AuthSignatureFrame::Decode(rx_segments_data.back());
+ auto sig_frame = AuthSignatureFrame::Decode(payload->back());
logger().debug("{} GOT AuthSignatureFrame: signature={}", conn, sig_frame.signature());
const auto actual_tx_sig = auth_meta->session_key.empty() ?
conn, actual_tx_sig, sig_frame.signature());
abort_in_fault();
}
- txbuf.clear();
});
}
conn.policy.features_required | msgr2_required,
flags, server_cookie);
- return write_frame(server_ident);
+ return write_flush_frame(server_ident);
}
// REPLACING state
void ProtocolV2::trigger_replacing(bool reconnect,
bool do_reset,
- SocketRef&& new_socket,
+ FrameAssemblerV2::mover_t &&mover,
AuthConnectionMetaRef&& new_auth_meta,
- ceph::crypto::onwire::rxtx_t new_rxtx,
uint64_t new_peer_global_seq,
uint64_t new_client_cookie,
entity_name_t new_peer_name,
uint64_t new_msg_seq)
{
trigger_state(state_t::REPLACING, out_state_t::delay, false);
- if (conn.socket) {
- conn.socket->shutdown();
- }
+ frame_assembler.shutdown_socket();
dispatchers.ms_handle_accept(
seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
gate.dispatch_in_background("trigger_replacing", *this,
[this,
reconnect,
do_reset,
- new_socket = std::move(new_socket),
+ mover = std::move(mover),
new_auth_meta = std::move(new_auth_meta),
- new_rxtx = std::move(new_rxtx),
new_client_cookie, new_peer_name,
new_conn_features, new_peer_supported_features,
new_peer_global_seq,
return execution_done.get_future();
}).then([this,
reconnect,
- new_socket = std::move(new_socket),
+ mover = std::move(mover),
new_auth_meta = std::move(new_auth_meta),
- new_rxtx = std::move(new_rxtx),
new_client_cookie, new_peer_name,
new_conn_features, new_peer_supported_features,
new_peer_global_seq,
new_connect_seq, new_msg_seq] () mutable {
if (unlikely(state != state_t::REPLACING)) {
- return new_socket->close().then([sock = std::move(new_socket)] {
+ return mover.socket->close(
+ ).then([sock = std::move(mover.socket)] {
abort_protocol();
});
}
- if (conn.socket) {
- gate.dispatch_in_background("close_socket_replacing", *this,
- [sock = std::move(conn.socket)] () mutable {
- return sock->close().then([sock = std::move(sock)] {});
- });
- }
- conn.socket = std::move(new_socket);
+ gate.dispatch_in_background(
+ "reset_close_socket_replacing",
+ *this,
+ [this] { return frame_assembler.reset_and_close_socket(); });
auth_meta = std::move(new_auth_meta);
- session_stream_handlers = std::move(new_rxtx);
- record_io = false;
peer_global_seq = new_peer_global_seq;
+ frame_assembler.replace_by(std::move(mover));
if (reconnect) {
connect_seq = new_connect_seq;
requeue_out_sent_up_to(new_msg_seq);
auto reconnect_ok = ReconnectOkFrame::Encode(get_in_seq());
logger().debug("{} WRITE ReconnectOkFrame: msg_seq={}", conn, get_in_seq());
- return write_frame(reconnect_ok);
+ return write_flush_frame(reconnect_ok);
} else {
client_cookie = new_client_cookie;
assert(conn.get_peer_type() == new_peer_name.type());
conn.set_features(new_conn_features);
peer_supported_features = new_peer_supported_features;
bool is_rev1 = HAVE_MSGR2_FEATURE(peer_supported_features, REVISION_1);
- tx_frame_asm.set_is_rev1(is_rev1);
- rx_frame_asm.set_is_rev1(is_rev1);
+ frame_assembler.set_is_rev1(is_rev1);
return send_server_ident();
}
}).then([this, reconnect] {
if (unlikely(require_keepalive)) {
auto keepalive_frame = KeepAliveFrame::Encode();
- bl.append(keepalive_frame.get_buffer(tx_frame_asm));
- INTERCEPT_FRAME(ceph::msgr::v2::Tag::KEEPALIVE2, bp_type_t::WRITE);
+ bl.append(get_buffer(keepalive_frame));
}
if (unlikely(maybe_keepalive_ack.has_value())) {
auto keepalive_ack_frame = KeepAliveFrameAck::Encode(*maybe_keepalive_ack);
- bl.append(keepalive_ack_frame.get_buffer(tx_frame_asm));
- INTERCEPT_FRAME(ceph::msgr::v2::Tag::KEEPALIVE2_ACK, bp_type_t::WRITE);
+ bl.append(get_buffer(keepalive_ack_frame));
}
if (require_ack && num_msgs == 0u) {
auto ack_frame = AckFrame::Encode(get_in_seq());
- bl.append(ack_frame.get_buffer(tx_frame_asm));
- INTERCEPT_FRAME(ceph::msgr::v2::Tag::ACK, bp_type_t::WRITE);
+ bl.append(get_buffer(ack_frame));
}
std::for_each(msgs.begin(), msgs.begin()+num_msgs, [this, &bl](const MessageURef& msg) {
msg->get_payload(), msg->get_middle(), msg->get_data());
logger().debug("{} --> #{} === {} ({})",
conn, msg->get_seq(), *msg, msg->get_type());
- bl.append(message.get_buffer(tx_frame_asm));
- INTERCEPT_FRAME(ceph::msgr::v2::Tag::MESSAGE, bp_type_t::WRITE);
+ bl.append(get_buffer(message));
});
return bl;
}
-seastar::future<> ProtocolV2::read_message(utime_t throttle_stamp)
+seastar::future<> ProtocolV2::read_message(utime_t throttle_stamp, std::size_t msg_size)
{
- return read_frame_payload()
- .then([this, throttle_stamp] {
+ return frame_assembler.read_frame_payload(
+ ).then([this, throttle_stamp, msg_size](auto payload) {
utime_t recv_stamp{seastar::lowres_system_clock::now()};
// we need to get the size before std::moving segments data
- const size_t cur_msg_size = get_current_msg_size();
- auto msg_frame = MessageFrame::Decode(rx_segments_data);
+ auto msg_frame = MessageFrame::Decode(*payload);
// XXX: paranoid copy just to avoid oops
ceph_msg_header2 current_header = msg_frame.header();
// store reservation size in message, so we don't get confused
// by messages entering the dispatch queue through other paths.
- message->set_dispatch_throttle_size(cur_msg_size);
+ message->set_dispatch_throttle_size(msg_size);
message->set_throttle_stamp(throttle_stamp);
message->set_recv_stamp(recv_stamp);
gated_execute("execute_ready", [this] {
protocol_timer.cancel();
return seastar::keep_doing([this] {
- return read_main_preamble()
- .then([this] (Tag tag) {
- switch (tag) {
+ return read_main_preamble(
+ ).then([this](auto ret) {
+ switch (ret.tag) {
case Tag::MESSAGE: {
+ size_t msg_size = get_msg_size(*ret.rx_frame_asm);
return seastar::futurize_invoke([this] {
// throttle_message() logic
if (!conn.policy.throttler_messages) {
// TODO: message throttler
ceph_assert(false);
return seastar::now();
- }).then([this] {
+ }).then([this, msg_size] {
// throttle_bytes() logic
if (!conn.policy.throttler_bytes) {
return seastar::now();
}
- size_t cur_msg_size = get_current_msg_size();
- if (!cur_msg_size) {
+ if (!msg_size) {
return seastar::now();
}
logger().trace("{} wants {} bytes from policy throttler {}/{}",
- conn, cur_msg_size,
+ conn, msg_size,
conn.policy.throttler_bytes->get_current(),
conn.policy.throttler_bytes->get_max());
- return conn.policy.throttler_bytes->get(cur_msg_size);
- }).then([this] {
+ return conn.policy.throttler_bytes->get(msg_size);
+ }).then([this, msg_size] {
// TODO: throttle_dispatch_queue() logic
utime_t throttle_stamp{seastar::lowres_system_clock::now()};
- return read_message(throttle_stamp);
+ return read_message(throttle_stamp, msg_size);
});
}
case Tag::ACK:
- return read_frame_payload().then([this] {
+ return frame_assembler.read_frame_payload(
+ ).then([this](auto payload) {
// handle_message_ack() logic
- auto ack = AckFrame::Decode(rx_segments_data.back());
+ auto ack = AckFrame::Decode(payload->back());
logger().debug("{} GOT AckFrame: seq={}", conn, ack.seq());
ack_out_sent(ack.seq());
});
case Tag::KEEPALIVE2:
- return read_frame_payload().then([this] {
+ return frame_assembler.read_frame_payload(
+ ).then([this](auto payload) {
// handle_keepalive2() logic
- auto keepalive_frame = KeepAliveFrame::Decode(rx_segments_data.back());
+ auto keepalive_frame = KeepAliveFrame::Decode(payload->back());
logger().debug("{} GOT KeepAliveFrame: timestamp={}",
conn, keepalive_frame.timestamp());
notify_keepalive_ack(keepalive_frame.timestamp());
set_last_keepalive(seastar::lowres_system_clock::now());
});
case Tag::KEEPALIVE2_ACK:
- return read_frame_payload().then([this] {
+ return frame_assembler.read_frame_payload(
+ ).then([this](auto payload) {
// handle_keepalive2_ack() logic
- auto keepalive_ack_frame = KeepAliveFrameAck::Decode(rx_segments_data.back());
+ auto keepalive_ack_frame = KeepAliveFrameAck::Decode(payload->back());
auto _last_keepalive_ack =
seastar::lowres_system_clock::time_point{keepalive_ack_frame.timestamp()};
set_last_keepalive_ack(_last_keepalive_ack);
conn, _last_keepalive_ack);
});
default: {
- unexpected_tag(tag, conn, "execute_ready");
+ unexpected_tag(ret.tag, conn, "execute_ready");
return seastar::now();
}
}
void ProtocolV2::execute_standby()
{
trigger_state(state_t::STANDBY, out_state_t::delay, false);
- if (conn.socket) {
- conn.socket->shutdown();
- }
+ frame_assembler.shutdown_socket();
}
void ProtocolV2::notify_out()
void ProtocolV2::execute_wait(bool max_backoff)
{
trigger_state(state_t::WAIT, out_state_t::delay, false);
- if (conn.socket) {
- conn.socket->shutdown();
- }
+ frame_assembler.shutdown_socket();
gated_execute("execute_wait", [this, max_backoff] {
double backoff = protocol_timer.last_dur();
if (max_backoff) {
{
trigger_state(state_t::SERVER_WAIT, out_state_t::none, false);
gated_execute("execute_server_wait", [this] {
- return read_exactly(1).then([this] (auto bl) {
+ return frame_assembler.read_exactly(1).then([this] (auto bl) {
logger().warn("{} SERVER_WAIT got read, abort", conn);
abort_in_fault();
}).handle_exception([this] (std::exception_ptr eptr) {
if (f_accept_new) {
(*f_accept_new)();
}
- if (conn.socket) {
- conn.socket->shutdown();
- }
+ frame_assembler.shutdown_socket();
assert(!gate.is_closed());
auto gate_closed = gate.close();
auto out_closed = close_out();
closed_clean_fut = seastar::when_all(
std::move(gate_closed), std::move(out_closed)
).discard_result().then([this] {
- if (conn.socket) {
- return conn.socket->close();
- } else {
- return seastar::now();
- }
+ return frame_assembler.reset_and_close_socket(false);
}).then([this] {
logger().debug("{} closed!", conn);
messenger.closed_conn(