From: Yingxin Cheng Date: Thu, 17 Nov 2022 09:06:17 +0000 (+0800) Subject: crimson/net: introduce and integrate FrameAssemblerV2 X-Git-Tag: v18.1.0~375^2~28 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=b958da388f59226a16bdbaff18175f6ffd9afe2b;p=ceph-ci.git crimson/net: introduce and integrate FrameAssemblerV2 FrameAssemblerV2 encapsulates the low-level frame processing for protocol v2, and manages the socket instance. FrameAssemblerV2 is supposed to be running on the socket core for performance reasons, which will be changable. Signed-off-by: Yingxin Cheng --- diff --git a/src/crimson/CMakeLists.txt b/src/crimson/CMakeLists.txt index 44c2ee93065..cd5d1fb0cd8 100644 --- a/src/crimson/CMakeLists.txt +++ b/src/crimson/CMakeLists.txt @@ -175,6 +175,7 @@ set(crimson_net_srcs ${PROJECT_SOURCE_DIR}/src/msg/async/compression_onwire.cc ${PROJECT_SOURCE_DIR}/src/msg/async/frames_v2.cc net/Errors.cc + net/FrameAssemblerV2.cc net/Messenger.cc net/SocketConnection.cc net/SocketMessenger.cc diff --git a/src/crimson/net/FrameAssemblerV2.cc b/src/crimson/net/FrameAssemblerV2.cc new file mode 100644 index 00000000000..cf419897460 --- /dev/null +++ b/src/crimson/net/FrameAssemblerV2.cc @@ -0,0 +1,264 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*- +// vim: ts=8 sw=2 smarttab + +#include "FrameAssemblerV2.h" + +#include "Errors.h" +#include "SocketConnection.h" + +using ceph::msgr::v2::FrameAssembler; +using ceph::msgr::v2::FrameError; +using ceph::msgr::v2::preamble_block_t; +using ceph::msgr::v2::segment_t; +using ceph::msgr::v2::Tag; + +namespace { + +seastar::logger& logger() { + return crimson::get_logger(ceph_subsys_ms); +} + +} // namespace anonymous + +namespace crimson::net { + +FrameAssemblerV2::FrameAssemblerV2(SocketConnection &_conn) + : conn{_conn} +{} + +void FrameAssemblerV2::set_is_rev1(bool _is_rev1) +{ + is_rev1 = _is_rev1; + tx_frame_asm.set_is_rev1(_is_rev1); + rx_frame_asm.set_is_rev1(_is_rev1); +} + +void FrameAssemblerV2::create_session_stream_handlers( + const AuthConnectionMeta &auth_meta, + bool crossed) +{ + session_stream_handlers = ceph::crypto::onwire::rxtx_t::create_handler_pair( + nullptr, auth_meta, is_rev1, crossed); +} + +void FrameAssemblerV2::reset_handlers() +{ + session_stream_handlers = { nullptr, nullptr }; + session_comp_handlers = { nullptr, nullptr }; +} + +FrameAssemblerV2::mover_t +FrameAssemblerV2::to_replace() +{ + assert(has_socket()); + socket = nullptr; + return mover_t{ + std::move(conn.socket), + std::move(session_stream_handlers), + std::move(session_comp_handlers)}; +} + +void FrameAssemblerV2::replace_by(FrameAssemblerV2::mover_t &&mover) +{ + set_socket(std::move(mover.socket)); + record_io = false; + rxbuf.clear(); + txbuf.clear(); + session_stream_handlers = std::move(mover.session_stream_handlers); + session_comp_handlers = std::move(mover.session_comp_handlers); +} + +void FrameAssemblerV2::start_recording() +{ + record_io = true; + rxbuf.clear(); + txbuf.clear(); +} + +FrameAssemblerV2::record_bufs_t +FrameAssemblerV2::stop_recording() +{ + ceph_assert_always(record_io == true); + record_io = false; + return record_bufs_t{std::move(rxbuf), std::move(txbuf)}; +} + +bool FrameAssemblerV2::has_socket() const +{ + assert((socket && conn.socket) || (!socket && !conn.socket)); + return socket != nullptr; +} + +void FrameAssemblerV2::set_socket(SocketRef &&_socket) +{ + assert(!has_socket()); + ceph_assert_always(!conn.socket); + socket = _socket.get(); + conn.socket = std::move(_socket); + assert(has_socket()); +} + +void FrameAssemblerV2::learn_socket_ephemeral_port_as_connector(uint16_t port) +{ + assert(has_socket()); + socket->learn_ephemeral_port_as_connector(port); +} + +void FrameAssemblerV2::shutdown_socket() +{ + if (has_socket()) { + socket->shutdown(); + } +} + +seastar::future<> FrameAssemblerV2::reset_and_close_socket(bool do_reset) +{ + if (!has_socket()) { + return seastar::now(); + } + if (do_reset) { + socket = nullptr; + return conn.socket->close( + ).then([sock = std::move(conn.socket)] {}); + } else { + return socket->close(); + } +} + +seastar::future +FrameAssemblerV2::read_exactly(std::size_t bytes) +{ + assert(has_socket()); + if (unlikely(record_io)) { + return socket->read_exactly(bytes + ).then([this](auto bl) { + rxbuf.append(buffer::create(bl.share())); + return bl; + }); + } else { + return socket->read_exactly(bytes); + }; +} + +seastar::future +FrameAssemblerV2::read(std::size_t bytes) +{ + assert(has_socket()); + if (unlikely(record_io)) { + return socket->read(bytes + ).then([this](auto buf) { + rxbuf.append(buf); + return buf; + }); + } else { + return socket->read(bytes); + } +} + +seastar::future<> +FrameAssemblerV2::write(ceph::bufferlist &&buf) +{ + assert(has_socket()); + if (unlikely(record_io)) { + txbuf.append(buf); + } + return socket->write(std::move(buf)); +} + +seastar::future<> +FrameAssemblerV2::flush() +{ + assert(has_socket()); + return socket->flush(); +} + +seastar::future<> +FrameAssemblerV2::write_flush(ceph::bufferlist &&buf) +{ + assert(has_socket()); + if (unlikely(record_io)) { + txbuf.append(buf); + } + return socket->write_flush(std::move(buf)); +} + +seastar::future +FrameAssemblerV2::read_main_preamble() +{ + rx_preamble.clear(); + return read_exactly(rx_frame_asm.get_preamble_onwire_len() + ).then([this](auto bl) { + try { + rx_preamble.append(buffer::create(std::move(bl))); + const Tag tag = rx_frame_asm.disassemble_preamble(rx_preamble); + return read_main_t{tag, &rx_frame_asm}; + } catch (FrameError& e) { + logger().warn("{} read_main_preamble: {}", conn, e.what()); + throw std::system_error(make_error_code(crimson::net::error::negotiation_failure)); + } + }); +} + +seastar::future +FrameAssemblerV2::read_frame_payload() +{ + rx_segments_data.clear(); + return seastar::do_until( + [this] { + return rx_frame_asm.get_num_segments() == rx_segments_data.size(); + }, + [this] { + // TODO: create aligned and contiguous buffer from socket + const size_t seg_idx = rx_segments_data.size(); + if (uint16_t alignment = rx_frame_asm.get_segment_align(seg_idx); + alignment != segment_t::DEFAULT_ALIGNMENT) { + logger().trace("{} cannot allocate {} aligned buffer at segment desc index {}", + conn, alignment, rx_segments_data.size()); + } + uint32_t onwire_len = rx_frame_asm.get_segment_onwire_len(seg_idx); + // TODO: create aligned and contiguous buffer from socket + return read_exactly(onwire_len + ).then([this](auto tmp_bl) { + logger().trace("{} RECV({}) frame segment[{}]", + conn, tmp_bl.size(), rx_segments_data.size()); + bufferlist segment; + segment.append(buffer::create(std::move(tmp_bl))); + rx_segments_data.emplace_back(std::move(segment)); + }); + } + ).then([this] { + return read_exactly(rx_frame_asm.get_epilogue_onwire_len()); + }).then([this](auto bl) { + logger().trace("{} RECV({}) frame epilogue", conn, bl.size()); + bool ok = false; + try { + bufferlist rx_epilogue; + rx_epilogue.append(buffer::create(std::move(bl))); + ok = rx_frame_asm.disassemble_segments(rx_preamble, rx_segments_data.data(), rx_epilogue); + } catch (FrameError& e) { + logger().error("read_frame_payload: {} {}", conn, e.what()); + throw std::system_error(make_error_code(crimson::net::error::negotiation_failure)); + } catch (ceph::crypto::onwire::MsgAuthError&) { + logger().error("read_frame_payload: {} bad auth tag", conn); + throw std::system_error(make_error_code(crimson::net::error::negotiation_failure)); + } + // we do have a mechanism that allows transmitter to start sending message + // and abort after putting entire data field on wire. This will be used by + // the kernel client to avoid unnecessary buffering. + if (!ok) { + ceph_abort("TODO"); + } + return &rx_segments_data; + }); +} + +void FrameAssemblerV2::log_main_preamble(const ceph::bufferlist &bl) +{ + const auto main_preamble = + reinterpret_cast(bl.front().c_str()); + logger().trace("{} SEND({}) frame: tag={}, num_segments={}, crc={}", + conn, bl.length(), (int)main_preamble->tag, + (int)main_preamble->num_segments, main_preamble->crc); +} + +} // namespace crimson::net diff --git a/src/crimson/net/FrameAssemblerV2.h b/src/crimson/net/FrameAssemblerV2.h new file mode 100644 index 00000000000..f72eeeeae45 --- /dev/null +++ b/src/crimson/net/FrameAssemblerV2.h @@ -0,0 +1,158 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include "msg/async/frames_v2.h" +#include "msg/async/crypto_onwire.h" +#include "msg/async/compression_onwire.h" + +#include "crimson/net/Socket.h" + +namespace crimson::net { + +class SocketConnection; + +class FrameAssemblerV2 { +public: + FrameAssemblerV2(SocketConnection &conn); + + ~FrameAssemblerV2() = default; + + FrameAssemblerV2(const FrameAssemblerV2 &) = delete; + + FrameAssemblerV2(FrameAssemblerV2 &&) = delete; + + void set_is_rev1(bool is_rev1); + + void create_session_stream_handlers( + const AuthConnectionMeta &auth_meta, + bool crossed); + + void reset_handlers(); + + /* + * replacing + */ + + struct mover_t { + SocketRef socket; + ceph::crypto::onwire::rxtx_t session_stream_handlers; + ceph::compression::onwire::rxtx_t session_comp_handlers; + }; + + mover_t to_replace(); + + void replace_by(mover_t &&); + + /* + * auth signature interfaces + */ + + void start_recording(); + + struct record_bufs_t { + ceph::bufferlist rxbuf; + ceph::bufferlist txbuf; + }; + record_bufs_t stop_recording(); + + /* + * socket maintainence interfaces + */ + + bool has_socket() const; + + void set_socket(SocketRef &&); + + void learn_socket_ephemeral_port_as_connector(uint16_t port); + + void shutdown_socket(); + + seastar::future<> reset_and_close_socket(bool do_reset=true); + + /* + * socket read and write interfaces + */ + + seastar::future read_exactly(std::size_t bytes); + + seastar::future read(std::size_t bytes); + + seastar::future<> write(ceph::bufferlist &&); + + seastar::future<> flush(); + + seastar::future<> write_flush(ceph::bufferlist &&); + + /* + * frame read and write interfaces + */ + + /// may throw negotiation_failure as fault + struct read_main_t { + ceph::msgr::v2::Tag tag; + const ceph::msgr::v2::FrameAssembler *rx_frame_asm; + }; + seastar::future read_main_preamble(); + + /// may throw negotiation_failure as fault + using read_payload_t = ceph::msgr::v2::segment_bls_t; + // FIXME: read_payload_t cannot be no-throw move constructible + seastar::future read_frame_payload(); + + template + ceph::bufferlist get_buffer(F &tx_frame) { + auto bl = tx_frame.get_buffer(tx_frame_asm); + log_main_preamble(bl); + return bl; + } + + template + seastar::future<> write_flush_frame(F &tx_frame) { + auto bl = get_buffer(tx_frame); + return write_flush(std::move(bl)); + } + +private: + void log_main_preamble(const ceph::bufferlist &bl); + + SocketConnection &conn; + + Socket *socket = nullptr; + + /* + * auth signature + */ + + bool record_io = false; + + ceph::bufferlist rxbuf; + + ceph::bufferlist txbuf; + + /* + * frame data and handlers + */ + + ceph::crypto::onwire::rxtx_t session_stream_handlers = { nullptr, nullptr }; + + // TODO + ceph::compression::onwire::rxtx_t session_comp_handlers = { nullptr, nullptr }; + + bool is_rev1 = false; + + ceph::msgr::v2::FrameAssembler tx_frame_asm{ + &session_stream_handlers, is_rev1, common::local_conf()->ms_crc_data, + &session_comp_handlers}; + + ceph::msgr::v2::FrameAssembler rx_frame_asm{ + &session_stream_handlers, is_rev1, common::local_conf()->ms_crc_data, + &session_comp_handlers}; + + ceph::bufferlist rx_preamble; + + read_payload_t rx_segments_data; +}; + +} // namespace crimson::net diff --git a/src/crimson/net/Protocol.cc b/src/crimson/net/Protocol.cc index e38590a1aec..f8bb3b7e932 100644 --- a/src/crimson/net/Protocol.cc +++ b/src/crimson/net/Protocol.cc @@ -22,7 +22,8 @@ namespace crimson::net { Protocol::Protocol(ChainedDispatchers& dispatchers, SocketConnection& conn) : dispatchers(dispatchers), - conn(conn) + conn(conn), + frame_assembler(conn) {} Protocol::~Protocol() @@ -156,7 +157,7 @@ void Protocol::ack_out_sent(seq_num_t seq) seastar::future Protocol::try_exit_out_dispatch() { assert(!is_out_queued()); - return conn.socket->flush().then([this] { + return frame_assembler.flush().then([this] { if (!is_out_queued()) { // still nothing pending to send after flush, // the dispatching can ONLY stop now @@ -190,7 +191,7 @@ seastar::future<> Protocol::do_out_dispatch() auto to_ack = ack_left; assert(to_ack == 0 || in_seq > 0); // sweep all pending out with the concrete Protocol - return conn.socket->write( + return frame_assembler.write( sweep_out_pending_msgs_to_sent( num_msgs, need_keepalive, next_keepalive_ack, to_ack > 0) ).then([this, prv_keepalive_ack=next_keepalive_ack, to_ack] { @@ -241,7 +242,8 @@ seastar::future<> Protocol::do_out_dispatch() conn, out_state, e); ceph_abort(); } - conn.socket->shutdown(); + ceph_assert_always(frame_assembler.has_socket()); + frame_assembler.shutdown_socket(); if (out_state == out_state_t::open) { logger().info("{} do_out_dispatch(): fault at {}, going to delay -- {}", conn, out_state, e); diff --git a/src/crimson/net/Protocol.h b/src/crimson/net/Protocol.h index 260634a2828..c352683dcb7 100644 --- a/src/crimson/net/Protocol.h +++ b/src/crimson/net/Protocol.h @@ -11,6 +11,7 @@ #include "crimson/common/log.h" #include "Fwd.h" #include "SocketConnection.h" +#include "FrameAssemblerV2.h" namespace crimson::net { @@ -141,13 +142,6 @@ class Protocol { in_seq = 0; } - bool is_out_queued() const { - return (!out_pending_msgs.empty() || - ack_left > 0 || - need_keepalive || - next_keepalive_ack.has_value()); - } - bool is_out_queued_or_sent() const { return is_out_queued() || !out_sent_msgs.empty(); } @@ -174,7 +168,16 @@ class Protocol { SocketConnection &conn; + FrameAssemblerV2 frame_assembler; + private: + bool is_out_queued() const { + return (!out_pending_msgs.empty() || + ack_left > 0 || + need_keepalive || + next_keepalive_ack.has_value()); + } + seastar::future try_exit_out_dispatch(); seastar::future<> do_out_dispatch(); diff --git a/src/crimson/net/ProtocolV2.cc b/src/crimson/net/ProtocolV2.cc index 3b16818cc62..4f6e113bc80 100644 --- a/src/crimson/net/ProtocolV2.cc +++ b/src/crimson/net/ProtocolV2.cc @@ -99,6 +99,17 @@ inline uint64_t generate_client_cookie() { 1, std::numeric_limits::max()); } +std::size_t get_msg_size(const FrameAssembler &rx_frame_asm) +{ + ceph_assert(rx_frame_asm.get_num_segments() > 0); + size_t sum = 0; + // we don't include SegmentIndex::Msg::HEADER. + for (size_t idx = 1; idx < rx_frame_asm.get_num_segments(); idx++) { + sum += rx_frame_asm.get_segment_logical_len(idx); + } + return sum; +} + } // namespace anonymous namespace fmt { @@ -182,7 +193,7 @@ void ProtocolV2::start_connect(const entity_addr_t& _peer_addr, const entity_name_t& _peer_name) { ceph_assert(state == state_t::NONE); - ceph_assert(!conn.socket); + ceph_assert(!frame_assembler.has_socket()); ceph_assert(!gate.is_closed()); conn.peer_addr = _peer_addr; conn.target_addr = _peer_addr; @@ -203,10 +214,10 @@ void ProtocolV2::start_accept(SocketRef&& sock, const entity_addr_t& _peer_addr) { ceph_assert(state == state_t::NONE); - ceph_assert(!conn.socket); + ceph_assert(!frame_assembler.has_socket()); // until we know better conn.target_addr = _peer_addr; - conn.socket = std::move(sock); + frame_assembler.set_socket(std::move(sock)); logger().info("{} ProtocolV2::start_accept(): target_addr={}", conn, _peer_addr); messenger.accept_conn( seastar::static_pointer_cast(conn.shared_from_this())); @@ -215,148 +226,32 @@ void ProtocolV2::start_accept(SocketRef&& sock, // TODO: Frame related implementations, probably to a separate class. -void ProtocolV2::enable_recording() -{ - rxbuf.clear(); - txbuf.clear(); - record_io = true; -} - -seastar::future ProtocolV2::read_exactly(size_t bytes) -{ - if (unlikely(record_io)) { - return conn.socket->read_exactly(bytes - ).then([this] (auto bl) { - rxbuf.append(buffer::create(bl.share())); - return bl; - }); - } else { - return conn.socket->read_exactly(bytes); - }; -} - -seastar::future ProtocolV2::read(size_t bytes) -{ - if (unlikely(record_io)) { - return conn.socket->read(bytes - ).then([this] (auto buf) { - rxbuf.append(buf); - return buf; - }); - } else { - return conn.socket->read(bytes); - } -} - -seastar::future<> ProtocolV2::write(bufferlist&& buf) -{ - if (unlikely(record_io)) { - txbuf.append(buf); - } - return conn.socket->write(std::move(buf)); -} - -seastar::future<> ProtocolV2::write_flush(bufferlist&& buf) -{ - if (unlikely(record_io)) { - txbuf.append(buf); - } - return conn.socket->write_flush(std::move(buf)); -} - -size_t ProtocolV2::get_current_msg_size() const +seastar::future +ProtocolV2::read_main_preamble() { - ceph_assert(rx_frame_asm.get_num_segments() > 0); - size_t sum = 0; - // we don't include SegmentIndex::Msg::HEADER. - for (size_t idx = 1; idx < rx_frame_asm.get_num_segments(); idx++) { - sum += rx_frame_asm.get_segment_logical_len(idx); - } - return sum; -} - -seastar::future ProtocolV2::read_main_preamble() -{ - rx_preamble.clear(); - return read_exactly(rx_frame_asm.get_preamble_onwire_len()) - .then([this] (auto bl) { - rx_segments_data.clear(); - try { - rx_preamble.append(buffer::create(std::move(bl))); - const Tag tag = rx_frame_asm.disassemble_preamble(rx_preamble); - INTERCEPT_FRAME(tag, bp_type_t::READ); - return tag; - } catch (FrameError& e) { - logger().warn("{} read_main_preamble: {}", conn, e.what()); - abort_in_fault(); - } - }); + return frame_assembler.read_main_preamble( +#ifdef UNIT_TESTS_BUILT + ).then([this](auto ret) { + INTERCEPT_FRAME(ret.tag, bp_type_t::READ); + return ret; + }); +#else + ); +#endif } -seastar::future<> ProtocolV2::read_frame_payload() +template +ceph::bufferlist ProtocolV2::get_buffer(F &tx_frame) { - ceph_assert(rx_segments_data.empty()); - - return seastar::do_until( - [this] { return rx_frame_asm.get_num_segments() == rx_segments_data.size(); }, - [this] { - // TODO: create aligned and contiguous buffer from socket - const size_t seg_idx = rx_segments_data.size(); - if (uint16_t alignment = rx_frame_asm.get_segment_align(seg_idx); - alignment != segment_t::DEFAULT_ALIGNMENT) { - logger().trace("{} cannot allocate {} aligned buffer at segment desc index {}", - conn, alignment, rx_segments_data.size()); - } - uint32_t onwire_len = rx_frame_asm.get_segment_onwire_len(seg_idx); - // TODO: create aligned and contiguous buffer from socket - return read_exactly(onwire_len).then([this] (auto tmp_bl) { - logger().trace("{} RECV({}) frame segment[{}]", - conn, tmp_bl.size(), rx_segments_data.size()); - bufferlist segment; - segment.append(buffer::create(std::move(tmp_bl))); - rx_segments_data.emplace_back(std::move(segment)); - }); - } - ).then([this] { - return read_exactly(rx_frame_asm.get_epilogue_onwire_len()); - }).then([this] (auto bl) { - logger().trace("{} RECV({}) frame epilogue", conn, bl.size()); - bool ok = false; - try { - bufferlist rx_epilogue; - rx_epilogue.append(buffer::create(std::move(bl))); - ok = rx_frame_asm.disassemble_segments(rx_preamble, rx_segments_data.data(), rx_epilogue); - } catch (FrameError& e) { - logger().error("read_frame_payload: {} {}", conn, e.what()); - abort_in_fault(); - } catch (ceph::crypto::onwire::MsgAuthError&) { - logger().error("read_frame_payload: {} bad auth tag", conn); - abort_in_fault(); - } - // we do have a mechanism that allows transmitter to start sending message - // and abort after putting entire data field on wire. This will be used by - // the kernel client to avoid unnecessary buffering. - if (!ok) { - // TODO - ceph_assert(false); - } - }); + INTERCEPT_FRAME(F::tag, bp_type_t::WRITE); + return frame_assembler.get_buffer(tx_frame); } template -seastar::future<> ProtocolV2::write_frame(F &frame, bool flush) +seastar::future<> ProtocolV2::write_flush_frame(F &tx_frame) { - auto bl = frame.get_buffer(tx_frame_asm); - const auto main_preamble = reinterpret_cast(bl.front().c_str()); - logger().trace("{} SEND({}) frame: tag={}, num_segments={}, crc={}", - conn, bl.length(), (int)main_preamble->tag, - (int)main_preamble->num_segments, main_preamble->crc); - INTERCEPT_FRAME(main_preamble->tag, bp_type_t::WRITE); - if (flush) { - return write_flush(std::move(bl)); - } else { - return write(std::move(bl)); - } + INTERCEPT_FRAME(F::tag, bp_type_t::WRITE); + return frame_assembler.write_flush_frame(tx_frame); } void ProtocolV2::trigger_state(state_t _state, out_state_t _out_state, bool reentrant) @@ -428,11 +323,11 @@ ProtocolV2::banner_exchange(bool is_connect) CEPH_MSGR2_REQUIRED_FEATURES, CEPH_BANNER_V2_PREFIX); INTERCEPT_CUSTOM(custom_bp_t::BANNER_WRITE, bp_type_t::WRITE); - return write_flush(std::move(bl)).then([this] { + return frame_assembler.write_flush(std::move(bl)).then([this] { // 2. read peer banner unsigned banner_len = strlen(CEPH_BANNER_V2_PREFIX) + sizeof(ceph_le16); INTERCEPT_CUSTOM(custom_bp_t::BANNER_READ, bp_type_t::READ); - return read_exactly(banner_len); // or read exactly? + return frame_assembler.read_exactly(banner_len); // or read exactly? }).then([this] (auto bl) { // 3. process peer banner and read banner_payload unsigned banner_prefix_len = strlen(CEPH_BANNER_V2_PREFIX); @@ -462,7 +357,7 @@ ProtocolV2::banner_exchange(bool is_connect) } logger().debug("{} GOT banner: payload_len={}", conn, payload_len); INTERCEPT_CUSTOM(custom_bp_t::BANNER_PAYLOAD_READ, bp_type_t::READ); - return read(payload_len); + return frame_assembler.read(payload_len); }).then([this, is_connect] (bufferlist bl) { // 4. process peer banner_payload and send HelloFrame auto p = bl.cbegin(); @@ -496,24 +391,23 @@ ProtocolV2::banner_exchange(bool is_connect) } peer_supported_features = _peer_supported_features; bool is_rev1 = HAVE_MSGR2_FEATURE(peer_supported_features, REVISION_1); - tx_frame_asm.set_is_rev1(is_rev1); - rx_frame_asm.set_is_rev1(is_rev1); + frame_assembler.set_is_rev1(is_rev1); auto hello = HelloFrame::Encode(messenger.get_mytype(), conn.target_addr); logger().debug("{} WRITE HelloFrame: my_type={}, peer_addr={}", conn, ceph_entity_type_name(messenger.get_mytype()), conn.target_addr); - return write_frame(hello); + return write_flush_frame(hello); }).then([this] { //5. read peer HelloFrame return read_main_preamble(); - }).then([this] (Tag tag) { - expect_tag(Tag::HELLO, tag, conn, __func__); - return read_frame_payload(); - }).then([this] { + }).then([this](auto ret) { + expect_tag(Tag::HELLO, ret.tag, conn, __func__); + return frame_assembler.read_frame_payload(); + }).then([this](auto payload) { // 6. process peer HelloFrame - auto hello = HelloFrame::Decode(rx_segments_data.back()); + auto hello = HelloFrame::Decode(payload->back()); logger().debug("{} GOT HelloFrame: my_type={} peer_addr={}", conn, ceph_entity_type_name(hello.entity_type()), hello.peer_addr()); @@ -526,13 +420,14 @@ ProtocolV2::banner_exchange(bool is_connect) seastar::future<> ProtocolV2::handle_auth_reply() { - return read_main_preamble() - .then([this] (Tag tag) { - switch (tag) { + return read_main_preamble( + ).then([this](auto ret) { + switch (ret.tag) { case Tag::AUTH_BAD_METHOD: - return read_frame_payload().then([this] { + return frame_assembler.read_frame_payload( + ).then([this](auto payload) { // handle_auth_bad_method() logic - auto bad_method = AuthBadMethodFrame::Decode(rx_segments_data.back()); + auto bad_method = AuthBadMethodFrame::Decode(payload->back()); logger().warn("{} GOT AuthBadMethodFrame: method={} result={}, " "allowed_methods={}, allowed_modes={}", conn, bad_method.method(), cpp_strerror(bad_method.result()), @@ -550,9 +445,10 @@ seastar::future<> ProtocolV2::handle_auth_reply() return client_auth(bad_method.allowed_methods()); }); case Tag::AUTH_REPLY_MORE: - return read_frame_payload().then([this] { + return frame_assembler.read_frame_payload( + ).then([this](auto payload) { // handle_auth_reply_more() logic - auto auth_more = AuthReplyMoreFrame::Decode(rx_segments_data.back()); + auto auth_more = AuthReplyMoreFrame::Decode(payload->back()); logger().debug("{} GOT AuthReplyMoreFrame: payload_len={}", conn, auth_more.auth_payload().length()); ceph_assert(messenger.get_auth_client()); @@ -562,14 +458,15 @@ seastar::future<> ProtocolV2::handle_auth_reply() auto more_reply = AuthRequestMoreFrame::Encode(reply); logger().debug("{} WRITE AuthRequestMoreFrame: payload_len={}", conn, reply.length()); - return write_frame(more_reply); + return write_flush_frame(more_reply); }).then([this] { return handle_auth_reply(); }); case Tag::AUTH_DONE: - return read_frame_payload().then([this] { + return frame_assembler.read_frame_payload( + ).then([this](auto payload) { // handle_auth_done() logic - auto auth_done = AuthDoneFrame::Decode(rx_segments_data.back()); + auto auth_done = AuthDoneFrame::Decode(payload->back()); logger().debug("{} GOT AuthDoneFrame: gid={}, con_mode={}, payload_len={}", conn, auth_done.global_id(), ceph_con_mode_name(auth_done.con_mode()), @@ -586,13 +483,11 @@ seastar::future<> ProtocolV2::handle_auth_reply() abort_in_fault(); } auth_meta->con_mode = auth_done.con_mode(); - bool is_rev1 = HAVE_MSGR2_FEATURE(peer_supported_features, REVISION_1); - session_stream_handlers = ceph::crypto::onwire::rxtx_t::create_handler_pair( - nullptr, *auth_meta, is_rev1, false); + frame_assembler.create_session_stream_handlers(*auth_meta, false); return finish_auth(); }); default: { - unexpected_tag(tag, conn, __func__); + unexpected_tag(ret.tag, conn, __func__); return seastar::now(); } } @@ -612,7 +507,7 @@ seastar::future<> ProtocolV2::client_auth(std::vector &allowed_methods logger().debug("{} WRITE AuthRequestFrame: method={}," " preferred_modes={}, payload_len={}", conn, auth_method, preferred_modes, bl.length()); - return write_frame(frame).then([this] { + return write_flush_frame(frame).then([this] { return handle_auth_reply(); }); } catch (const crimson::auth::error& e) { @@ -625,10 +520,11 @@ seastar::future<> ProtocolV2::client_auth(std::vector &allowed_methods seastar::future ProtocolV2::process_wait() { - return read_frame_payload().then([this] { + return frame_assembler.read_frame_payload( + ).then([this](auto payload) { // handle_wait() logic logger().debug("{} GOT WaitFrame", conn); - WaitFrame::Decode(rx_segments_data.back()); + WaitFrame::Decode(payload->back()); return next_step_t::wait; }); } @@ -659,14 +555,15 @@ ProtocolV2::client_connect() conn.policy.features_supported, conn.policy.features_required | msgr2_required, flags, client_cookie); - return write_frame(client_ident).then([this] { + return write_flush_frame(client_ident).then([this] { return read_main_preamble(); - }).then([this] (Tag tag) { - switch (tag) { + }).then([this](auto ret) { + switch (ret.tag) { case Tag::IDENT_MISSING_FEATURES: - return read_frame_payload().then([this] { + return frame_assembler.read_frame_payload( + ).then([this](auto payload) { // handle_ident_missing_features() logic - auto ident_missing = IdentMissingFeaturesFrame::Decode(rx_segments_data.back()); + auto ident_missing = IdentMissingFeaturesFrame::Decode(payload->back()); logger().warn("{} GOT IdentMissingFeaturesFrame: features={}" " (client does not support all server features)", conn, ident_missing.features()); @@ -676,10 +573,11 @@ ProtocolV2::client_connect() case Tag::WAIT: return process_wait(); case Tag::SERVER_IDENT: - return read_frame_payload().then([this] { + return frame_assembler.read_frame_payload( + ).then([this](auto payload) { // handle_server_ident() logic requeue_out_sent(); - auto server_ident = ServerIdentFrame::Decode(rx_segments_data.back()); + auto server_ident = ServerIdentFrame::Decode(payload->back()); logger().debug("{} GOT ServerIdentFrame:" " addrs={}, gid={}, gs={}," " features_supported={}, features_required={}," @@ -738,7 +636,7 @@ ProtocolV2::client_connect() return seastar::make_ready_future(next_step_t::ready); }); default: { - unexpected_tag(tag, conn, "post_client_connect"); + unexpected_tag(ret.tag, conn, "post_client_connect"); return seastar::make_ready_future(next_step_t::none); } } @@ -760,14 +658,15 @@ ProtocolV2::client_reconnect() conn, messenger.get_myaddrs(), client_cookie, server_cookie, global_seq, connect_seq, get_in_seq()); - return write_frame(reconnect).then([this] { + return write_flush_frame(reconnect).then([this] { return read_main_preamble(); - }).then([this] (Tag tag) { - switch (tag) { + }).then([this](auto ret) { + switch (ret.tag) { case Tag::SESSION_RETRY_GLOBAL: - return read_frame_payload().then([this] { + return frame_assembler.read_frame_payload( + ).then([this](auto payload) { // handle_session_retry_global() logic - auto retry = RetryGlobalFrame::Decode(rx_segments_data.back()); + auto retry = RetryGlobalFrame::Decode(payload->back()); logger().warn("{} GOT RetryGlobalFrame: gs={}", conn, retry.global_seq()); global_seq = messenger.get_global_seq(retry.global_seq()); @@ -775,9 +674,10 @@ ProtocolV2::client_reconnect() return client_reconnect(); }); case Tag::SESSION_RETRY: - return read_frame_payload().then([this] { + return frame_assembler.read_frame_payload( + ).then([this](auto payload) { // handle_session_retry() logic - auto retry = RetryFrame::Decode(rx_segments_data.back()); + auto retry = RetryFrame::Decode(payload->back()); logger().warn("{} GOT RetryFrame: cs={}", conn, retry.connect_seq()); connect_seq = retry.connect_seq() + 1; @@ -785,9 +685,10 @@ ProtocolV2::client_reconnect() return client_reconnect(); }); case Tag::SESSION_RESET: - return read_frame_payload().then([this] { + return frame_assembler.read_frame_payload( + ).then([this](auto payload) { // handle_session_reset() logic - auto reset = ResetFrame::Decode(rx_segments_data.back()); + auto reset = ResetFrame::Decode(payload->back()); logger().warn("{} GOT ResetFrame: full={}", conn, reset.full()); reset_session(reset.full()); return client_connect(); @@ -795,16 +696,17 @@ ProtocolV2::client_reconnect() case Tag::WAIT: return process_wait(); case Tag::SESSION_RECONNECT_OK: - return read_frame_payload().then([this] { + return frame_assembler.read_frame_payload( + ).then([this](auto payload) { // handle_reconnect_ok() logic - auto reconnect_ok = ReconnectOkFrame::Decode(rx_segments_data.back()); + auto reconnect_ok = ReconnectOkFrame::Decode(payload->back()); logger().debug("{} GOT ReconnectOkFrame: msg_seq={}", conn, reconnect_ok.msg_seq()); requeue_out_sent_up_to(reconnect_ok.msg_seq()); return seastar::make_ready_future(next_step_t::ready); }); default: { - unexpected_tag(tag, conn, "post_client_reconnect"); + unexpected_tag(ret.tag, conn, "post_client_reconnect"); return seastar::make_ready_future(next_step_t::none); } } @@ -814,9 +716,7 @@ ProtocolV2::client_reconnect() void ProtocolV2::execute_connecting() { trigger_state(state_t::CONNECTING, out_state_t::delay, false); - if (conn.socket) { - conn.socket->shutdown(); - } + frame_assembler.shutdown_socket(); gated_execute("execute_connecting", [this] { global_seq = messenger.get_global_seq(); assert(client_cookie != 0); @@ -835,12 +735,10 @@ void ProtocolV2::execute_connecting() conn, get_state_name(state)); abort_protocol(); } - if (conn.socket) { - gate.dispatch_in_background("close_sockect_connecting", *this, - [sock = std::move(conn.socket)] () mutable { - return sock->close().then([sock = std::move(sock)] {}); - }); - } + gate.dispatch_in_background( + "reset_close_socket_connecting", + *this, + [this] { return frame_assembler.reset_and_close_socket(); }); INTERCEPT_N_RW(custom_bp_t::SOCKET_CONNECTING); return Socket::connect(conn.peer_addr); }).then([this](SocketRef sock) { @@ -852,12 +750,12 @@ void ProtocolV2::execute_connecting() abort_protocol(); }); } - conn.socket = std::move(sock); + frame_assembler.set_socket(std::move(sock)); return seastar::now(); }).then([this] { auth_meta = seastar::make_lw_shared(); - session_stream_handlers = { nullptr, nullptr }; - enable_recording(); + frame_assembler.reset_handlers(); + frame_assembler.start_recording(); return banner_exchange(true); }).then([this] (auto&& ret) { auto [_peer_type, _my_addr_from_peer] = std::move(ret); @@ -872,7 +770,8 @@ void ProtocolV2::execute_connecting() conn, get_state_name(state)); abort_protocol(); } - conn.socket->learn_ephemeral_port_as_connector(_my_addr_from_peer.get_port()); + frame_assembler.learn_socket_ephemeral_port_as_connector( + _my_addr_from_peer.get_port()); if (unlikely(_my_addr_from_peer.is_legacy())) { logger().warn("{} peer sent a legacy address for me: {}", conn, _my_addr_from_peer); @@ -953,7 +852,7 @@ seastar::future<> ProtocolV2::_auth_bad_method(int r) "allowed_methods={}, allowed_modes={})", conn, auth_meta->auth_method, cpp_strerror(r), allowed_methods, allowed_modes); - return write_frame(bad_method).then([this] { + return write_flush_frame(bad_method).then([this] { return server_auth(); }); } @@ -979,11 +878,9 @@ seastar::future<> ProtocolV2::_handle_auth_request(bufferlist& auth_payload, boo logger().debug("{} WRITE AuthDoneFrame: gid={}, con_mode={}, payload_len={}", conn, conn.peer_global_id, ceph_con_mode_name(auth_meta->con_mode), reply.length()); - return write_frame(auth_done).then([this] { + return write_flush_frame(auth_done).then([this] { ceph_assert(auth_meta); - bool is_rev1 = HAVE_MSGR2_FEATURE(peer_supported_features, REVISION_1); - session_stream_handlers = ceph::crypto::onwire::rxtx_t::create_handler_pair( - nullptr, *auth_meta, is_rev1, true); + frame_assembler.create_session_stream_handlers(*auth_meta, true); return finish_auth(); }); } @@ -992,13 +889,13 @@ seastar::future<> ProtocolV2::_handle_auth_request(bufferlist& auth_payload, boo auto more = AuthReplyMoreFrame::Encode(reply); logger().debug("{} WRITE AuthReplyMoreFrame: payload_len={}", conn, reply.length()); - return write_frame(more).then([this] { + return write_flush_frame(more).then([this] { return read_main_preamble(); - }).then([this] (Tag tag) { - expect_tag(Tag::AUTH_REQUEST_MORE, tag, conn, __func__); - return read_frame_payload(); - }).then([this] { - auto auth_more = AuthRequestMoreFrame::Decode(rx_segments_data.back()); + }).then([this](auto ret) { + expect_tag(Tag::AUTH_REQUEST_MORE, ret.tag, conn, __func__); + return frame_assembler.read_frame_payload(); + }).then([this](auto payload) { + auto auth_more = AuthRequestMoreFrame::Decode(payload->back()); logger().debug("{} GOT AuthRequestMoreFrame: payload_len={}", conn, auth_more.auth_payload().length()); return _handle_auth_request(auth_more.auth_payload(), true); @@ -1018,13 +915,13 @@ seastar::future<> ProtocolV2::_handle_auth_request(bufferlist& auth_payload, boo seastar::future<> ProtocolV2::server_auth() { - return read_main_preamble() - .then([this] (Tag tag) { - expect_tag(Tag::AUTH_REQUEST, tag, conn, __func__); - return read_frame_payload(); - }).then([this] { + return read_main_preamble( + ).then([this](auto ret) { + expect_tag(Tag::AUTH_REQUEST, ret.tag, conn, __func__); + return frame_assembler.read_frame_payload(); + }).then([this](auto payload) { // handle_auth_request() logic - auto request = AuthRequestFrame::Decode(rx_segments_data.back()); + auto request = AuthRequestFrame::Decode(payload->back()); logger().debug("{} GOT AuthRequestFrame: method={}, preferred_modes={}," " payload_len={}", conn, request.method(), request.preferred_modes(), @@ -1060,7 +957,7 @@ ProtocolV2::send_wait() { auto wait = WaitFrame::Encode(); logger().debug("{} WRITE WaitFrame", conn); - return write_frame(wait).then([] { + return write_flush_frame(wait).then([] { return next_step_t::wait; }); } @@ -1072,9 +969,8 @@ ProtocolV2::reuse_connection( { existing_proto->trigger_replacing(reconnect, do_reset, - std::move(conn.socket), + frame_assembler.to_replace(), std::move(auth_meta), - std::move(session_stream_handlers), peer_global_seq, client_cookie, conn.get_peer_name(), @@ -1194,9 +1090,10 @@ ProtocolV2::handle_existing_connection(SocketConnectionRef existing_conn) seastar::future ProtocolV2::server_connect() { - return read_frame_payload().then([this] { + return frame_assembler.read_frame_payload( + ).then([this](auto payload) { // handle_client_ident() logic - auto client_ident = ClientIdentFrame::Decode(rx_segments_data.back()); + auto client_ident = ClientIdentFrame::Decode(payload->back()); logger().debug("{} GOT ClientIdentFrame: addrs={}, target={}," " gid={}, gs={}, features_supported={}," " features_required={}, flags={}, cookie={}", @@ -1245,7 +1142,7 @@ ProtocolV2::server_connect() auto ident_missing_features = IdentMissingFeaturesFrame::Encode(feat_missing); logger().warn("{} WRITE IdentMissingFeaturesFrame: features={} (peer missing)", conn, feat_missing); - return write_frame(ident_missing_features).then([] { + return write_flush_frame(ident_missing_features).then([] { return next_step_t::wait; }); } @@ -1278,9 +1175,9 @@ ProtocolV2::server_connect() seastar::future ProtocolV2::read_reconnect() { - return read_main_preamble() - .then([this] (Tag tag) { - expect_tag(Tag::SESSION_RECONNECT, tag, conn, "read_reconnect"); + return read_main_preamble( + ).then([this](auto ret) { + expect_tag(Tag::SESSION_RECONNECT, ret.tag, conn, "read_reconnect"); return server_reconnect(); }); } @@ -1290,7 +1187,7 @@ ProtocolV2::send_retry(uint64_t connect_seq) { auto retry = RetryFrame::Encode(connect_seq); logger().warn("{} WRITE RetryFrame: cs={}", conn, connect_seq); - return write_frame(retry).then([this] { + return write_flush_frame(retry).then([this] { return read_reconnect(); }); } @@ -1300,7 +1197,7 @@ ProtocolV2::send_retry_global(uint64_t global_seq) { auto retry = RetryGlobalFrame::Encode(global_seq); logger().warn("{} WRITE RetryGlobalFrame: gs={}", conn, global_seq); - return write_frame(retry).then([this] { + return write_flush_frame(retry).then([this] { return read_reconnect(); }); } @@ -1310,10 +1207,10 @@ ProtocolV2::send_reset(bool full) { auto reset = ResetFrame::Encode(full); logger().warn("{} WRITE ResetFrame: full={}", conn, full); - return write_frame(reset).then([this] { + return write_flush_frame(reset).then([this] { return read_main_preamble(); - }).then([this] (Tag tag) { - expect_tag(Tag::CLIENT_IDENT, tag, conn, "post_send_reset"); + }).then([this](auto ret) { + expect_tag(Tag::CLIENT_IDENT, ret.tag, conn, "post_send_reset"); return server_connect(); }); } @@ -1321,9 +1218,10 @@ ProtocolV2::send_reset(bool full) seastar::future ProtocolV2::server_reconnect() { - return read_frame_payload().then([this] { + return frame_assembler.read_frame_payload( + ).then([this](auto payload) { // handle_reconnect() logic - auto reconnect = ReconnectFrame::Decode(rx_segments_data.back()); + auto reconnect = ReconnectFrame::Decode(payload->back()); logger().debug("{} GOT ReconnectFrame: addrs={}, client_cookie={}," " server_cookie={}, gs={}, cs={}, msg_seq={}", @@ -1462,9 +1360,8 @@ void ProtocolV2::execute_accepting() return seastar::futurize_invoke([this] { INTERCEPT_N_RW(custom_bp_t::SOCKET_ACCEPTED); auth_meta = seastar::make_lw_shared(); - session_stream_handlers = { nullptr, nullptr }; - session_comp_handlers = { nullptr, nullptr }; - enable_recording(); + frame_assembler.reset_handlers(); + frame_assembler.start_recording(); return banner_exchange(false); }).then([this] (auto&& ret) { auto [_peer_type, _my_addr_from_peer] = std::move(ret); @@ -1489,14 +1386,14 @@ void ProtocolV2::execute_accepting() return server_auth(); }).then([this] { return read_main_preamble(); - }).then([this] (Tag tag) { - switch (tag) { + }).then([this](auto ret) { + switch (ret.tag) { case Tag::CLIENT_IDENT: return server_connect(); case Tag::SESSION_RECONNECT: return server_reconnect(); default: { - unexpected_tag(tag, conn, "post_server_auth"); + unexpected_tag(ret.tag, conn, "post_server_auth"); return seastar::make_ready_future(next_step_t::none); } } @@ -1531,21 +1428,19 @@ seastar::future<> ProtocolV2::finish_auth() { ceph_assert(auth_meta); + auto records = frame_assembler.stop_recording(); const auto sig = auth_meta->session_key.empty() ? sha256_digest_t() : - auth_meta->session_key.hmac_sha256(nullptr, rxbuf); + auth_meta->session_key.hmac_sha256(nullptr, records.rxbuf); auto sig_frame = AuthSignatureFrame::Encode(sig); - ceph_assert(record_io); - record_io = false; - rxbuf.clear(); logger().debug("{} WRITE AuthSignatureFrame: signature={}", conn, sig); - return write_frame(sig_frame).then([this] { + return write_flush_frame(sig_frame).then([this] { return read_main_preamble(); - }).then([this] (Tag tag) { - expect_tag(Tag::AUTH_SIGNATURE, tag, conn, "post_finish_auth"); - return read_frame_payload(); - }).then([this] { + }).then([this](auto ret) { + expect_tag(Tag::AUTH_SIGNATURE, ret.tag, conn, "post_finish_auth"); + return frame_assembler.read_frame_payload(); + }).then([this, txbuf=std::move(records.txbuf)](auto payload) { // handle_auth_signature() logic - auto sig_frame = AuthSignatureFrame::Decode(rx_segments_data.back()); + auto sig_frame = AuthSignatureFrame::Decode(payload->back()); logger().debug("{} GOT AuthSignatureFrame: signature={}", conn, sig_frame.signature()); const auto actual_tx_sig = auth_meta->session_key.empty() ? @@ -1556,7 +1451,6 @@ seastar::future<> ProtocolV2::finish_auth() conn, actual_tx_sig, sig_frame.signature()); abort_in_fault(); } - txbuf.clear(); }); } @@ -1664,16 +1558,15 @@ ProtocolV2::send_server_ident() conn.policy.features_required | msgr2_required, flags, server_cookie); - return write_frame(server_ident); + return write_flush_frame(server_ident); } // REPLACING state void ProtocolV2::trigger_replacing(bool reconnect, bool do_reset, - SocketRef&& new_socket, + FrameAssemblerV2::mover_t &&mover, AuthConnectionMetaRef&& new_auth_meta, - ceph::crypto::onwire::rxtx_t new_rxtx, uint64_t new_peer_global_seq, uint64_t new_client_cookie, entity_name_t new_peer_name, @@ -1683,18 +1576,15 @@ void ProtocolV2::trigger_replacing(bool reconnect, uint64_t new_msg_seq) { trigger_state(state_t::REPLACING, out_state_t::delay, false); - if (conn.socket) { - conn.socket->shutdown(); - } + frame_assembler.shutdown_socket(); dispatchers.ms_handle_accept( seastar::static_pointer_cast(conn.shared_from_this())); gate.dispatch_in_background("trigger_replacing", *this, [this, reconnect, do_reset, - new_socket = std::move(new_socket), + mover = std::move(mover), new_auth_meta = std::move(new_auth_meta), - new_rxtx = std::move(new_rxtx), new_client_cookie, new_peer_name, new_conn_features, new_peer_supported_features, new_peer_global_seq, @@ -1707,30 +1597,26 @@ void ProtocolV2::trigger_replacing(bool reconnect, return execution_done.get_future(); }).then([this, reconnect, - new_socket = std::move(new_socket), + mover = std::move(mover), new_auth_meta = std::move(new_auth_meta), - new_rxtx = std::move(new_rxtx), new_client_cookie, new_peer_name, new_conn_features, new_peer_supported_features, new_peer_global_seq, new_connect_seq, new_msg_seq] () mutable { if (unlikely(state != state_t::REPLACING)) { - return new_socket->close().then([sock = std::move(new_socket)] { + return mover.socket->close( + ).then([sock = std::move(mover.socket)] { abort_protocol(); }); } - if (conn.socket) { - gate.dispatch_in_background("close_socket_replacing", *this, - [sock = std::move(conn.socket)] () mutable { - return sock->close().then([sock = std::move(sock)] {}); - }); - } - conn.socket = std::move(new_socket); + gate.dispatch_in_background( + "reset_close_socket_replacing", + *this, + [this] { return frame_assembler.reset_and_close_socket(); }); auth_meta = std::move(new_auth_meta); - session_stream_handlers = std::move(new_rxtx); - record_io = false; peer_global_seq = new_peer_global_seq; + frame_assembler.replace_by(std::move(mover)); if (reconnect) { connect_seq = new_connect_seq; @@ -1738,7 +1624,7 @@ void ProtocolV2::trigger_replacing(bool reconnect, requeue_out_sent_up_to(new_msg_seq); auto reconnect_ok = ReconnectOkFrame::Encode(get_in_seq()); logger().debug("{} WRITE ReconnectOkFrame: msg_seq={}", conn, get_in_seq()); - return write_frame(reconnect_ok); + return write_flush_frame(reconnect_ok); } else { client_cookie = new_client_cookie; assert(conn.get_peer_type() == new_peer_name.type()); @@ -1748,8 +1634,7 @@ void ProtocolV2::trigger_replacing(bool reconnect, conn.set_features(new_conn_features); peer_supported_features = new_peer_supported_features; bool is_rev1 = HAVE_MSGR2_FEATURE(peer_supported_features, REVISION_1); - tx_frame_asm.set_is_rev1(is_rev1); - rx_frame_asm.set_is_rev1(is_rev1); + frame_assembler.set_is_rev1(is_rev1); return send_server_ident(); } }).then([this, reconnect] { @@ -1790,20 +1675,17 @@ ceph::bufferlist ProtocolV2::do_sweep_messages( if (unlikely(require_keepalive)) { auto keepalive_frame = KeepAliveFrame::Encode(); - bl.append(keepalive_frame.get_buffer(tx_frame_asm)); - INTERCEPT_FRAME(ceph::msgr::v2::Tag::KEEPALIVE2, bp_type_t::WRITE); + bl.append(get_buffer(keepalive_frame)); } if (unlikely(maybe_keepalive_ack.has_value())) { auto keepalive_ack_frame = KeepAliveFrameAck::Encode(*maybe_keepalive_ack); - bl.append(keepalive_ack_frame.get_buffer(tx_frame_asm)); - INTERCEPT_FRAME(ceph::msgr::v2::Tag::KEEPALIVE2_ACK, bp_type_t::WRITE); + bl.append(get_buffer(keepalive_ack_frame)); } if (require_ack && num_msgs == 0u) { auto ack_frame = AckFrame::Encode(get_in_seq()); - bl.append(ack_frame.get_buffer(tx_frame_asm)); - INTERCEPT_FRAME(ceph::msgr::v2::Tag::ACK, bp_type_t::WRITE); + bl.append(get_buffer(ack_frame)); } std::for_each(msgs.begin(), msgs.begin()+num_msgs, [this, &bl](const MessageURef& msg) { @@ -1831,22 +1713,20 @@ ceph::bufferlist ProtocolV2::do_sweep_messages( msg->get_payload(), msg->get_middle(), msg->get_data()); logger().debug("{} --> #{} === {} ({})", conn, msg->get_seq(), *msg, msg->get_type()); - bl.append(message.get_buffer(tx_frame_asm)); - INTERCEPT_FRAME(ceph::msgr::v2::Tag::MESSAGE, bp_type_t::WRITE); + bl.append(get_buffer(message)); }); return bl; } -seastar::future<> ProtocolV2::read_message(utime_t throttle_stamp) +seastar::future<> ProtocolV2::read_message(utime_t throttle_stamp, std::size_t msg_size) { - return read_frame_payload() - .then([this, throttle_stamp] { + return frame_assembler.read_frame_payload( + ).then([this, throttle_stamp, msg_size](auto payload) { utime_t recv_stamp{seastar::lowres_system_clock::now()}; // we need to get the size before std::moving segments data - const size_t cur_msg_size = get_current_msg_size(); - auto msg_frame = MessageFrame::Decode(rx_segments_data); + auto msg_frame = MessageFrame::Decode(*payload); // XXX: paranoid copy just to avoid oops ceph_msg_header2 current_header = msg_frame.header(); @@ -1883,7 +1763,7 @@ seastar::future<> ProtocolV2::read_message(utime_t throttle_stamp) // store reservation size in message, so we don't get confused // by messages entering the dispatch queue through other paths. - message->set_dispatch_throttle_size(cur_msg_size); + message->set_dispatch_throttle_size(msg_size); message->set_throttle_stamp(throttle_stamp); message->set_recv_stamp(recv_stamp); @@ -1941,10 +1821,11 @@ void ProtocolV2::execute_ready(bool dispatch_connect) gated_execute("execute_ready", [this] { protocol_timer.cancel(); return seastar::keep_doing([this] { - return read_main_preamble() - .then([this] (Tag tag) { - switch (tag) { + return read_main_preamble( + ).then([this](auto ret) { + switch (ret.tag) { case Tag::MESSAGE: { + size_t msg_size = get_msg_size(*ret.rx_frame_asm); return seastar::futurize_invoke([this] { // throttle_message() logic if (!conn.policy.throttler_messages) { @@ -1953,46 +1834,48 @@ void ProtocolV2::execute_ready(bool dispatch_connect) // TODO: message throttler ceph_assert(false); return seastar::now(); - }).then([this] { + }).then([this, msg_size] { // throttle_bytes() logic if (!conn.policy.throttler_bytes) { return seastar::now(); } - size_t cur_msg_size = get_current_msg_size(); - if (!cur_msg_size) { + if (!msg_size) { return seastar::now(); } logger().trace("{} wants {} bytes from policy throttler {}/{}", - conn, cur_msg_size, + conn, msg_size, conn.policy.throttler_bytes->get_current(), conn.policy.throttler_bytes->get_max()); - return conn.policy.throttler_bytes->get(cur_msg_size); - }).then([this] { + return conn.policy.throttler_bytes->get(msg_size); + }).then([this, msg_size] { // TODO: throttle_dispatch_queue() logic utime_t throttle_stamp{seastar::lowres_system_clock::now()}; - return read_message(throttle_stamp); + return read_message(throttle_stamp, msg_size); }); } case Tag::ACK: - return read_frame_payload().then([this] { + return frame_assembler.read_frame_payload( + ).then([this](auto payload) { // handle_message_ack() logic - auto ack = AckFrame::Decode(rx_segments_data.back()); + auto ack = AckFrame::Decode(payload->back()); logger().debug("{} GOT AckFrame: seq={}", conn, ack.seq()); ack_out_sent(ack.seq()); }); case Tag::KEEPALIVE2: - return read_frame_payload().then([this] { + return frame_assembler.read_frame_payload( + ).then([this](auto payload) { // handle_keepalive2() logic - auto keepalive_frame = KeepAliveFrame::Decode(rx_segments_data.back()); + auto keepalive_frame = KeepAliveFrame::Decode(payload->back()); logger().debug("{} GOT KeepAliveFrame: timestamp={}", conn, keepalive_frame.timestamp()); notify_keepalive_ack(keepalive_frame.timestamp()); set_last_keepalive(seastar::lowres_system_clock::now()); }); case Tag::KEEPALIVE2_ACK: - return read_frame_payload().then([this] { + return frame_assembler.read_frame_payload( + ).then([this](auto payload) { // handle_keepalive2_ack() logic - auto keepalive_ack_frame = KeepAliveFrameAck::Decode(rx_segments_data.back()); + auto keepalive_ack_frame = KeepAliveFrameAck::Decode(payload->back()); auto _last_keepalive_ack = seastar::lowres_system_clock::time_point{keepalive_ack_frame.timestamp()}; set_last_keepalive_ack(_last_keepalive_ack); @@ -2000,7 +1883,7 @@ void ProtocolV2::execute_ready(bool dispatch_connect) conn, _last_keepalive_ack); }); default: { - unexpected_tag(tag, conn, "execute_ready"); + unexpected_tag(ret.tag, conn, "execute_ready"); return seastar::now(); } } @@ -2023,9 +1906,7 @@ void ProtocolV2::execute_ready(bool dispatch_connect) void ProtocolV2::execute_standby() { trigger_state(state_t::STANDBY, out_state_t::delay, false); - if (conn.socket) { - conn.socket->shutdown(); - } + frame_assembler.shutdown_socket(); } void ProtocolV2::notify_out() @@ -2042,9 +1923,7 @@ void ProtocolV2::notify_out() void ProtocolV2::execute_wait(bool max_backoff) { trigger_state(state_t::WAIT, out_state_t::delay, false); - if (conn.socket) { - conn.socket->shutdown(); - } + frame_assembler.shutdown_socket(); gated_execute("execute_wait", [this, max_backoff] { double backoff = protocol_timer.last_dur(); if (max_backoff) { @@ -2077,7 +1956,7 @@ void ProtocolV2::execute_server_wait() { trigger_state(state_t::SERVER_WAIT, out_state_t::none, false); gated_execute("execute_server_wait", [this] { - return read_exactly(1).then([this] (auto bl) { + return frame_assembler.read_exactly(1).then([this] (auto bl) { logger().warn("{} SERVER_WAIT got read, abort", conn); abort_in_fault(); }).handle_exception([this] (std::exception_ptr eptr) { @@ -2154,9 +2033,7 @@ void ProtocolV2::do_close( if (f_accept_new) { (*f_accept_new)(); } - if (conn.socket) { - conn.socket->shutdown(); - } + frame_assembler.shutdown_socket(); assert(!gate.is_closed()); auto gate_closed = gate.close(); auto out_closed = close_out(); @@ -2172,11 +2049,7 @@ void ProtocolV2::do_close( closed_clean_fut = seastar::when_all( std::move(gate_closed), std::move(out_closed) ).discard_result().then([this] { - if (conn.socket) { - return conn.socket->close(); - } else { - return seastar::now(); - } + return frame_assembler.reset_and_close_socket(false); }).then([this] { logger().debug("{} closed!", conn); messenger.closed_conn( diff --git a/src/crimson/net/ProtocolV2.h b/src/crimson/net/ProtocolV2.h index 47584e702a8..3c2b5e67434 100644 --- a/src/crimson/net/ProtocolV2.h +++ b/src/crimson/net/ProtocolV2.h @@ -6,9 +6,6 @@ #include #include "Protocol.h" -#include "msg/async/frames_v2.h" -#include "msg/async/crypto_onwire.h" -#include "msg/async/compression_onwire.h" namespace crimson::net { @@ -139,37 +136,17 @@ class ProtocolV2 final : public Protocol { }; Timer protocol_timer; - // TODO: Frame related implementations, probably to a separate class. private: - bool record_io = false; - ceph::bufferlist rxbuf; - ceph::bufferlist txbuf; - - void enable_recording(); - seastar::future read_exactly(size_t bytes); - seastar::future read(size_t bytes); - seastar::future<> write(bufferlist&& buf); - seastar::future<> write_flush(bufferlist&& buf); - - ceph::crypto::onwire::rxtx_t session_stream_handlers; - ceph::compression::onwire::rxtx_t session_comp_handlers; - ceph::msgr::v2::FrameAssembler tx_frame_asm{ - &session_stream_handlers, false, common::local_conf()->ms_crc_data, - &session_comp_handlers}; - ceph::msgr::v2::FrameAssembler rx_frame_asm{ - &session_stream_handlers, false, common::local_conf()->ms_crc_data, - &session_comp_handlers}; - ceph::bufferlist rx_preamble; - ceph::msgr::v2::segment_bls_t rx_segments_data; - - size_t get_current_msg_size() const; - seastar::future read_main_preamble(); - seastar::future<> read_frame_payload(); + seastar::future read_main_preamble(); + template - seastar::future<> write_frame(F &frame, bool flush=true); + ceph::bufferlist get_buffer(F &tx_frame); + + template + seastar::future<> write_flush_frame(F &tx_frame); - private: void fault(bool backoff, const char* func_name, std::exception_ptr eptr); + void reset_session(bool is_full); seastar::future> banner_exchange(bool is_connect); @@ -229,9 +206,8 @@ class ProtocolV2 final : public Protocol { // REPLACING (server) void trigger_replacing(bool reconnect, bool do_reset, - SocketRef&& new_socket, + FrameAssemblerV2::mover_t &&mover, AuthConnectionMetaRef&& new_auth_meta, - ceph::crypto::onwire::rxtx_t new_rxtx, uint64_t new_peer_global_seq, // !reconnect uint64_t new_client_cookie, @@ -243,7 +219,7 @@ class ProtocolV2 final : public Protocol { uint64_t new_msg_seq); // READY - seastar::future<> read_message(utime_t throttle_stamp); + seastar::future<> read_message(utime_t throttle_stamp, std::size_t msg_size); void execute_ready(bool dispatch_connect); // STANDBY diff --git a/src/crimson/net/SocketConnection.h b/src/crimson/net/SocketConnection.h index 76703c76337..94d98302bdf 100644 --- a/src/crimson/net/SocketConnection.h +++ b/src/crimson/net/SocketConnection.h @@ -180,6 +180,7 @@ private: friend class Protocol; friend class ProtocolV2; + friend class FrameAssemblerV2; }; } // namespace crimson::net