From ba66a6ab0469abc1cca5d0cb8c32756f9d44957a Mon Sep 17 00:00:00 2001 From: Radoslaw Zarzynski Date: Mon, 4 Mar 2019 22:39:18 +0100 Subject: [PATCH] msg/async, v2: rework the class hierarchy - introduce ControlFrame. Signed-off-by: Radoslaw Zarzynski --- src/msg/async/frames_v2.h | 228 ++++++++++++++++++++------------------ 1 file changed, 118 insertions(+), 110 deletions(-) diff --git a/src/msg/async/frames_v2.h b/src/msg/async/frames_v2.h index b99269702a5..6d709bf50c5 100644 --- a/src/msg/async/frames_v2.h +++ b/src/msg/async/frames_v2.h @@ -72,7 +72,7 @@ struct SegmentIndex { static constexpr std::size_t DATA = 3; }; - struct Frame { + struct Control { static constexpr std::size_t PAYLOAD = 0; }; }; @@ -172,70 +172,57 @@ static ceph::bufferlist segment_onwire_bufferlist(const ceph::bufferlist &bl) return ret; } -template +template struct Frame { + static_assert(SegmentsNumV > 0 && SegmentsNumV <= MAX_NUM_SEGMENTS); protected: - ceph::bufferlist payload; + std::array segments; ceph::bufferlist::contiguous_filler preamble_filler; + // craft the main preamble. It's always present regardless of the number + // of segments message is composed from. void fill_preamble(const std::initializer_list main_segments) { ceph_assert(std::size(main_segments) <= MAX_NUM_SEGMENTS); - // Craft the main preamble. It's always present regardless of the number - // of segments message is composed from. This doesn't apply to extra one - // as it's optional -- if there is up to 2 segments, we'll never transmit - // preamble_extra_t; - { - preamble_block_t main_preamble; - // TODO: we might fill/pad with pseudo-random data. - ::memset(&main_preamble, 0, sizeof(main_preamble)); + preamble_block_t main_preamble; + // TODO: we might fill/pad with pseudo-random data. + ::memset(&main_preamble, 0, sizeof(main_preamble)); - main_preamble.num_segments = std::size(main_segments); - main_preamble.tag = static_cast<__u8>(T::tag); - ceph_assert(main_preamble.tag != 0); + main_preamble.num_segments = std::size(main_segments); + main_preamble.tag = static_cast<__u8>(T::tag); + ceph_assert(main_preamble.tag != 0); - std::copy(std::cbegin(main_segments), std::cend(main_segments), - std::begin(main_preamble.segments)); + std::copy(std::cbegin(main_segments), std::cend(main_segments), + std::begin(main_preamble.segments)); - main_preamble.crc = - ceph_crc32c(0, reinterpret_cast(&main_preamble), - sizeof(main_preamble) - sizeof(main_preamble.crc)); + main_preamble.crc = + ceph_crc32c(0, reinterpret_cast(&main_preamble), + sizeof(main_preamble) - sizeof(main_preamble.crc)); - preamble_filler.copy_in(sizeof(main_preamble), - reinterpret_cast(&main_preamble)); - } + preamble_filler.copy_in(sizeof(main_preamble), + reinterpret_cast(&main_preamble)); } - Frame() : preamble_filler(payload.append_hole(FRAME_PREAMBLE_SIZE)) {} - - void decode_frame(const ceph::bufferlist &bl) { - auto ti = bl.cbegin(); - static_cast(this)->decode_payload(ti); + Frame() + : preamble_filler(segments.front().append_hole(FRAME_PREAMBLE_SIZE)) { } - void decode_payload(bufferlist::const_iterator &ti) {} - public: - ceph::bufferlist &get_buffer() { - fill_preamble({segment_t{payload.length() - FRAME_PREAMBLE_SIZE, - segment_t::DEFAULT_ALIGNMENT}}); - - epilogue_plain_block_t epilogue; - ::memset(&epilogue, 0, sizeof(epilogue)); - - ceph::bufferlist::const_iterator hdriter(&this->payload, FRAME_PREAMBLE_SIZE); - epilogue.crc_values[SegmentIndex::Frame::PAYLOAD] = - hdriter.crc32c(hdriter.get_remaining(), -1); - this->payload.append(reinterpret_cast(&epilogue), sizeof(epilogue)); - - return payload; - } }; + +// ControlFrames are used to manage transceiver state (like connections) and +// orchestrate transfers of MessageFrames. They use only single segment with +// marshalling facilities -- derived classes specify frame structure through +// Args pack while ControlFrame provides common encode/decode machinery. template -class PayloadFrame : public Frame { +class ControlFrame : public Frame { protected: - // this tuple is only used when decoding values from a payload buffer + ceph::bufferlist &get_payload_segment() { + return this->segments[SegmentIndex::Control::PAYLOAD]; + } + + // this tuple is only used when decoding values from a payload segment std::tuple _values; // FIXME: for now, we assume specific features for the purpoess of encoding @@ -245,16 +232,16 @@ protected: template inline void _encode_payload_each(T &t) { if constexpr (std::is_same()) { - this->payload.claim_append((bufferlist &)t); + this->get_payload_segment().claim_append((bufferlist &)t); } else if constexpr (std::is_same const>()) { - encode((uint32_t)t.size(), this->payload, features); + encode((uint32_t)t.size(), this->get_payload_segment(), features); for (const auto &elem : t) { - encode(elem, this->payload, features); + encode(elem, this->get_payload_segment(), features); } } else if constexpr (std::is_same()) { - this->payload.append((char *)&t, sizeof(t)); + this->get_payload_segment().append((char *)&t, sizeof(t)); } else { - encode(t, this->payload, features); + encode(t, this->get_payload_segment(), features); } } @@ -291,13 +278,37 @@ protected: return std::get(_values); } - PayloadFrame() : Frame() {} + ControlFrame() : Frame() {} void _encode(const Args &... args) { (_encode_payload_each(args), ...); } + void _decode(const ceph::bufferlist &bl) { + auto ti = bl.cbegin(); + _decode_payload(ti, std::index_sequence_for()); + } + public: + ceph::bufferlist &get_buffer() { + this->fill_preamble({ + segment_t{ get_payload_segment().length() - FRAME_PREAMBLE_SIZE, + segment_t::DEFAULT_ALIGNMENT} + }); + + epilogue_plain_block_t epilogue; + ::memset(&epilogue, 0, sizeof(epilogue)); + + ceph::bufferlist::const_iterator hdriter(&this->get_payload_segment(), + FRAME_PREAMBLE_SIZE); + epilogue.crc_values[SegmentIndex::Control::PAYLOAD] = + hdriter.crc32c(hdriter.get_remaining(), -1); + get_payload_segment().append(reinterpret_cast(&epilogue), + sizeof(epilogue)); + + return get_payload_segment(); + } + static C Encode(const Args &... args) { C c; c._encode(args...); @@ -306,53 +317,49 @@ public: static C Decode(const ceph::bufferlist &payload) { C c; - c.decode_frame(payload); + c._decode(payload); return c; } - - void decode_payload(bufferlist::const_iterator &ti) { - _decode_payload(ti, std::index_sequence_for()); - } }; -struct HelloFrame : public PayloadFrame { // peer address static const Tag tag = Tag::HELLO; - using PayloadFrame::Encode; - using PayloadFrame::Decode; + using ControlFrame::Encode; + using ControlFrame::Decode; inline uint8_t &entity_type() { return get_val<0>(); } inline entity_addr_t &peer_addr() { return get_val<1>(); } protected: - using PayloadFrame::PayloadFrame; + using ControlFrame::ControlFrame; }; -struct AuthRequestFrame : public PayloadFrame, // preferred modes bufferlist> { // auth payload static const Tag tag = Tag::AUTH_REQUEST; - using PayloadFrame::Encode; - using PayloadFrame::Decode; + using ControlFrame::Encode; + using ControlFrame::Decode; inline uint32_t &method() { return get_val<0>(); } inline vector &preferred_modes() { return get_val<1>(); } inline bufferlist &auth_payload() { return get_val<2>(); } protected: - using PayloadFrame::PayloadFrame; + using ControlFrame::ControlFrame; }; -struct AuthBadMethodFrame : public PayloadFrame, // allowed methods std::vector> { // allowed modes static const Tag tag = Tag::AUTH_BAD_METHOD; - using PayloadFrame::Encode; - using PayloadFrame::Decode; + using ControlFrame::Encode; + using ControlFrame::Decode; inline uint32_t &method() { return get_val<0>(); } inline int32_t &result() { return get_val<1>(); } @@ -360,94 +367,94 @@ struct AuthBadMethodFrame : public PayloadFrame &allowed_modes() { return get_val<3>(); } protected: - using PayloadFrame::PayloadFrame; + using ControlFrame::ControlFrame; }; -struct AuthReplyMoreFrame : public PayloadFrame { // auth payload static const Tag tag = Tag::AUTH_REPLY_MORE; - using PayloadFrame::Encode; - using PayloadFrame::Decode; + using ControlFrame::Encode; + using ControlFrame::Decode; inline bufferlist &auth_payload() { return get_val<0>(); } protected: - using PayloadFrame::PayloadFrame; + using ControlFrame::ControlFrame; }; -struct AuthRequestMoreFrame : public PayloadFrame { // auth payload static const Tag tag = Tag::AUTH_REQUEST_MORE; - using PayloadFrame::Encode; - using PayloadFrame::Decode; + using ControlFrame::Encode; + using ControlFrame::Decode; inline bufferlist &auth_payload() { return get_val<0>(); } protected: - using PayloadFrame::PayloadFrame; + using ControlFrame::ControlFrame; }; -struct AuthDoneFrame : public PayloadFrame { // auth method payload static const Tag tag = Tag::AUTH_DONE; - using PayloadFrame::Encode; - using PayloadFrame::Decode; + using ControlFrame::Encode; + using ControlFrame::Decode; inline uint64_t &global_id() { return get_val<0>(); } inline uint32_t &con_mode() { return get_val<1>(); } inline bufferlist &auth_payload() { return get_val<2>(); } protected: - using PayloadFrame::PayloadFrame; + using ControlFrame::ControlFrame; }; template -struct SignedEncryptedFrame : public PayloadFrame { +struct SignedEncryptedFrame : public ControlFrame { ceph::bufferlist &get_buffer() { // In contrast to Frame::get_buffer() we don't fill preamble here. - return this->payload; + return this->get_payload_segment(); } static T Encode(ceph::crypto::onwire::rxtx_t &session_stream_handlers, const Args &... args) { - T c = PayloadFrame::Encode(args...); - c.fill_preamble({segment_t{c.payload.length() - FRAME_PREAMBLE_SIZE, + T c = ControlFrame::Encode(args...); + c.fill_preamble({segment_t{c.get_payload_segment().length() - FRAME_PREAMBLE_SIZE, segment_t::DEFAULT_ALIGNMENT}}); if (session_stream_handlers.tx) { - c.payload = segment_onwire_bufferlist(std::move(c.payload)); + c.get_payload_segment() = segment_onwire_bufferlist(std::move(c.get_payload_segment())); epilogue_secure_block_t epilogue; ::memset(&epilogue, 0, sizeof(epilogue)); - c.payload.append(reinterpret_cast(&epilogue), sizeof(epilogue)); + c.get_payload_segment().append(reinterpret_cast(&epilogue), sizeof(epilogue)); ceph_assert(session_stream_handlers.tx); - session_stream_handlers.tx->reset_tx_handler({c.payload.length()}); + session_stream_handlers.tx->reset_tx_handler({c.get_payload_segment().length()}); session_stream_handlers.tx->authenticated_encrypt_update( - std::move(c.payload)); - c.payload = session_stream_handlers.tx->authenticated_encrypt_final(); + std::move(c.get_payload_segment())); + c.get_payload_segment() = session_stream_handlers.tx->authenticated_encrypt_final(); } else { epilogue_plain_block_t epilogue; ::memset(&epilogue, 0, sizeof(epilogue)); - ceph::bufferlist::const_iterator hdriter(&c.payload, FRAME_PREAMBLE_SIZE); - epilogue.crc_values[SegmentIndex::Frame::PAYLOAD] = + ceph::bufferlist::const_iterator hdriter(&c.get_payload_segment(), FRAME_PREAMBLE_SIZE); + epilogue.crc_values[SegmentIndex::Control::PAYLOAD] = hdriter.crc32c(hdriter.get_remaining(), -1); - c.payload.append(reinterpret_cast(&epilogue), sizeof(epilogue)); + c.get_payload_segment().append(reinterpret_cast(&epilogue), sizeof(epilogue)); } return c; } static T Decode(ceph::crypto::onwire::rxtx_t &session_stream_handlers, ceph::bufferlist &payload) { - return PayloadFrame::Decode(payload); + return ControlFrame::Decode(payload); } protected: - SignedEncryptedFrame() : PayloadFrame() {} + SignedEncryptedFrame() : ControlFrame() {} }; struct ClientIdentFrame @@ -640,12 +647,12 @@ protected: // Body is processed almost independently with the sole junction point // being the `extra_payload_len` passed to get_buffer(). struct MessageHeaderFrame - : public PayloadFrame { + : public ControlFrame { static const Tag tag = Tag::MESSAGE; ceph::bufferlist &get_buffer() { // In contrast to Frame::get_buffer() we don't fill preamble here. - return this->payload; + return this->get_payload_segment(); } static MessageHeaderFrame Encode(ceph::crypto::onwire::rxtx_t &session_stream_handlers, @@ -654,10 +661,10 @@ struct MessageHeaderFrame const ceph::bufferlist& middle, const ceph::bufferlist& data) { MessageHeaderFrame f = - PayloadFrame::Encode(msg_header); + ControlFrame::Encode(msg_header); f.fill_preamble({ - segment_t{ f.payload.length() - FRAME_PREAMBLE_SIZE, + segment_t{ f.get_payload_segment().length() - FRAME_PREAMBLE_SIZE, segment_t::DEFAULT_ALIGNMENT }, segment_t{ front.length(), segment_t::DEFAULT_ALIGNMENT }, segment_t{ middle.length(), segment_t::DEFAULT_ALIGNMENT }, @@ -668,22 +675,23 @@ struct MessageHeaderFrame // we're padding segments to biggest cipher's block size. Although // AES-GCM can live without that as it's a stream cipher, we don't // to be fixed to stream ciphers only. - f.payload = segment_onwire_bufferlist(std::move(f.payload)); + f.get_payload_segment() = + segment_onwire_bufferlist(std::move(f.get_payload_segment())); auto front_padded = segment_onwire_bufferlist(front); auto middle_padded = segment_onwire_bufferlist(middle); auto data_padded = segment_onwire_bufferlist(data); // let's cipher allocate one huge buffer for entire ciphertext. session_stream_handlers.tx->reset_tx_handler({ - f.payload.length(), + f.get_payload_segment().length(), front_padded.length(), middle_padded.length(), data_padded.length() }); - ceph_assert(f.payload.length()); + ceph_assert(f.get_payload_segment().length()); session_stream_handlers.tx->authenticated_encrypt_update( - std::move(f.payload)); + std::move(f.get_payload_segment())); // TODO: switch TxHandler from `bl&&` to `const bl&`. if (front.length()) { @@ -707,22 +715,22 @@ struct MessageHeaderFrame } // auth tag will be appended at the end - f.payload = session_stream_handlers.tx->authenticated_encrypt_final(); + f.get_payload_segment() = session_stream_handlers.tx->authenticated_encrypt_final(); } else { epilogue_plain_block_t epilogue; ::memset(&epilogue, 0, sizeof(epilogue)); - ceph::bufferlist::const_iterator hdriter(&f.payload, FRAME_PREAMBLE_SIZE); + ceph::bufferlist::const_iterator hdriter(&f.get_payload_segment(), FRAME_PREAMBLE_SIZE); epilogue.crc_values[SegmentIndex::Msg::HEADER] = hdriter.crc32c(hdriter.get_remaining(), -1); epilogue.crc_values[SegmentIndex::Msg::FRONT] = front.crc32c(-1); epilogue.crc_values[SegmentIndex::Msg::MIDDLE] = middle.crc32c(-1); epilogue.crc_values[SegmentIndex::Msg::DATA] = data.crc32c(-1); - f.payload.append(front); - f.payload.append(middle); - f.payload.append(data); - f.payload.append(reinterpret_cast(&epilogue), sizeof(epilogue)); + f.get_payload_segment().append(front); + f.get_payload_segment().append(middle); + f.get_payload_segment().append(data); + f.get_payload_segment().append(reinterpret_cast(&epilogue), sizeof(epilogue)); } return f; @@ -737,7 +745,7 @@ struct MessageHeaderFrame inline ceph_msg_header2 &header() { return get_val<0>(); } protected: - using PayloadFrame::PayloadFrame; + using ControlFrame::ControlFrame; }; } // namespace ceph::msgr::v2 -- 2.39.5