From: Ilya Dryomov Date: Fri, 24 Apr 2020 16:01:46 +0000 (+0200) Subject: msg/async/frames_v2: introduce FrameAssembler X-Git-Tag: v17.0.0~2033^2~11 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=872b125a5b817def84ed5d70d204fec5a4fa7c1e;p=ceph.git msg/async/frames_v2: introduce FrameAssembler Start separating frame assembly and disassembly code from frame sending, receiving and handling code, so that assembly and disassembly pieces can be unit tested and hopefully also shared between different messengers (e.g. crimson). This commit factors out the assembly code from Frame. Signed-off-by: Ilya Dryomov --- diff --git a/src/crimson/CMakeLists.txt b/src/crimson/CMakeLists.txt index 61a45391cd359..424378ba62cf8 100644 --- a/src/crimson/CMakeLists.txt +++ b/src/crimson/CMakeLists.txt @@ -157,6 +157,7 @@ set(crimson_mon_srcs mon/MonClient.cc ${PROJECT_SOURCE_DIR}/src/mon/MonSub.cc) set(crimson_net_srcs + ${PROJECT_SOURCE_DIR}/src/msg/async/frames_v2.cc net/Errors.cc net/Messenger.cc net/SocketConnection.cc diff --git a/src/crimson/net/ProtocolV2.cc b/src/crimson/net/ProtocolV2.cc index 3221d648b360b..3e48417e9cc52 100644 --- a/src/crimson/net/ProtocolV2.cc +++ b/src/crimson/net/ProtocolV2.cc @@ -148,7 +148,8 @@ ProtocolV2::ProtocolV2(Dispatcher& dispatcher, SocketMessenger& messenger) : Protocol(proto_t::v2, dispatcher, conn), messenger{messenger}, - protocol_timer{conn} + protocol_timer{conn}, + tx_frame_asm(&session_stream_handlers) {} ProtocolV2::~ProtocolV2() {} @@ -386,7 +387,7 @@ seastar::future<> ProtocolV2::read_frame_payload() template seastar::future<> ProtocolV2::write_frame(F &frame, bool flush) { - auto bl = frame.get_buffer(session_stream_handlers); + auto bl = frame.get_buffer(tx_frame_asm); const auto main_preamble = reinterpret_cast(bl.front().c_str()); logger().trace("{} SEND({}) frame: tag={}, num_segments={}, crc={}", conn, bl.length(), (int)main_preamble->tag, @@ -1850,19 +1851,19 @@ ceph::bufferlist ProtocolV2::do_sweep_messages( if (unlikely(require_keepalive)) { auto keepalive_frame = KeepAliveFrame::Encode(); - bl.append(keepalive_frame.get_buffer(session_stream_handlers)); + bl.append(keepalive_frame.get_buffer(tx_frame_asm)); INTERCEPT_FRAME(ceph::msgr::v2::Tag::KEEPALIVE2, bp_type_t::WRITE); } if (unlikely(_keepalive_ack.has_value())) { auto keepalive_ack_frame = KeepAliveFrameAck::Encode(*_keepalive_ack); - bl.append(keepalive_ack_frame.get_buffer(session_stream_handlers)); + bl.append(keepalive_ack_frame.get_buffer(tx_frame_asm)); INTERCEPT_FRAME(ceph::msgr::v2::Tag::KEEPALIVE2_ACK, bp_type_t::WRITE); } if (require_ack && !num_msgs) { auto ack_frame = AckFrame::Encode(conn.in_seq); - bl.append(ack_frame.get_buffer(session_stream_handlers)); + bl.append(ack_frame.get_buffer(tx_frame_asm)); INTERCEPT_FRAME(ceph::msgr::v2::Tag::ACK, bp_type_t::WRITE); } @@ -1891,7 +1892,7 @@ ceph::bufferlist ProtocolV2::do_sweep_messages( msg->get_payload(), msg->get_middle(), msg->get_data()); logger().debug("{} --> #{} === {} ({})", conn, msg->get_seq(), *msg, msg->get_type()); - bl.append(message.get_buffer(session_stream_handlers)); + bl.append(message.get_buffer(tx_frame_asm)); INTERCEPT_FRAME(ceph::msgr::v2::Tag::MESSAGE, bp_type_t::WRITE); }); diff --git a/src/crimson/net/ProtocolV2.h b/src/crimson/net/ProtocolV2.h index bd9f1a425bc86..e007c9b40994f 100644 --- a/src/crimson/net/ProtocolV2.h +++ b/src/crimson/net/ProtocolV2.h @@ -123,6 +123,7 @@ class ProtocolV2 final : public Protocol { ceph::crypto::onwire::rxtx_t session_stream_handlers; boost::container::static_vector rx_segments_desc; + ceph::msgr::v2::FrameAssembler tx_frame_asm; ceph::msgr::v2::segment_bls_t rx_segments_data; size_t get_current_msg_size() const; diff --git a/src/msg/CMakeLists.txt b/src/msg/CMakeLists.txt index 0c83596579584..fada39b457f4f 100644 --- a/src/msg/CMakeLists.txt +++ b/src/msg/CMakeLists.txt @@ -17,6 +17,7 @@ list(APPEND msg_srcs async/PosixStack.cc async/Stack.cc async/crypto_onwire.cc + async/frames_v2.cc async/net_handler.cc) if(LINUX) diff --git a/src/msg/async/ProtocolV2.cc b/src/msg/async/ProtocolV2.cc index cdb8b7205571a..9b9d6f122eea8 100644 --- a/src/msg/async/ProtocolV2.cc +++ b/src/msg/async/ProtocolV2.cc @@ -92,6 +92,7 @@ ProtocolV2::ProtocolV2(AsyncConnection *connection) replacing(false), can_write(false), bannerExchangeCallback(nullptr), + tx_frame_asm(&session_stream_handlers), next_tag(static_cast(0)), keepalive(false) { } @@ -573,11 +574,14 @@ template bool ProtocolV2::append_frame(F& frame) { ceph::bufferlist bl; try { - bl = frame.get_buffer(session_stream_handlers); + bl = frame.get_buffer(tx_frame_asm); } catch (ceph::crypto::onwire::TxHandlerError &e) { ldout(cct, 1) << __func__ << " " << e.what() << dendl; return false; } + + ldout(cct, 25) << __func__ << " assembled frame " << bl.length() + << " bytes " << tx_frame_asm << dendl; connection->outgoing_bl.append(bl); return true; } @@ -787,11 +791,14 @@ CtPtr ProtocolV2::write(const std::string &desc, F &frame) { ceph::bufferlist bl; try { - bl = frame.get_buffer(session_stream_handlers); + bl = frame.get_buffer(tx_frame_asm); } catch (ceph::crypto::onwire::TxHandlerError &e) { ldout(cct, 1) << __func__ << " " << e.what() << dendl; return _fault(); } + + ldout(cct, 25) << __func__ << " assembled frame " << bl.length() + << " bytes " << tx_frame_asm << dendl; return write(desc, next, bl); } diff --git a/src/msg/async/ProtocolV2.h b/src/msg/async/ProtocolV2.h index 29ad191cbb1a2..78224772604ee 100644 --- a/src/msg/async/ProtocolV2.h +++ b/src/msg/async/ProtocolV2.h @@ -98,6 +98,7 @@ private: boost::container::static_vector rx_segments_desc; + ceph::msgr::v2::FrameAssembler tx_frame_asm; ceph::msgr::v2::segment_bls_t rx_segments_data; ceph::msgr::v2::Tag next_tag; utime_t backoff; // backoff time diff --git a/src/msg/async/frames_v2.cc b/src/msg/async/frames_v2.cc new file mode 100644 index 0000000000000..ead90ae9081a1 --- /dev/null +++ b/src/msg/async/frames_v2.cc @@ -0,0 +1,167 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2020 Red Hat + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include "frames_v2.h" + +#include + +#include + +namespace ceph::msgr::v2 { + +// Discards trailing empty segments, unless there is just one segment. +// A frame always has at least one (possibly empty) segment. +static size_t calc_num_segments(const bufferlist segment_bls[], + size_t segment_count) { + ceph_assert(segment_count > 0 && segment_count <= MAX_NUM_SEGMENTS); + for (size_t i = segment_count; i-- > 0; ) { + if (segment_bls[i].length() > 0) { + return i + 1; + } + } + return 1; +} + +void FrameAssembler::fill_preamble(Tag tag, + preamble_block_t& preamble) const { + // FIPS zeroization audit 20191115: this memset is not security related. + ::memset(&preamble, 0, sizeof(preamble)); + + preamble.tag = static_cast<__u8>(tag); + for (size_t i = 0; i < m_descs.size(); i++) { + preamble.segments[i].length = m_descs[i].logical_len; + preamble.segments[i].alignment = m_descs[i].align; + } + preamble.num_segments = m_descs.size(); + preamble.crc = ceph_crc32c( + 0, reinterpret_cast(&preamble), + sizeof(preamble) - sizeof(preamble.crc)); +} + +uint64_t FrameAssembler::get_frame_logical_len() const { + ceph_assert(!m_descs.empty()); + uint64_t logical_len = 0; + for (size_t i = 0; i < m_descs.size(); i++) { + logical_len += m_descs[i].logical_len; + } + return logical_len; +} + +uint64_t FrameAssembler::get_frame_onwire_len() const { + ceph_assert(!m_descs.empty()); + uint64_t onwire_len = get_preamble_onwire_len(); + for (size_t i = 0; i < m_descs.size(); i++) { + onwire_len += get_segment_onwire_len(i); + } + onwire_len += get_epilogue_onwire_len(); + return onwire_len; +} + +bufferlist FrameAssembler::asm_crc_rev0(const preamble_block_t& preamble, + bufferlist segment_bls[]) const { + epilogue_plain_block_t epilogue; + // FIPS zeroization audit 20191115: this memset is not security related. + ::memset(&epilogue, 0, sizeof(epilogue)); + + bufferlist frame_bl(sizeof(preamble) + sizeof(epilogue)); + frame_bl.append(reinterpret_cast(&preamble), sizeof(preamble)); + for (size_t i = 0; i < m_descs.size(); i++) { + ceph_assert(segment_bls[i].length() == m_descs[i].logical_len); + epilogue.crc_values[i] = segment_bls[i].crc32c(-1); + if (segment_bls[i].length() > 0) { + frame_bl.claim_append(segment_bls[i]); + } + } + frame_bl.append(reinterpret_cast(&epilogue), sizeof(epilogue)); + return frame_bl; +} + +bufferlist FrameAssembler::asm_secure_rev0(const preamble_block_t& preamble, + bufferlist segment_bls[]) const { + bufferlist preamble_bl(sizeof(preamble)); + preamble_bl.append(reinterpret_cast(&preamble), + sizeof(preamble)); + + epilogue_secure_block_t epilogue; + // FIPS zeroization audit 20191115: this memset is not security related. + ::memset(&epilogue, 0, sizeof(epilogue)); + bufferlist epilogue_bl(sizeof(epilogue)); + epilogue_bl.append(reinterpret_cast(&epilogue), + sizeof(epilogue)); + + // preamble + MAX_NUM_SEGMENTS + epilogue + uint32_t onwire_lens[MAX_NUM_SEGMENTS + 2]; + onwire_lens[0] = preamble_bl.length(); + for (size_t i = 0; i < m_descs.size(); i++) { + onwire_lens[i + 1] = segment_bls[i].length(); // already padded + } + onwire_lens[m_descs.size() + 1] = epilogue_bl.length(); + m_crypto->tx->reset_tx_handler(onwire_lens, + onwire_lens + m_descs.size() + 2); + m_crypto->tx->authenticated_encrypt_update(preamble_bl); + for (size_t i = 0; i < m_descs.size(); i++) { + if (segment_bls[i].length() > 0) { + m_crypto->tx->authenticated_encrypt_update(segment_bls[i]); + } + } + m_crypto->tx->authenticated_encrypt_update(epilogue_bl); + return m_crypto->tx->authenticated_encrypt_final(); +} + +bufferlist FrameAssembler::assemble_frame(Tag tag, bufferlist segment_bls[], + const uint16_t segment_aligns[], + size_t segment_count) { + m_descs.resize(calc_num_segments(segment_bls, segment_count)); + for (size_t i = 0; i < m_descs.size(); i++) { + m_descs[i].logical_len = segment_bls[i].length(); + m_descs[i].align = segment_aligns[i]; + } + + preamble_block_t preamble; + fill_preamble(tag, preamble); + + if (m_crypto->rx) { + for (size_t i = 0; i < m_descs.size(); i++) { + ceph_assert(segment_bls[i].length() == m_descs[i].logical_len); + // We're padding segments to biggest cipher's block size. Although + // AES-GCM can live without that as it's a stream cipher, we don't + // want to be fixed to stream ciphers only. + uint32_t padded_len = get_segment_padded_len(i); + if (padded_len > segment_bls[i].length()) { + uint32_t pad_len = padded_len - segment_bls[i].length(); + segment_bls[i].reserve(pad_len); + segment_bls[i].append_zero(pad_len); + } + } + return asm_secure_rev0(preamble, segment_bls); + } + return asm_crc_rev0(preamble, segment_bls); +} + +std::ostream& operator<<(std::ostream& os, const FrameAssembler& frame_asm) { + if (!frame_asm.m_descs.empty()) { + os << frame_asm.get_preamble_onwire_len(); + for (size_t i = 0; i < frame_asm.m_descs.size(); i++) { + os << " + " << frame_asm.get_segment_onwire_len(i) + << " (logical " << frame_asm.m_descs[i].logical_len + << "/" << frame_asm.m_descs[i].align << ")"; + } + os << " + " << frame_asm.get_epilogue_onwire_len() << " "; + } + os << "rx=" << frame_asm.m_crypto->rx.get() + << " tx=" << frame_asm.m_crypto->tx.get(); + return os; +} + +} // namespace ceph::msgr::v2 diff --git a/src/msg/async/frames_v2.h b/src/msg/async/frames_v2.h index 0576ce9cf92e0..035549b8a064a 100644 --- a/src/msg/async/frames_v2.h +++ b/src/msg/async/frames_v2.h @@ -5,6 +5,7 @@ #include "common/Clock.h" #include "crypto_onwire.h" #include +#include #include #include @@ -157,157 +158,107 @@ static uint32_t segment_onwire_size(const uint32_t logical_size) return p2roundup(logical_size, CRYPTO_BLOCK_SIZE); } -static inline ceph::bufferlist segment_onwire_bufferlist(ceph::bufferlist&& bl) -{ - const auto padding_size = segment_onwire_size(bl.length()) - bl.length(); - if (padding_size) { - bl.append_zero(padding_size); +struct FrameError : std::runtime_error { + using runtime_error::runtime_error; +}; + +class FrameAssembler { +public: + // crypto must be non-null + FrameAssembler(const ceph::crypto::onwire::rxtx_t* crypto) + : m_crypto(crypto) {} + + size_t get_num_segments() const { + ceph_assert(!m_descs.empty()); + return m_descs.size(); } - return std::move(bl); -} -template -struct Frame { - static constexpr size_t SegmentsNumV = sizeof...(SegmentAlignmentVs); - static_assert(SegmentsNumV > 0 && SegmentsNumV <= MAX_NUM_SEGMENTS); -protected: - std::array segments; + uint32_t get_segment_logical_len(size_t seg_idx) const { + ceph_assert(seg_idx < m_descs.size()); + return m_descs[seg_idx].logical_len; + } -private: - static constexpr std::array alignments { - SegmentAlignmentVs... - }; - ceph::bufferlist::contiguous_filler preamble_filler; + uint16_t get_segment_align(size_t seg_idx) const { + ceph_assert(seg_idx < m_descs.size()); + return m_descs[seg_idx].align; + } - __u8 calc_num_segments(const segment_t segments[]) - { - for (__u8 num = SegmentsNumV; num > 0; num--) { - if (segments[num-1].length) { - return num; - } + uint32_t get_preamble_onwire_len() const { + return sizeof(preamble_block_t); + } + + uint32_t get_segment_onwire_len(size_t seg_idx) const { + ceph_assert(seg_idx < m_descs.size()); + if (m_crypto->rx) { + return get_segment_padded_len(seg_idx); } - // frame always has at least one segment. - return 1; + return m_descs[seg_idx].logical_len; } - // craft the main preamble. It's always present regardless of the number - // of segments message is composed from. - void fill_preamble() { - ceph_assert(std::size(segments) <= MAX_NUM_SEGMENTS); - - preamble_block_t main_preamble; - // FIPS zeroization audit 20191115: this memset is not security related. - ::memset(&main_preamble, 0, sizeof(main_preamble)); - - main_preamble.tag = static_cast<__u8>(T::tag); - ceph_assert(main_preamble.tag != 0); - - // implementation detail: the first bufferlist of Frame::segments carries - // space for preamble. This glueing isn't a part of the onwire format but - // just our private detail. - main_preamble.segments[0].length = - segments[0].length() - FRAME_PREAMBLE_SIZE; - main_preamble.segments[0].alignment = alignments[0]; - - // there is no business in issuing frame without at least one segment - // filled. - if constexpr(SegmentsNumV > 1) { - for (__u8 idx = 1; idx < SegmentsNumV; idx++) { - main_preamble.segments[idx].length = segments[idx].length(); - main_preamble.segments[idx].alignment = alignments[idx]; - } + uint32_t get_epilogue_onwire_len() const { + ceph_assert(!m_descs.empty()); + if (m_crypto->rx) { + return sizeof(epilogue_secure_block_t) + get_auth_tag_len(); } - // calculate the number of non-empty segments. - // TODO: reorder segments to get DATA first - main_preamble.num_segments = calc_num_segments(main_preamble.segments); + return sizeof(epilogue_plain_block_t); + } - main_preamble.crc = - ceph_crc32c(0, reinterpret_cast(&main_preamble), - sizeof(main_preamble) - sizeof(main_preamble.crc)); + uint64_t get_frame_logical_len() const; + uint64_t get_frame_onwire_len() const; - preamble_filler.copy_in(sizeof(main_preamble), - reinterpret_cast(&main_preamble)); - } + bufferlist assemble_frame(Tag tag, bufferlist segment_bls[], + const uint16_t segment_aligns[], + size_t segment_count); - template - void reset_tx_handler( - ceph::crypto::onwire::rxtx_t &session_stream_handlers, - std::index_sequence) - { - session_stream_handlers.tx->reset_tx_handler({ segments[Is].length()..., - sizeof(epilogue_secure_block_t) }); +private: + struct segment_desc_t { + uint32_t logical_len; + uint16_t align; + }; + + uint32_t get_segment_padded_len(size_t seg_idx) const { + return p2roundup(m_descs[seg_idx].logical_len, + CRYPTO_BLOCK_SIZE); } -public: - ceph::bufferlist get_buffer( - ceph::crypto::onwire::rxtx_t &session_stream_handlers) - { - fill_preamble(); - if (session_stream_handlers.tx) { - // we're padding segments to biggest cipher's block size. Although - // AES-GCM can live without that as it's a stream cipher, we don't - // to be fixed to stream ciphers only. - for (auto& segment : segments) { - segment = segment_onwire_bufferlist(std::move(segment)); - } + uint32_t get_auth_tag_len() const { + return m_crypto->rx->get_extra_size_at_final(); + } - // let's cipher allocate one huge buffer for entire ciphertext. - reset_tx_handler( - session_stream_handlers, std::make_index_sequence()); + bufferlist asm_crc_rev0(const preamble_block_t& preamble, + bufferlist segment_bls[]) const; + bufferlist asm_secure_rev0(const preamble_block_t& preamble, + bufferlist segment_bls[]) const; - for (auto& segment : segments) { - if (segment.length()) { - session_stream_handlers.tx->authenticated_encrypt_update( - std::move(segment)); - } - } + void fill_preamble(Tag tag, preamble_block_t& preamble) const; + friend std::ostream& operator<<(std::ostream& os, + const FrameAssembler& frame_asm); - // in secure mode we craft only the late_flags. Signature (for AES-GCM - // called auth tag) will be added by the cipher. - { - epilogue_secure_block_t epilogue; - // FIPS zeroization audit 20191115: this memset is not security - // related. - ::memset(&epilogue, 0, sizeof(epilogue)); - ceph::bufferlist epilogue_bl; - epilogue_bl.append(reinterpret_cast(&epilogue), - sizeof(epilogue)); - session_stream_handlers.tx->authenticated_encrypt_update(epilogue_bl); - } - return session_stream_handlers.tx->authenticated_encrypt_final(); - } else { - // plain mode - epilogue_plain_block_t epilogue; - // FIPS zeroization audit 20191115: this memset is not security related. - ::memset(&epilogue, 0, sizeof(epilogue)); - - ceph::bufferlist::const_iterator hdriter(&segments.front(), - FRAME_PREAMBLE_SIZE); - epilogue.crc_values[SegmentIndex::Control::PAYLOAD] = - hdriter.crc32c(hdriter.get_remaining(), -1); - if constexpr(SegmentsNumV > 1) { - for (__u8 idx = 1; idx < SegmentsNumV; idx++) { - epilogue.crc_values[idx] = segments[idx].crc32c(-1); - } - } + boost::container::static_vector m_descs; + const ceph::crypto::onwire::rxtx_t* m_crypto; +}; - ceph::bufferlist ret; - for (auto& segment : segments) { - ret.claim_append(segment); - } - ret.append(reinterpret_cast(&epilogue), sizeof(epilogue)); - return ret; - } - } +template +struct Frame { + static constexpr size_t SegmentsNumV = sizeof...(SegmentAlignmentVs); + static_assert(SegmentsNumV > 0 && SegmentsNumV <= MAX_NUM_SEGMENTS); +protected: + std::array segments; - Frame() - : preamble_filler(segments.front().append_hole(FRAME_PREAMBLE_SIZE)) { - } +private: + static constexpr std::array alignments { + SegmentAlignmentVs... + }; public: + ceph::bufferlist get_buffer(FrameAssembler& tx_frame_asm) { + auto bl = tx_frame_asm.assemble_frame(T::tag, segments.data(), + alignments.data(), SegmentsNumV); + ceph_assert(bl.length() == tx_frame_asm.get_frame_onwire_len()); + return bl; + } }; - // ControlFrames are used to manage transceiver state (like connections) and // orchestrate transfers of MessageFrames. They use only single segment with // marshalling facilities -- derived classes specify frame structure through