From: Ilya Dryomov Date: Sat, 25 Apr 2020 09:44:47 +0000 (+0200) Subject: msg/async/ProtocolV2: switch to FrameAssembler X-Git-Tag: v15.2.5~164^2~12 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=59144b2705f58dba6f117b11ebb948b9e310d86a;p=ceph.git msg/async/ProtocolV2: switch to FrameAssembler Factor out the disassembly code from ProtocolV2 and switch ProtocolV2 to FrameAssembler. Signed-off-by: Ilya Dryomov (cherry picked from commit b9e0cfe1cce13b9e977bcea192da7c7843a3023d) Conflicts: src/msg/async/ProtocolV2.cc [ context: commit d3ec4c01d17 ("msg: Build target 'common' without using namespace in headers") not in octopus ] --- diff --git a/src/msg/async/ProtocolV2.cc b/src/msg/async/ProtocolV2.cc index a9e30b9354c8..ed5f590a4d02 100644 --- a/src/msg/async/ProtocolV2.cc +++ b/src/msg/async/ProtocolV2.cc @@ -93,6 +93,7 @@ ProtocolV2::ProtocolV2(AsyncConnection *connection) can_write(false), bannerExchangeCallback(nullptr), tx_frame_asm(&session_stream_handlers), + rx_frame_asm(&session_stream_handlers), next_tag(static_cast(0)), keepalive(false) { } @@ -264,11 +265,11 @@ void ProtocolV2::reset_recv_state() { } size_t ProtocolV2::get_current_msg_size() const { - ceph_assert(!rx_segments_desc.empty()); + ceph_assert(rx_frame_asm.get_num_segments() > 0); size_t sum = 0; // we don't include SegmentIndex::Msg::HEADER. - for (__u8 idx = 1; idx < rx_segments_desc.size(); idx++) { - sum += rx_segments_desc[idx].length; + for (size_t i = 1; i < rx_frame_asm.get_num_segments(); i++) { + sum += rx_frame_asm.get_segment_logical_len(i); } return sum; } @@ -736,26 +737,6 @@ bool ProtocolV2::is_queued() { return !out_queue.empty() || connection->is_queued(); } -uint32_t ProtocolV2::get_onwire_size(const uint32_t logical_size) const { - if (session_stream_handlers.rx) { - return segment_onwire_size(logical_size); - } else { - return logical_size; - } -} - -uint32_t ProtocolV2::get_epilogue_size() const { - // In secure mode size of epilogue is flexible and depends on particular - // cipher implementation. See the comment for epilogue_secure_block_t or - // epilogue_plain_block_t. - if (session_stream_handlers.rx) { - return FRAME_SECURE_EPILOGUE_SIZE + \ - session_stream_handlers.rx->get_extra_size_at_final(); - } else { - return FRAME_PLAIN_EPILOGUE_SIZE; - } -} - CtPtr ProtocolV2::read(CONTINUATION_RXBPTR_TYPE &next, rx_buffer_t &&buffer) { const auto len = buffer->length(); @@ -1068,7 +1049,12 @@ CtPtr ProtocolV2::read_frame() { } ldout(cct, 20) << __func__ << dendl; - return READ(FRAME_PREAMBLE_SIZE, handle_read_frame_preamble_main); + rx_preamble.clear(); + rx_epilogue.clear(); + rx_segments_data.clear(); + + return READ(rx_frame_asm.get_preamble_onwire_len(), + handle_read_frame_preamble_main); } CtPtr ProtocolV2::handle_read_frame_preamble_main(rx_buffer_t &&buffer, int r) { @@ -1080,73 +1066,31 @@ CtPtr ProtocolV2::handle_read_frame_preamble_main(rx_buffer_t &&buffer, int r) { return _fault(); } - ceph::bufferlist preamble; - preamble.push_back(std::move(buffer)); + rx_preamble.push_back(std::move(buffer)); ldout(cct, 30) << __func__ << " preamble\n"; - preamble.hexdump(*_dout); + rx_preamble.hexdump(*_dout); *_dout << dendl; - if (session_stream_handlers.rx) { - ceph_assert(session_stream_handlers.rx); - - session_stream_handlers.rx->reset_rx_handler(); - session_stream_handlers.rx->authenticated_decrypt_update(preamble); + try { + next_tag = rx_frame_asm.disassemble_preamble(rx_preamble); + } catch (FrameError& e) { + ldout(cct, 1) << __func__ << " " << e.what() << dendl; + return _fault(); + } catch (ceph::crypto::onwire::MsgAuthError&) { + ldout(cct, 1) << __func__ << "bad auth tag" << dendl; + return _fault(); + } - ldout(cct, 10) << __func__ << " got encrypted preamble." - << " after decrypt premable.length()=" << preamble.length() - << dendl; + ldout(cct, 25) << __func__ << " disassembled preamble " << rx_frame_asm + << dendl; + if (session_stream_handlers.rx) { ldout(cct, 30) << __func__ << " preamble after decrypt\n"; - preamble.hexdump(*_dout); + rx_preamble.hexdump(*_dout); *_dout << dendl; } - { - // I expect ceph_le32 will make the endian conversion for me. Passing - // everything through ::Decode is unnecessary. - const auto& main_preamble = \ - reinterpret_cast(*preamble.c_str()); - - // verify preamble's CRC before any further processing - const auto rx_crc = ceph_crc32c(0, - reinterpret_cast(&main_preamble), - sizeof(main_preamble) - sizeof(main_preamble.crc)); - if (rx_crc != main_preamble.crc) { - ldout(cct, 10) << __func__ << " crc mismatch for main preamble" - << " rx_crc=" << rx_crc - << " tx_crc=" << main_preamble.crc << dendl; - return _fault(); - } - - // currently we do support between 1 and MAX_NUM_SEGMENTS segments - if (main_preamble.num_segments < 1 || - main_preamble.num_segments > MAX_NUM_SEGMENTS) { - ldout(cct, 10) << __func__ << " unsupported num_segments=" - << " tx_crc=" << main_preamble.num_segments << dendl; - return _fault(); - } - - next_tag = static_cast(main_preamble.tag); - - rx_segments_desc.clear(); - rx_segments_data.clear(); - - if (main_preamble.num_segments > MAX_NUM_SEGMENTS) { - ldout(cct, 30) << __func__ - << " num_segments=" << main_preamble.num_segments - << " is too much" << dendl; - return _fault(); - } - for (std::uint8_t idx = 0; idx < main_preamble.num_segments; idx++) { - ldout(cct, 10) << __func__ << " got new segment:" - << " len=" << main_preamble.segments[idx].length - << " align=" << main_preamble.segments[idx].alignment - << dendl; - rx_segments_desc.emplace_back(main_preamble.segments[idx]); - } - } - // does it need throttle? if (next_tag == Tag::MESSAGE) { if (state != READY) { @@ -1199,21 +1143,23 @@ CtPtr ProtocolV2::handle_read_frame_dispatch() { } CtPtr ProtocolV2::read_frame_segment() { - ldout(cct, 20) << __func__ << dendl; - ceph_assert(!rx_segments_desc.empty()); + size_t seg_idx = rx_segments_data.size(); + ldout(cct, 20) << __func__ << " seg_idx=" << seg_idx << dendl; + rx_segments_data.emplace_back(); + + uint32_t onwire_len = rx_frame_asm.get_segment_onwire_len(seg_idx); + uint16_t align = rx_frame_asm.get_segment_align(seg_idx); - // description of current segment to read - const auto& cur_rx_desc = rx_segments_desc.at(rx_segments_data.size()); rx_buffer_t rx_buffer; try { rx_buffer = buffer::ptr_node::create(buffer::create_aligned( - get_onwire_size(cur_rx_desc.length), cur_rx_desc.alignment)); + onwire_len, align)); } catch (std::bad_alloc&) { // Catching because of potential issues with satisfying alignment. ldout(cct, 1) << __func__ << " can't allocate aligned rx_buffer" - << " len=" << get_onwire_size(cur_rx_desc.length) - << " align=" << cur_rx_desc.alignment - << dendl; + << " len=" << onwire_len + << " align=" << align + << dendl; return _fault(); } @@ -1229,36 +1175,15 @@ CtPtr ProtocolV2::handle_read_frame_segment(rx_buffer_t &&rx_buffer, int r) { return _fault(); } - rx_segments_data.emplace_back(); rx_segments_data.back().push_back(std::move(rx_buffer)); - // decrypt incoming data - // FIXME: if (auth_meta->is_mode_secure()) { - if (session_stream_handlers.rx) { - ceph_assert(session_stream_handlers.rx); - - auto& new_seg = rx_segments_data.back(); - if (new_seg.length()) { - session_stream_handlers.rx->authenticated_decrypt_update(new_seg); - const auto idx = rx_segments_data.size() - 1; - if (new_seg.length() > rx_segments_desc[idx].length) { - new_seg.splice(rx_segments_desc[idx].length, - new_seg.length() - rx_segments_desc[idx].length); - } - - ldout(cct, 20) << __func__ - << " unpadded new_seg.length()=" << new_seg.length() - << dendl; - } - } - - if (rx_segments_desc.size() == rx_segments_data.size()) { + if (rx_segments_data.size() == rx_frame_asm.get_num_segments()) { // OK, all segments planned to read are read. Can go with epilogue. - return READ(get_epilogue_size(), handle_read_frame_epilogue_main); - } else { - // TODO: for makeshift only. This will be more generic and throttled - return read_frame_segment(); + return READ(rx_frame_asm.get_epilogue_onwire_len(), + handle_read_frame_epilogue_main); } + // TODO: for makeshift only. This will be more generic and throttled + return read_frame_segment(); } CtPtr ProtocolV2::handle_frame_payload() { @@ -1358,61 +1283,29 @@ CtPtr ProtocolV2::handle_read_frame_epilogue_main(rx_buffer_t &&buffer, int r) return _fault(); } - __u8 late_flags; + rx_epilogue.push_back(std::move(buffer)); - // FIXME: if (auth_meta->is_mode_secure()) { - if (session_stream_handlers.rx) { - ldout(cct, 1) << __func__ << " read frame epilogue bytes=" - << get_epilogue_size() << dendl; - - // decrypt epilogue and authenticate entire frame. - ceph::bufferlist epilogue_bl; - { - epilogue_bl.push_back(std::move(buffer)); - try { - session_stream_handlers.rx->authenticated_decrypt_update_final( - epilogue_bl); - } catch (ceph::crypto::onwire::MsgAuthError &e) { - ldout(cct, 5) << __func__ << " message authentication failed: " - << e.what() << dendl; - return _fault(); - } - } - auto& epilogue = - reinterpret_cast(*epilogue_bl.c_str()); - late_flags = epilogue.late_flags; - } else { - auto& epilogue = reinterpret_cast(*buffer->c_str()); - - for (std::uint8_t idx = 0; idx < rx_segments_data.size(); idx++) { - const __u32 expected_crc = epilogue.crc_values[idx]; - const __u32 calculated_crc = rx_segments_data[idx].crc32c(-1); - if (expected_crc != calculated_crc) { - ldout(cct, 5) << __func__ << " message integrity check failed: " - << " expected_crc=" << expected_crc - << " calculated_crc=" << calculated_crc - << dendl; - return _fault(); - } else { - ldout(cct, 20) << __func__ << " message integrity check success: " - << " expected_crc=" << expected_crc - << " calculated_crc=" << calculated_crc - << dendl; - } - } - late_flags = epilogue.late_flags; + bool aborted; + try { + aborted = !rx_frame_asm.disassemble_segments(rx_segments_data.data(), + rx_epilogue); + } catch (FrameError& e) { + ldout(cct, 1) << __func__ << " " << e.what() << dendl; + return _fault(); + } catch (ceph::crypto::onwire::MsgAuthError&) { + ldout(cct, 1) << __func__ << "bad auth tag" << dendl; + return _fault(); } // we do have a mechanism that allows transmitter to start sending message // and abort after putting entire data field on wire. This will be used by // the kernel client to avoid unnecessary buffering. - if (late_flags & FRAME_FLAGS_LATEABRT) { + if (aborted) { reset_throttle(); state = READY; return CONTINUE(read_frame); - } else { - return handle_read_frame_dispatch(); } + return handle_read_frame_dispatch(); } CtPtr ProtocolV2::handle_message() { diff --git a/src/msg/async/ProtocolV2.h b/src/msg/async/ProtocolV2.h index 610186b1e091..c4aa6b9c5b60 100644 --- a/src/msg/async/ProtocolV2.h +++ b/src/msg/async/ProtocolV2.h @@ -4,8 +4,6 @@ #ifndef _MSG_ASYNC_PROTOCOL_V2_ #define _MSG_ASYNC_PROTOCOL_V2_ -#include - #include "Protocol.h" #include "crypto_onwire.h" #include "frames_v2.h" @@ -96,9 +94,11 @@ private: using ProtFuncPtr = void (ProtocolV2::*)(); Ct *bannerExchangeCallback; - boost::container::static_vector rx_segments_desc; ceph::msgr::v2::FrameAssembler tx_frame_asm; + ceph::msgr::v2::FrameAssembler rx_frame_asm; + + ceph::bufferlist rx_preamble; + ceph::bufferlist rx_epilogue; ceph::msgr::v2::segment_bls_t rx_segments_data; ceph::msgr::v2::Tag next_tag; utime_t backoff; // backoff time @@ -251,8 +251,6 @@ private: Ct *send_reconnect_ok(); Ct *server_ready(); - uint32_t get_onwire_size(uint32_t logical_size) const; - uint32_t get_epilogue_size() const; size_t get_current_msg_size() const; }; diff --git a/src/msg/async/frames_v2.cc b/src/msg/async/frames_v2.cc index ead90ae9081a..bcbebdbd2d22 100644 --- a/src/msg/async/frames_v2.cc +++ b/src/msg/async/frames_v2.cc @@ -20,6 +20,14 @@ namespace ceph::msgr::v2 { +// Unpads bufferlist to unpadded_len. +static void unpad_zero(bufferlist& bl, uint32_t unpadded_len) { + ceph_assert(bl.length() >= unpadded_len); + if (bl.length() > unpadded_len) { + bl.splice(unpadded_len, bl.length() - unpadded_len); + } +} + // Discards trailing empty segments, unless there is just one segment. // A frame always has at least one (possibly empty) segment. static size_t calc_num_segments(const bufferlist segment_bls[], @@ -33,6 +41,15 @@ static size_t calc_num_segments(const bufferlist segment_bls[], return 1; } +static void check_segment_crc(const bufferlist& segment_bl, + uint32_t expected_crc) { + uint32_t crc = segment_bl.crc32c(-1); + if (crc != expected_crc) { + throw FrameError(fmt::format( + "bad segment crc calculated={} expected={}", crc, expected_crc)); + } +} + void FrameAssembler::fill_preamble(Tag tag, preamble_block_t& preamble) const { // FIPS zeroization audit 20191115: this memset is not security related. @@ -149,6 +166,85 @@ bufferlist FrameAssembler::assemble_frame(Tag tag, bufferlist segment_bls[], return asm_crc_rev0(preamble, segment_bls); } +Tag FrameAssembler::disassemble_preamble(bufferlist& preamble_bl) { + ceph_assert(preamble_bl.length() == sizeof(preamble_block_t)); + if (m_crypto->rx) { + m_crypto->rx->reset_rx_handler(); + m_crypto->rx->authenticated_decrypt_update(preamble_bl); + } + + // I expect ceph_le32 will make the endian conversion for me. Passing + // everything through ::Decode is unnecessary. + auto preamble = reinterpret_cast( + preamble_bl.c_str()); + // check preamble crc before any further processing + uint32_t crc = ceph_crc32c( + 0, reinterpret_cast(preamble), + sizeof(*preamble) - sizeof(preamble->crc)); + if (crc != preamble->crc) { + throw FrameError(fmt::format( + "bad preamble crc calculated={} expected={}", crc, preamble->crc)); + } + + // see calc_num_segments() + if (preamble->num_segments < 1 || + preamble->num_segments > MAX_NUM_SEGMENTS) { + throw FrameError(fmt::format( + "bad number of segments num_segments={}", preamble->num_segments)); + } + if (preamble->num_segments > 1 && + preamble->segments[preamble->num_segments - 1].length == 0) { + throw FrameError("last segment empty"); + } + + m_descs.resize(preamble->num_segments); + for (size_t i = 0; i < m_descs.size(); i++) { + m_descs[i].logical_len = preamble->segments[i].length; + m_descs[i].align = preamble->segments[i].alignment; + } + return static_cast(preamble->tag); +} + +bool FrameAssembler::disasm_all_crc_rev0(bufferlist segment_bls[], + bufferlist& epilogue_bl) const { + ceph_assert(epilogue_bl.length() == sizeof(epilogue_plain_block_t)); + auto epilogue = reinterpret_cast( + epilogue_bl.c_str()); + + for (size_t i = 0; i < m_descs.size(); i++) { + ceph_assert(segment_bls[i].length() == m_descs[i].logical_len); + check_segment_crc(segment_bls[i], epilogue->crc_values[i]); + } + return !(epilogue->late_flags & FRAME_FLAGS_LATEABRT); +} + +bool FrameAssembler::disasm_all_secure_rev0(bufferlist segment_bls[], + bufferlist& epilogue_bl) const { + for (size_t i = 0; i < m_descs.size(); i++) { + ceph_assert(segment_bls[i].length() == get_segment_padded_len(i)); + if (segment_bls[i].length() > 0) { + m_crypto->rx->authenticated_decrypt_update(segment_bls[i]); + unpad_zero(segment_bls[i], m_descs[i].logical_len); + } + } + + ceph_assert(epilogue_bl.length() == sizeof(epilogue_secure_block_t) + + get_auth_tag_len()); + m_crypto->rx->authenticated_decrypt_update_final(epilogue_bl); + auto epilogue = reinterpret_cast( + epilogue_bl.c_str()); + return !(epilogue->late_flags & FRAME_FLAGS_LATEABRT); +} + +bool FrameAssembler::disassemble_segments(bufferlist segment_bls[], + bufferlist& epilogue_bl) const { + ceph_assert(!m_descs.empty()); + if (m_crypto->rx) { + return disasm_all_secure_rev0(segment_bls, epilogue_bl); + } + return disasm_all_crc_rev0(segment_bls, epilogue_bl); +} + std::ostream& operator<<(std::ostream& os, const FrameAssembler& frame_asm) { if (!frame_asm.m_descs.empty()) { os << frame_asm.get_preamble_onwire_len(); diff --git a/src/msg/async/frames_v2.h b/src/msg/async/frames_v2.h index cea5f0efd037..f61ed8531e06 100644 --- a/src/msg/async/frames_v2.h +++ b/src/msg/async/frames_v2.h @@ -153,11 +153,6 @@ static constexpr uint32_t FRAME_SECURE_EPILOGUE_SIZE = #define FRAME_FLAGS_LATEABRT (1<<0) /* frame was aborted after txing data */ -static uint32_t segment_onwire_size(const uint32_t logical_size) -{ - return p2roundup(logical_size, CRYPTO_BLOCK_SIZE); -} - struct FrameError : std::runtime_error { using runtime_error::runtime_error; }; @@ -210,6 +205,11 @@ public: const uint16_t segment_aligns[], size_t segment_count); + Tag disassemble_preamble(bufferlist& preamble_bl); + + bool disassemble_segments(bufferlist segment_bls[], + bufferlist& epilogue_bl) const; + private: struct segment_desc_t { uint32_t logical_len; @@ -230,6 +230,11 @@ private: bufferlist asm_secure_rev0(const preamble_block_t& preamble, bufferlist segment_bls[]) const; + bool disasm_all_crc_rev0(bufferlist segment_bls[], + bufferlist& epilogue_bl) const; + bool disasm_all_secure_rev0(bufferlist segment_bls[], + bufferlist& epilogue_bl) const; + void fill_preamble(Tag tag, preamble_block_t& preamble) const; friend std::ostream& operator<<(std::ostream& os, const FrameAssembler& frame_asm);