From: Kefu Chai Date: Fri, 24 Jul 2020 08:33:22 +0000 (+0800) Subject: crimson/net: use rx_frame_asm for handling data read from wire X-Git-Tag: wip-pdonnell-testing-20200918.022351~556^2~15 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=f5938cb7f0f7bdf721af596c0bf42bb742eb25f2;p=ceph-ci.git crimson/net: use rx_frame_asm for handling data read from wire by leveraging FrameAssembler, it's much simpler. and it also pave the road to a better messenger v2.0 and v2.1 protocol support. Signed-off-by: Kefu Chai --- diff --git a/src/crimson/net/ProtocolV2.cc b/src/crimson/net/ProtocolV2.cc index 448e5340768..9397d70a4b2 100644 --- a/src/crimson/net/ProtocolV2.cc +++ b/src/crimson/net/ProtocolV2.cc @@ -150,8 +150,7 @@ ProtocolV2::ProtocolV2(ChainedDispatchersRef& dispatcher, SocketMessenger& messenger) : Protocol(proto_t::v2, dispatcher, conn), messenger{messenger}, - protocol_timer{conn}, - tx_frame_asm(&session_stream_handlers, false) + protocol_timer{conn} {} ProtocolV2::~ProtocolV2() {} @@ -249,11 +248,11 @@ seastar::future<> ProtocolV2::write_flush(bufferlist&& buf) size_t ProtocolV2::get_current_msg_size() const { - ceph_assert(!rx_segments_desc.empty()); + ceph_assert(rx_frame_asm.get_num_segments() > 0); size_t sum = 0; // we don't include SegmentIndex::Msg::HEADER. - for (__u8 idx = 1; idx < rx_segments_desc.size(); idx++) { - sum += rx_segments_desc[idx].length; + for (size_t idx = 1; idx < rx_frame_asm.get_num_segments(); idx++) { + sum += rx_frame_asm.get_segment_logical_len(idx); } return sum; } @@ -262,78 +261,37 @@ seastar::future ProtocolV2::read_main_preamble() { return read_exactly(sizeof(preamble_block_t)) .then([this] (auto bl) { - if (session_stream_handlers.rx) { - session_stream_handlers.rx->reset_rx_handler(); - /* - bl = session_stream_handlers.rx->authenticated_decrypt_update( - std::move(bl), segment_t::DEFAULT_ALIGNMENT); - */ - } - - // I expect ceph_le32 will make the endian conversion for me. Passing - // everything through ::Decode is unnecessary. - const auto& main_preamble = \ - *reinterpret_cast(bl.get()); - logger().trace("{} RECV({}) main preamble: tag={}, num_segments={}, crc={}", - conn, bl.size(), (int)main_preamble.tag, - (int)main_preamble.num_segments, main_preamble.crc); - - // verify preamble's CRC before any further processing - const auto rx_crc = ceph_crc32c(0, - reinterpret_cast(&main_preamble), - sizeof(main_preamble) - sizeof(main_preamble.crc)); - if (rx_crc != main_preamble.crc) { - logger().warn("{} crc mismatch for main preamble rx_crc={} tx_crc={}", - conn, rx_crc, main_preamble.crc); - abort_in_fault(); - } - - // currently we do support between 1 and MAX_NUM_SEGMENTS segments - if (main_preamble.num_segments < 1 || - main_preamble.num_segments > MAX_NUM_SEGMENTS) { - logger().warn("{} unsupported num_segments={}", - conn, main_preamble.num_segments); - abort_in_fault(); - } - if (main_preamble.num_segments > MAX_NUM_SEGMENTS) { - logger().warn("{} num_segments too much: {}", - conn, main_preamble.num_segments); - abort_in_fault(); - } - - rx_segments_desc.clear(); rx_segments_data.clear(); - - for (std::uint8_t idx = 0; idx < main_preamble.num_segments; idx++) { - logger().trace("{} GOT frame segment: len={} align={}", - conn, main_preamble.segments[idx].length, - main_preamble.segments[idx].alignment); - rx_segments_desc.emplace_back(main_preamble.segments[idx]); + try { + bufferlist preamble; + preamble.append(buffer::create(std::move(bl))); + const Tag tag = rx_frame_asm.disassemble_preamble(rx_preamble); + INTERCEPT_FRAME(tag, bp_type_t::READ); + return tag; + } catch (FrameError& e) { + logger().warn("{} read_main_preamble: {}", conn, e.what()); + abort_in_fault(); } - - INTERCEPT_FRAME(main_preamble.tag, bp_type_t::READ); - return static_cast(main_preamble.tag); }); } seastar::future<> ProtocolV2::read_frame_payload() { - ceph_assert(!rx_segments_desc.empty()); ceph_assert(rx_segments_data.empty()); return seastar::do_until( - [this] { return rx_segments_desc.size() == rx_segments_data.size(); }, + [this] { return rx_frame_asm.get_num_segments() == rx_segments_data.size(); }, [this] { - // description of current segment to read - const auto& cur_rx_desc = rx_segments_desc.at(rx_segments_data.size()); // TODO: create aligned and contiguous buffer from socket - if (cur_rx_desc.alignment != segment_t::DEFAULT_ALIGNMENT) { + const size_t seg_idx = rx_segments_data.size(); + if (uint16_t alignment = rx_frame_asm.get_segment_align(seg_idx); + alignment != segment_t::DEFAULT_ALIGNMENT) { logger().trace("{} cannot allocate {} aligned buffer at segment desc index {}", - conn, cur_rx_desc.alignment, rx_segments_data.size()); + conn, alignment, rx_segments_data.size()); } + uint32_t onwire_len = rx_frame_asm.get_segment_onwire_len(seg_idx); // TODO: create aligned and contiguous buffer from socket - return read_exactly(cur_rx_desc.length) - .then([this] (auto tmp_bl) { + return read_exactly(onwire_len).then([this] (auto tmp_bl) { logger().trace("{} RECV({}) frame segment[{}]", conn, tmp_bl.size(), rx_segments_data.size()); bufferlist data; @@ -346,40 +304,27 @@ seastar::future<> ProtocolV2::read_frame_payload() }); } ).then([this] { - // TODO: get_epilogue_size() ceph_assert(!session_stream_handlers.rx); - return read_exactly(sizeof(epilogue_crc_rev0_block_t)); + return read_exactly(rx_frame_asm.get_epilogue_onwire_len()); }).then([this] (auto bl) { logger().trace("{} RECV({}) frame epilogue", conn, bl.size()); - - __u8 late_flags; - if (session_stream_handlers.rx) { - // TODO - ceph_assert(false); - } else { - auto& epilogue = *reinterpret_cast(bl.get()); - for (std::uint8_t idx = 0; idx < rx_segments_data.size(); idx++) { - const __u32 expected_crc = epilogue.crc_values[idx]; - const __u32 calculated_crc = rx_segments_data[idx].crc32c(-1); - if (expected_crc != calculated_crc) { - logger().warn("{} message integrity check failed at index {}:" - " expected_crc={} calculated_crc={}", - conn, (unsigned int)idx, expected_crc, calculated_crc); - abort_in_fault(); - } else { - logger().trace("{} message integrity check success at index {}: crc={}", - conn, (unsigned int)idx, expected_crc); - } - } - late_flags = epilogue.late_flags; + bool ok = false; + try { + // TODO: v2.1 rx_frame_asm.disassemble_first_segment(); + bufferlist rx_epilogue; + rx_epilogue.append(buffer::create(std::move(bl))); + ok = rx_frame_asm.disassemble_remaining_segments(rx_segments_data.data(), rx_epilogue); + } catch (FrameError& e) { + logger().error("read_frame_payload: {} {}", conn, e.what()); + abort_in_fault(); + } catch (ceph::crypto::onwire::MsgAuthError&) { + logger().error("read_frame_payload: {} bad auth tag", conn); + abort_in_fault(); } - logger().trace("{} GOT frame epilogue: late_flags={}", - conn, (unsigned)late_flags); - // we do have a mechanism that allows transmitter to start sending message // and abort after putting entire data field on wire. This will be used by // the kernel client to avoid unnecessary buffering. - if (late_flags & FRAME_LATE_FLAG_ABORTED) { + if (!ok) { // TODO ceph_assert(false); } diff --git a/src/crimson/net/ProtocolV2.h b/src/crimson/net/ProtocolV2.h index 1630bd66f87..ea0f3261192 100644 --- a/src/crimson/net/ProtocolV2.h +++ b/src/crimson/net/ProtocolV2.h @@ -122,9 +122,8 @@ class ProtocolV2 final : public Protocol { seastar::future<> write_flush(bufferlist&& buf); ceph::crypto::onwire::rxtx_t session_stream_handlers; - boost::container::static_vector rx_segments_desc; - ceph::msgr::v2::FrameAssembler tx_frame_asm; + ceph::msgr::v2::FrameAssembler tx_frame_asm{&session_stream_handlers, false}; + ceph::msgr::v2::FrameAssembler rx_frame_asm{&session_stream_handlers, false}; ceph::msgr::v2::segment_bls_t rx_segments_data; size_t get_current_msg_size() const;