From 95dea12a49c018ffa6b36899d35d7cd79fad808d Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Fri, 24 Apr 2020 18:01:46 +0200 Subject: [PATCH] msg/async/frames_v2: introduce FrameAssembler Start separating frame assembly and disassembly code from frame sending, receiving and handling code, so that assembly and disassembly pieces can be unit tested and hopefully also shared between different messengers (e.g. crimson). This commit factors out the assembly code from Frame. Signed-off-by: Ilya Dryomov (cherry picked from commit 872b125a5b817def84ed5d70d204fec5a4fa7c1e) Conflicts: src/crimson/CMakeLists.txt [ crimson doesn't support msgr2 in nautilus src/crimson/net/ProtocolV2.cc [ ditto ] src/crimson/net/ProtocolV2.h [ ditto ] src/msg/async/frames_v2.h [ commits f1cf408412c8 ("msg/async/frames_v2.h: fix warning"), c70f779d12a2 ("headers: Make ceph_le member private"), 9908f0e652b9 ("msg: Add optimizing move") and 1a975fb3a801 ("msg/async: fix unnecessary 4 kB allocation in secure mode.") not in nautilus; fmt include adjusted as in rgw_lc.cc ] --- src/msg/CMakeLists.txt | 1 + src/msg/async/ProtocolV2.cc | 11 +- src/msg/async/ProtocolV2.h | 1 + src/msg/async/frames_v2.cc | 169 +++++++++++++++++++++++++++++ src/msg/async/frames_v2.h | 211 ++++++++++++++---------------------- 5 files changed, 261 insertions(+), 132 deletions(-) create mode 100644 src/msg/async/frames_v2.cc diff --git a/src/msg/CMakeLists.txt b/src/msg/CMakeLists.txt index 647bbb80e8361..1ad346153655f 100644 --- a/src/msg/CMakeLists.txt +++ b/src/msg/CMakeLists.txt @@ -29,6 +29,7 @@ list(APPEND msg_srcs async/PosixStack.cc async/Stack.cc async/crypto_onwire.cc + async/frames_v2.cc async/net_handler.cc) if(LINUX) diff --git a/src/msg/async/ProtocolV2.cc b/src/msg/async/ProtocolV2.cc index 378147b23ff85..5cb73678459b2 100644 --- a/src/msg/async/ProtocolV2.cc +++ b/src/msg/async/ProtocolV2.cc @@ -92,6 +92,7 @@ ProtocolV2::ProtocolV2(AsyncConnection *connection) replacing(false), can_write(false), bannerExchangeCallback(nullptr), + tx_frame_asm(&session_stream_handlers), next_tag(static_cast(0)), keepalive(false) { } @@ -573,11 +574,14 @@ template bool ProtocolV2::append_frame(F& frame) { ceph::bufferlist bl; try { - bl = frame.get_buffer(session_stream_handlers); + bl = frame.get_buffer(tx_frame_asm); } catch (ceph::crypto::onwire::TxHandlerError &e) { ldout(cct, 1) << __func__ << " " << e.what() << dendl; return false; } + + ldout(cct, 25) << __func__ << " assembled frame " << bl.length() + << " bytes " << tx_frame_asm << dendl; connection->outgoing_bl.append(bl); return true; } @@ -778,11 +782,14 @@ CtPtr ProtocolV2::write(const std::string &desc, F &frame) { ceph::bufferlist bl; try { - bl = frame.get_buffer(session_stream_handlers); + bl = frame.get_buffer(tx_frame_asm); } catch (ceph::crypto::onwire::TxHandlerError &e) { ldout(cct, 1) << __func__ << " " << e.what() << dendl; return _fault(); } + + ldout(cct, 25) << __func__ << " assembled frame " << bl.length() + << " bytes " << tx_frame_asm << dendl; return write(desc, next, bl); } diff --git a/src/msg/async/ProtocolV2.h b/src/msg/async/ProtocolV2.h index a4c2b1f5b8fe3..3cd2577a89d08 100644 --- a/src/msg/async/ProtocolV2.h +++ b/src/msg/async/ProtocolV2.h @@ -98,6 +98,7 @@ private: boost::container::static_vector rx_segments_desc; + ceph::msgr::v2::FrameAssembler tx_frame_asm; ceph::msgr::v2::segment_bls_t rx_segments_data; ceph::msgr::v2::Tag next_tag; utime_t backoff; // backoff time diff --git a/src/msg/async/frames_v2.cc b/src/msg/async/frames_v2.cc new file mode 100644 index 0000000000000..99b79e83cd675 --- /dev/null +++ b/src/msg/async/frames_v2.cc @@ -0,0 +1,169 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2020 Red Hat + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include "frames_v2.h" + +#include + +#undef FMT_HEADER_ONLY +#define FMT_HEADER_ONLY 1 +#include "seastar/fmt/include/fmt/format.h" + +namespace ceph::msgr::v2 { + +// Discards trailing empty segments, unless there is just one segment. +// A frame always has at least one (possibly empty) segment. +static size_t calc_num_segments(const bufferlist segment_bls[], + size_t segment_count) { + ceph_assert(segment_count > 0 && segment_count <= MAX_NUM_SEGMENTS); + for (size_t i = segment_count; i-- > 0; ) { + if (segment_bls[i].length() > 0) { + return i + 1; + } + } + return 1; +} + +void FrameAssembler::fill_preamble(Tag tag, + preamble_block_t& preamble) const { + // FIPS zeroization audit 20191115: this memset is not security related. + ::memset(&preamble, 0, sizeof(preamble)); + + preamble.tag = static_cast<__u8>(tag); + for (size_t i = 0; i < m_descs.size(); i++) { + preamble.segments[i].length = m_descs[i].logical_len; + preamble.segments[i].alignment = m_descs[i].align; + } + preamble.num_segments = m_descs.size(); + preamble.crc = ceph_crc32c( + 0, reinterpret_cast(&preamble), + sizeof(preamble) - sizeof(preamble.crc)); +} + +uint64_t FrameAssembler::get_frame_logical_len() const { + ceph_assert(!m_descs.empty()); + uint64_t logical_len = 0; + for (size_t i = 0; i < m_descs.size(); i++) { + logical_len += m_descs[i].logical_len; + } + return logical_len; +} + +uint64_t FrameAssembler::get_frame_onwire_len() const { + ceph_assert(!m_descs.empty()); + uint64_t onwire_len = get_preamble_onwire_len(); + for (size_t i = 0; i < m_descs.size(); i++) { + onwire_len += get_segment_onwire_len(i); + } + onwire_len += get_epilogue_onwire_len(); + return onwire_len; +} + +bufferlist FrameAssembler::asm_crc_rev0(const preamble_block_t& preamble, + bufferlist segment_bls[]) const { + epilogue_plain_block_t epilogue; + // FIPS zeroization audit 20191115: this memset is not security related. + ::memset(&epilogue, 0, sizeof(epilogue)); + + bufferlist frame_bl(sizeof(preamble) + sizeof(epilogue)); + frame_bl.append(reinterpret_cast(&preamble), sizeof(preamble)); + for (size_t i = 0; i < m_descs.size(); i++) { + ceph_assert(segment_bls[i].length() == m_descs[i].logical_len); + epilogue.crc_values[i] = segment_bls[i].crc32c(-1); + if (segment_bls[i].length() > 0) { + frame_bl.claim_append(segment_bls[i]); + } + } + frame_bl.append(reinterpret_cast(&epilogue), sizeof(epilogue)); + return frame_bl; +} + +bufferlist FrameAssembler::asm_secure_rev0(const preamble_block_t& preamble, + bufferlist segment_bls[]) const { + bufferlist preamble_bl(sizeof(preamble)); + preamble_bl.append(reinterpret_cast(&preamble), + sizeof(preamble)); + + epilogue_secure_block_t epilogue; + // FIPS zeroization audit 20191115: this memset is not security related. + ::memset(&epilogue, 0, sizeof(epilogue)); + bufferlist epilogue_bl(sizeof(epilogue)); + epilogue_bl.append(reinterpret_cast(&epilogue), + sizeof(epilogue)); + + // preamble + MAX_NUM_SEGMENTS + epilogue + uint32_t onwire_lens[MAX_NUM_SEGMENTS + 2]; + onwire_lens[0] = preamble_bl.length(); + for (size_t i = 0; i < m_descs.size(); i++) { + onwire_lens[i + 1] = segment_bls[i].length(); // already padded + } + onwire_lens[m_descs.size() + 1] = epilogue_bl.length(); + m_crypto->tx->reset_tx_handler(onwire_lens, + onwire_lens + m_descs.size() + 2); + m_crypto->tx->authenticated_encrypt_update(preamble_bl); + for (size_t i = 0; i < m_descs.size(); i++) { + if (segment_bls[i].length() > 0) { + m_crypto->tx->authenticated_encrypt_update(segment_bls[i]); + } + } + m_crypto->tx->authenticated_encrypt_update(epilogue_bl); + return m_crypto->tx->authenticated_encrypt_final(); +} + +bufferlist FrameAssembler::assemble_frame(Tag tag, bufferlist segment_bls[], + const uint16_t segment_aligns[], + size_t segment_count) { + m_descs.resize(calc_num_segments(segment_bls, segment_count)); + for (size_t i = 0; i < m_descs.size(); i++) { + m_descs[i].logical_len = segment_bls[i].length(); + m_descs[i].align = segment_aligns[i]; + } + + preamble_block_t preamble; + fill_preamble(tag, preamble); + + if (m_crypto->rx) { + for (size_t i = 0; i < m_descs.size(); i++) { + ceph_assert(segment_bls[i].length() == m_descs[i].logical_len); + // We're padding segments to biggest cipher's block size. Although + // AES-GCM can live without that as it's a stream cipher, we don't + // want to be fixed to stream ciphers only. + uint32_t padded_len = get_segment_padded_len(i); + if (padded_len > segment_bls[i].length()) { + uint32_t pad_len = padded_len - segment_bls[i].length(); + segment_bls[i].reserve(pad_len); + segment_bls[i].append_zero(pad_len); + } + } + return asm_secure_rev0(preamble, segment_bls); + } + return asm_crc_rev0(preamble, segment_bls); +} + +std::ostream& operator<<(std::ostream& os, const FrameAssembler& frame_asm) { + if (!frame_asm.m_descs.empty()) { + os << frame_asm.get_preamble_onwire_len(); + for (size_t i = 0; i < frame_asm.m_descs.size(); i++) { + os << " + " << frame_asm.get_segment_onwire_len(i) + << " (logical " << frame_asm.m_descs[i].logical_len + << "/" << frame_asm.m_descs[i].align << ")"; + } + os << " + " << frame_asm.get_epilogue_onwire_len() << " "; + } + os << "rx=" << frame_asm.m_crypto->rx.get() + << " tx=" << frame_asm.m_crypto->tx.get(); + return os; +} + +} // namespace ceph::msgr::v2 diff --git a/src/msg/async/frames_v2.h b/src/msg/async/frames_v2.h index dd544a2a0a14e..d509e46366b11 100644 --- a/src/msg/async/frames_v2.h +++ b/src/msg/async/frames_v2.h @@ -5,6 +5,7 @@ #include "common/Clock.h" #include "crypto_onwire.h" #include +#include #include #include @@ -157,156 +158,106 @@ static uint32_t segment_onwire_size(const uint32_t logical_size) return p2roundup(logical_size, CRYPTO_BLOCK_SIZE); } -static ceph::bufferlist segment_onwire_bufferlist(ceph::bufferlist&& bl) -{ - const auto padding_size = segment_onwire_size(bl.length()) - bl.length(); - if (padding_size) { - bl.append_zero(padding_size); - } - return bl; -} +struct FrameError : std::runtime_error { + using runtime_error::runtime_error; +}; -template -struct Frame { - static constexpr size_t SegmentsNumV = sizeof...(SegmentAlignmentVs); - static_assert(SegmentsNumV > 0 && SegmentsNumV <= MAX_NUM_SEGMENTS); -protected: - std::array segments; +class FrameAssembler { +public: + // crypto must be non-null + FrameAssembler(const ceph::crypto::onwire::rxtx_t* crypto) + : m_crypto(crypto) {} -private: - static constexpr std::array alignments { - SegmentAlignmentVs... - }; - ceph::bufferlist::contiguous_filler preamble_filler; - - __u8 calc_num_segments( - const std::array& segments) - { - for (__u8 num = SegmentsNumV; num > 0; num--) { - if (segments[num-1].length) { - return num; - } - } - // frame always has at least one segment. - return 1; + size_t get_num_segments() const { + ceph_assert(!m_descs.empty()); + return m_descs.size(); } - // craft the main preamble. It's always present regardless of the number - // of segments message is composed from. - void fill_preamble() { - ceph_assert(std::size(segments) <= MAX_NUM_SEGMENTS); - - preamble_block_t main_preamble; - // FIPS zeroization audit 20191115: this memset is not security related. - ::memset(&main_preamble, 0, sizeof(main_preamble)); - - main_preamble.tag = static_cast<__u8>(T::tag); - ceph_assert(main_preamble.tag != 0); - - // implementation detail: the first bufferlist of Frame::segments carries - // space for preamble. This glueing isn't a part of the onwire format but - // just our private detail. - main_preamble.segments.front().length = - segments.front().length() - FRAME_PREAMBLE_SIZE; - main_preamble.segments.front().alignment = alignments.front(); - - // there is no business in issuing frame without at least one segment - // filled. - if constexpr(SegmentsNumV > 1) { - for (__u8 idx = 1; idx < SegmentsNumV; idx++) { - main_preamble.segments[idx].length = segments[idx].length(); - main_preamble.segments[idx].alignment = alignments[idx]; - } - } - // calculate the number of non-empty segments. - // TODO: reorder segments to get DATA first - main_preamble.num_segments = calc_num_segments(main_preamble.segments); + uint32_t get_segment_logical_len(size_t seg_idx) const { + ceph_assert(seg_idx < m_descs.size()); + return m_descs[seg_idx].logical_len; + } - main_preamble.crc = - ceph_crc32c(0, reinterpret_cast(&main_preamble), - sizeof(main_preamble) - sizeof(main_preamble.crc)); + uint16_t get_segment_align(size_t seg_idx) const { + ceph_assert(seg_idx < m_descs.size()); + return m_descs[seg_idx].align; + } - preamble_filler.copy_in(sizeof(main_preamble), - reinterpret_cast(&main_preamble)); + uint32_t get_preamble_onwire_len() const { + return sizeof(preamble_block_t); } - template - void reset_tx_handler( - ceph::crypto::onwire::rxtx_t &session_stream_handlers, - std::index_sequence) - { - session_stream_handlers.tx->reset_tx_handler({ segments[Is].length()... }); + uint32_t get_segment_onwire_len(size_t seg_idx) const { + ceph_assert(seg_idx < m_descs.size()); + if (m_crypto->rx) { + return get_segment_padded_len(seg_idx); + } + return m_descs[seg_idx].logical_len; } -public: - ceph::bufferlist get_buffer( - ceph::crypto::onwire::rxtx_t &session_stream_handlers) - { - fill_preamble(); - if (session_stream_handlers.tx) { - // we're padding segments to biggest cipher's block size. Although - // AES-GCM can live without that as it's a stream cipher, we don't - // to be fixed to stream ciphers only. - for (auto& segment : segments) { - segment = segment_onwire_bufferlist(std::move(segment)); - } + uint32_t get_epilogue_onwire_len() const { + ceph_assert(!m_descs.empty()); + if (m_crypto->rx) { + return sizeof(epilogue_secure_block_t) + get_auth_tag_len(); + } + return sizeof(epilogue_plain_block_t); + } - // let's cipher allocate one huge buffer for entire ciphertext. - reset_tx_handler( - session_stream_handlers, std::make_index_sequence()); + uint64_t get_frame_logical_len() const; + uint64_t get_frame_onwire_len() const; - for (auto& segment : segments) { - if (segment.length()) { - session_stream_handlers.tx->authenticated_encrypt_update( - std::move(segment)); - } - } + bufferlist assemble_frame(Tag tag, bufferlist segment_bls[], + const uint16_t segment_aligns[], + size_t segment_count); - // in secure mode we craft only the late_flags. Signature (for AES-GCM - // called auth tag) will be added by the cipher. - { - epilogue_secure_block_t epilogue; - // FIPS zeroization audit 20191115: this memset is not security - // related. - ::memset(&epilogue, 0, sizeof(epilogue)); - ceph::bufferlist epilogue_bl; - epilogue_bl.append(reinterpret_cast(&epilogue), - sizeof(epilogue)); - session_stream_handlers.tx->authenticated_encrypt_update(epilogue_bl); - } - return session_stream_handlers.tx->authenticated_encrypt_final(); - } else { - // plain mode - epilogue_plain_block_t epilogue; - // FIPS zeroization audit 20191115: this memset is not security related. - ::memset(&epilogue, 0, sizeof(epilogue)); - - ceph::bufferlist::const_iterator hdriter(&segments.front(), - FRAME_PREAMBLE_SIZE); - epilogue.crc_values[SegmentIndex::Control::PAYLOAD] = - hdriter.crc32c(hdriter.get_remaining(), -1); - if constexpr(SegmentsNumV > 1) { - for (__u8 idx = 1; idx < SegmentsNumV; idx++) { - epilogue.crc_values[idx] = segments[idx].crc32c(-1); - } - } +private: + struct segment_desc_t { + uint32_t logical_len; + uint16_t align; + }; - ceph::bufferlist ret; - for (auto& segment : segments) { - ret.claim_append(segment); - } - ret.append(reinterpret_cast(&epilogue), sizeof(epilogue)); - return ret; - } + uint32_t get_segment_padded_len(size_t seg_idx) const { + return p2roundup(m_descs[seg_idx].logical_len, + CRYPTO_BLOCK_SIZE); } - Frame() - : preamble_filler(segments.front().append_hole(FRAME_PREAMBLE_SIZE)) { + uint32_t get_auth_tag_len() const { + return m_crypto->rx->get_extra_size_at_final(); } -public: + bufferlist asm_crc_rev0(const preamble_block_t& preamble, + bufferlist segment_bls[]) const; + bufferlist asm_secure_rev0(const preamble_block_t& preamble, + bufferlist segment_bls[]) const; + + void fill_preamble(Tag tag, preamble_block_t& preamble) const; + friend std::ostream& operator<<(std::ostream& os, + const FrameAssembler& frame_asm); + + boost::container::static_vector m_descs; + const ceph::crypto::onwire::rxtx_t* m_crypto; }; +template +struct Frame { + static constexpr size_t SegmentsNumV = sizeof...(SegmentAlignmentVs); + static_assert(SegmentsNumV > 0 && SegmentsNumV <= MAX_NUM_SEGMENTS); +protected: + std::array segments; + +private: + static constexpr std::array alignments { + SegmentAlignmentVs... + }; + +public: + ceph::bufferlist get_buffer(FrameAssembler& tx_frame_asm) { + auto bl = tx_frame_asm.assemble_frame(T::tag, segments.data(), + alignments.data(), SegmentsNumV); + ceph_assert(bl.length() == tx_frame_asm.get_frame_onwire_len()); + return bl; + } +}; // ControlFrames are used to manage transceiver state (like connections) and // orchestrate transfers of MessageFrames. They use only single segment with -- 2.39.5