From 87ba2eb7009062e94cf13b9f321d9e923c738186 Mon Sep 17 00:00:00 2001 From: Ricardo Dias Date: Mon, 25 Feb 2019 16:23:04 +0000 Subject: [PATCH] msg/async: msgr2: extract frame classes into its own header Signed-off-by: Ricardo Dias --- src/msg/async/ProtocolV2.cc | 531 +++--------------------------------- src/msg/async/ProtocolV2.h | 43 +-- src/msg/async/frames_v2.h | 511 ++++++++++++++++++++++++++++++++++ 3 files changed, 547 insertions(+), 538 deletions(-) create mode 100644 src/msg/async/frames_v2.h diff --git a/src/msg/async/ProtocolV2.cc b/src/msg/async/ProtocolV2.cc index 1829336f1bb..d4e9ed7fb9b 100644 --- a/src/msg/async/ProtocolV2.cc +++ b/src/msg/async/ProtocolV2.cc @@ -29,22 +29,7 @@ ostream &ProtocolV2::_conn_prefix(std::ostream *_dout) { << ")."; } -// We require these features from any peer, period, in order to encode -// a entity_addrvec_t. -const uint64_t msgr2_required = CEPH_FEATUREMASK_MSG_ADDR2; - -// We additionally assume the peer has the below features *purely for -// the purpose of encoding the frames themselves*. The only complex -// types in the frames are entity_addr_t and entity_addrvec_t, and we -// specifically want the peer to understand the (new in nautilus) -// TYPE_ANY. We treat narrow this assumption to frames because we -// expect there may be future clients (the kernel) that understand -// msgr v2 and understand this encoding but don't necessarily have -// everything else that SERVER_NAUTILUS implies. Yes, a fresh feature -// bit would be a cleaner approach, but those are scarce these days. 
-const uint64_t msgr2_frame_assumed = - msgr2_required | - CEPH_FEATUREMASK_SERVER_NAUTILUS; +using namespace ceph::msgr::v2; using CtPtr = Ct *; @@ -104,460 +89,6 @@ static void alloc_aligned_buffer(bufferlist &data, unsigned len, unsigned off) data.push_back(std::move(ptr)); } -/** - * Protocol V2 Frame Structures - **/ - -static constexpr uint8_t CRYPTO_BLOCK_SIZE { 16 }; - -using segment_t = ProtocolV2::segment_t; - -// V2 preamble consists of one or more preamble blocks depending on -// the number of segments a particular frame needs. Each block holds -// up to MAX_NUM_SEGMENTS segments and has its own CRC. -// -// XXX: currently the multi-segment facility is NOT implemented. -struct preamble_block_t { - static constexpr std::size_t MAX_NUM_SEGMENTS = 4; - - // ProtocolV2::Tag. For multi-segmented frames the value is the same - // between subsequent preamble blocks. - __u8 tag; - - // Number of segments to go in entire frame. First preable block has - // set this to just #segments, second #segments - MAX_NUM_SEGMENTS, - // third to #segments - MAX_NUM_SEGMENTS and so on. - __u8 num_segments; - - std::array segments; - __u8 _reserved[2]; - - // CRC32 for this single preamble block. - __le32 crc; -} __attribute__((packed)); -static_assert(sizeof(preamble_block_t) % CRYPTO_BLOCK_SIZE == 0); -static_assert(std::is_standard_layout::value); - - -static constexpr uint32_t FRAME_PREAMBLE_SIZE = sizeof(preamble_block_t); - -template -struct Frame { -protected: - ceph::bufferlist payload; - ceph::bufferlist::contiguous_filler preamble_filler; - - void fill_preamble( - const std::initializer_list main_segments) - { - ceph_assert( - std::size(main_segments) <= preamble_block_t::MAX_NUM_SEGMENTS); - - // Craft the main preamble. It's always present regardless of the number - // of segments message is composed from. 
This doesn't apply to extra one - // as it's optional -- if there is up to 2 segments, we'll never transmit - // preamble_extra_t; - { - preamble_block_t main_preamble; - // TODO: we might fill/pad with pseudo-random data. - ::memset(&main_preamble, 0, sizeof(main_preamble)); - - main_preamble.num_segments = std::size(main_segments); - main_preamble.tag = static_cast<__u8>(T::tag); - ceph_assert(main_preamble.tag != 0); - - std::copy(std::cbegin(main_segments), std::cend(main_segments), - std::begin(main_preamble.segments)); - - main_preamble.crc = ceph_crc32c(0, - reinterpret_cast(&main_preamble), - sizeof(main_preamble) - sizeof(main_preamble.crc)); - - preamble_filler.copy_in(sizeof(main_preamble), - reinterpret_cast(&main_preamble)); - } - } - -public: - Frame() : preamble_filler(payload.append_hole(FRAME_PREAMBLE_SIZE)) {} - - ceph::bufferlist &get_buffer() { - fill_preamble({ - segment_t{ payload.length() - FRAME_PREAMBLE_SIZE, - segment_t::DEFAULT_ALIGNMENT } - }); - return payload; - } - - void decode_frame(const ceph::bufferlist& bl) { - auto ti = bl.cbegin(); - static_cast(this)->decode_payload(ti); - } - - void decode_payload(bufferlist::const_iterator &ti) {} -}; - -// TODO, FIXME: fix this altogether with the Frame hierarchy rework -struct do_not_encode_tag_t {}; -struct dummy_ctor_conflict_helper {}; - -template -struct PayloadFrame : public Frame { -protected: - // this tuple is only used when decoding values from a payload buffer - std::tuple _values; - - // FIXME: for now, we assume specific features for the purpoess of encoding - // the frames themselves (*not* messages in message frames!). 
- uint64_t features = msgr2_frame_assumed; - - template - inline void _encode_payload_each(T &t) { - if constexpr (std::is_same()) { - this->payload.claim_append((bufferlist &)t); - } else if constexpr (std::is_same const>()) { - encode((uint32_t)t.size(), this->payload, features); - for (const auto &elem : t) { - encode(elem, this->payload, features); - } - } else if constexpr (std::is_same()) { - this->payload.append((char *)&t, sizeof(t)); - } else if constexpr (std::is_same()) { - /* NOP, only to discriminate ctors for decode/encode. FIXME. */ - } else { - encode(t, this->payload, features); - } - } - - template - inline void _decode_payload_each(T &t, bufferlist::const_iterator &ti) const { - if constexpr (std::is_same()) { - if (ti.get_remaining()) { - t.append(ti.get_current_ptr()); - } - } else if constexpr (std::is_same>()) { - uint32_t size; - decode(size, ti); - t.resize(size); - for (uint32_t i = 0; i < size; ++i) { - decode(t[i], ti); - } - } else if constexpr (std::is_same()) { - auto ptr = ti.get_current_ptr(); - ti.advance(sizeof(T)); - t = *(T *)ptr.raw_c_str(); - } else if constexpr (std::is_same()) { - /* NOP, only to discriminate ctors for decode/encode. FIXME. */ - } else { - decode(t, ti); - } - } - - template - inline void _decode_payload(bufferlist::const_iterator &ti, - std::index_sequence) const { - (_decode_payload_each((Args &)std::get(_values), ti), ...); - } - - template - inline decltype(auto) get_val() { - return std::get(_values); - } - -public: - PayloadFrame(const Args &... 
args) { - (_encode_payload_each(args), ...); - } - - PayloadFrame(do_not_encode_tag_t) {} - - PayloadFrame(const ceph::bufferlist &payload) { - this->decode_frame(payload); - } - - void decode_payload(bufferlist::const_iterator &ti) { - _decode_payload(ti, std::index_sequence_for()); - } -}; - -struct HelloFrame : public PayloadFrame { // peer_addr - static const ProtocolV2::Tag tag = ProtocolV2::Tag::HELLO; - using PayloadFrame::PayloadFrame; - - inline uint8_t &entity_type() { return get_val<0>(); } - inline entity_addr_t &peer_addr() { return get_val<1>(); } -}; - -struct AuthRequestFrame - : public PayloadFrame, bufferlist> { - static const ProtocolV2::Tag tag = ProtocolV2::Tag::AUTH_REQUEST; - using PayloadFrame::PayloadFrame; - - inline uint32_t &method() { return get_val<0>(); } - inline vector &preferred_modes() { return get_val<1>(); } - inline bufferlist &auth_payload() { return get_val<2>(); } -}; - -struct AuthBadMethodFrame - : public PayloadFrame, // allowed_methods - std::vector> { // allowed_modes - static const ProtocolV2::Tag tag = ProtocolV2::Tag::AUTH_BAD_METHOD; - using PayloadFrame::PayloadFrame; - - inline uint32_t &method() { return get_val<0>(); } - inline int32_t &result() { return get_val<1>(); } - inline std::vector &allowed_methods() { return get_val<2>(); } - inline std::vector &allowed_modes() { return get_val<3>(); } -}; - -struct AuthReplyMoreFrame - : public PayloadFrame { - static const ProtocolV2::Tag tag = ProtocolV2::Tag::AUTH_REPLY_MORE; - using PayloadFrame::PayloadFrame; - - inline bufferlist &auth_payload() { return get_val<1>(); } -}; - -struct AuthRequestMoreFrame - : public PayloadFrame { - static const ProtocolV2::Tag tag = ProtocolV2::Tag::AUTH_REQUEST_MORE; - using PayloadFrame::PayloadFrame; - - inline bufferlist &auth_payload() { return get_val<1>(); } -}; - -struct AuthDoneFrame - : public PayloadFrame { // auth method payload - static const ProtocolV2::Tag tag = ProtocolV2::Tag::AUTH_DONE; - using 
PayloadFrame::PayloadFrame; - - inline uint64_t &global_id() { return get_val<0>(); } - inline uint32_t &con_mode() { return get_val<1>(); } - inline bufferlist &auth_payload() { return get_val<2>(); } -}; - -template -struct SignedEncryptedFrame : public PayloadFrame { - ceph::bufferlist &get_buffer() { - // In contrast to Frame::get_buffer() we don't fill preamble here. - return this->payload; - } - - SignedEncryptedFrame(ProtocolV2 &protocol, const Args &... args) - : PayloadFrame(args...) - { - // FIXME: plainsize -> ciphersize; for AES-GCM they are equall apart - // from auth tag size - this->fill_preamble({ - segment_t{ this->payload.length() - FRAME_PREAMBLE_SIZE, - segment_t::DEFAULT_ALIGNMENT } - }); - - if (protocol.session_stream_handlers.tx) { - ceph_assert(protocol.session_stream_handlers.tx); - protocol.session_stream_handlers.tx->reset_tx_handler({ - this->payload.length() - }); - - protocol.session_stream_handlers.tx->authenticated_encrypt_update( - std::move(this->payload)); - this->payload = \ - protocol.session_stream_handlers.tx->authenticated_encrypt_final(); - } - } - - SignedEncryptedFrame(ProtocolV2 &protocol, ceph::bufferlist& bl) - : PayloadFrame(do_not_encode_tag_t{}) - { - if (!protocol.session_stream_handlers.rx) { - this->decode_frame(bl); - return; - } - - const auto length = bl.length(); - ceph::bufferlist plain_bl = \ - protocol.session_stream_handlers.rx->authenticated_decrypt_update_final( - std::move(bl), ProtocolV2::segment_t::DEFAULT_ALIGNMENT); - ceph_assert(plain_bl.length() == length - \ - protocol.session_stream_handlers.rx->get_extra_size_at_final()); - this->decode_frame(plain_bl); - } -}; - -struct ClientIdentFrame - : public SignedEncryptedFrame { // client cookie - static const ProtocolV2::Tag tag = ProtocolV2::Tag::CLIENT_IDENT; - using SignedEncryptedFrame::SignedEncryptedFrame; - - inline entity_addrvec_t &addrs() { return get_val<0>(); } - inline entity_addr_t &target_addr() { return get_val<1>(); } - inline 
int64_t &gid() { return get_val<2>(); } - inline uint64_t &global_seq() { return get_val<3>(); } - inline uint64_t &supported_features() { return get_val<4>(); } - inline uint64_t &required_features() { return get_val<5>(); } - inline uint64_t &flags() { return get_val<6>(); } - inline uint64_t &cookie() { return get_val<7>(); } -}; - -struct ServerIdentFrame - : public SignedEncryptedFrame { - static const ProtocolV2::Tag tag = ProtocolV2::Tag::SERVER_IDENT; - using SignedEncryptedFrame::SignedEncryptedFrame; - - inline entity_addrvec_t &addrs() { return get_val<0>(); } - inline int64_t &gid() { return get_val<1>(); } - inline uint64_t &global_seq() { return get_val<2>(); } - inline uint64_t &supported_features() { return get_val<3>(); } - inline uint64_t &required_features() { return get_val<4>(); } - inline uint64_t &flags() { return get_val<5>(); } - inline uint64_t &cookie() { return get_val<6>(); } -}; - -struct ReconnectFrame - : public SignedEncryptedFrame { // message sequence - static const ProtocolV2::Tag tag = ProtocolV2::Tag::SESSION_RECONNECT; - using SignedEncryptedFrame::SignedEncryptedFrame; - - inline entity_addrvec_t &addrs() { return get_val<0>(); } - inline uint64_t &client_cookie() { return get_val<1>(); } - inline uint64_t &server_cookie() { return get_val<2>(); } - inline uint64_t &global_seq() { return get_val<3>(); } - inline uint64_t &connect_seq() { return get_val<4>(); } - inline uint64_t &msg_seq() { return get_val<5>(); } -}; - -struct ResetFrame : public SignedEncryptedFrame { - static const ProtocolV2::Tag tag = ProtocolV2::Tag::SESSION_RESET; - using SignedEncryptedFrame::SignedEncryptedFrame; - - inline bool &full() { return get_val<0>(); } -}; - -struct RetryFrame : public SignedEncryptedFrame { - static const ProtocolV2::Tag tag = ProtocolV2::Tag::SESSION_RETRY; - using SignedEncryptedFrame::SignedEncryptedFrame; - - inline uint64_t &connect_seq() { return get_val<0>(); } -}; - -struct RetryGlobalFrame - : public 
SignedEncryptedFrame { - static const ProtocolV2::Tag tag = ProtocolV2::Tag::SESSION_RETRY_GLOBAL; - using SignedEncryptedFrame::SignedEncryptedFrame; - - inline uint64_t &global_seq() { return get_val<0>(); } -}; - -struct WaitFrame : public SignedEncryptedFrame { - static const ProtocolV2::Tag tag = ProtocolV2::Tag::WAIT; - using SignedEncryptedFrame::SignedEncryptedFrame; -}; - -struct ReconnectOkFrame - : public SignedEncryptedFrame { - static const ProtocolV2::Tag tag = ProtocolV2::Tag::SESSION_RECONNECT_OK; - using SignedEncryptedFrame::SignedEncryptedFrame; - - inline uint64_t &msg_seq() { return get_val<0>(); } -}; - -struct IdentMissingFeaturesFrame - : public SignedEncryptedFrame { - static const ProtocolV2::Tag tag = ProtocolV2::Tag::IDENT_MISSING_FEATURES; - using SignedEncryptedFrame::SignedEncryptedFrame; - - inline uint64_t &features() { return get_val<0>(); } -}; - -struct KeepAliveFrame : public SignedEncryptedFrame { - static const ProtocolV2::Tag tag = ProtocolV2::Tag::KEEPALIVE2; - using SignedEncryptedFrame::SignedEncryptedFrame; - - KeepAliveFrame(ProtocolV2 &protocol) - : KeepAliveFrame(protocol, ceph_clock_now()) {} - - inline utime_t &timestamp() { return get_val<0>(); } -}; - -struct KeepAliveFrameAck - : public SignedEncryptedFrame { - static const ProtocolV2::Tag tag = ProtocolV2::Tag::KEEPALIVE2_ACK; - using SignedEncryptedFrame::SignedEncryptedFrame; - - inline utime_t &timestamp() { return get_val<0>(); } -}; - -struct AckFrame : public SignedEncryptedFrame { - static const ProtocolV2::Tag tag = ProtocolV2::Tag::ACK; - using SignedEncryptedFrame::SignedEncryptedFrame; - - inline uint64_t &seq() { return get_val<0>(); } -}; - -// This class is used for encoding/decoding header of the message frame. -// Body is processed almost independently with the sole junction point -// being the `extra_payload_len` passed to get_buffer(). 
-struct MessageHeaderFrame - : public PayloadFrame { - static const ProtocolV2::Tag tag = ProtocolV2::Tag::MESSAGE; - - ceph::bufferlist &get_buffer() { - // In contrast to Frame::get_buffer() we don't fill preamble here. - return this->payload; - } - - MessageHeaderFrame(const ceph_msg_header2 &msghdr, - const uint32_t front_len, - const uint32_t middle_len, - const uint32_t data_len) - : PayloadFrame(msghdr) - { - // FIXME: plainsize -> ciphersize; for AES-GCM they are equall apart from auth tag size - fill_preamble({ - segment_t{ this->payload.length() - FRAME_PREAMBLE_SIZE, - segment_t::DEFAULT_ALIGNMENT }, - segment_t{ front_len, segment_t::DEFAULT_ALIGNMENT }, - segment_t{ middle_len, segment_t::DEFAULT_ALIGNMENT }, - segment_t{ data_len, segment_t::DEFERRED_ALLOCATION }, - }); - } - - MessageHeaderFrame(ceph::bufferlist&& text) - : PayloadFrame(do_not_encode_tag_t{}) - { - this->decode_frame(text); - } - - inline ceph_msg_header2 &header() { return get_val<0>(); } -}; - ProtocolV2::ProtocolV2(AsyncConnection *connection) : Protocol(2, connection), temp_buffer(nullptr), @@ -1058,12 +589,12 @@ ssize_t ProtocolV2::write_message(Message *m, bool more) { void ProtocolV2::append_keepalive() { ldout(cct, 10) << __func__ << dendl; - KeepAliveFrame keepalive_frame(*this); + KeepAliveFrame keepalive_frame(session_stream_handlers); connection->outcoming_bl.claim_append(keepalive_frame.get_buffer()); } void ProtocolV2::append_keepalive_ack(utime_t &timestamp) { - KeepAliveFrameAck keepalive_ack_frame(*this, timestamp); + KeepAliveFrameAck keepalive_ack_frame(session_stream_handlers, timestamp); connection->outcoming_bl.claim_append(keepalive_ack_frame.get_buffer()); } @@ -1143,7 +674,7 @@ void ProtocolV2::write_event() { if (left) { ceph_le64 s; s = in_seq; - AckFrame ack(*this, in_seq); + AckFrame ack(session_stream_handlers, in_seq); connection->outcoming_bl.claim_append(ack.get_buffer()); ldout(cct, 10) << __func__ << " try send msg ack, acked " << left << " messages" << 
dendl; @@ -2145,7 +1676,7 @@ CtPtr ProtocolV2::handle_keepalive2(ceph::bufferlist &payload) ldout(cct, 20) << __func__ << " payload.length()=" << payload.length() << dendl; - KeepAliveFrame keepalive_frame(*this, payload); + KeepAliveFrame keepalive_frame(session_stream_handlers, payload); ldout(cct, 30) << __func__ << " got KEEPALIVE2 tag ..." << dendl; @@ -2169,7 +1700,7 @@ CtPtr ProtocolV2::handle_keepalive2_ack(ceph::bufferlist &payload) ldout(cct, 20) << __func__ << " payload.length()=" << payload.length() << dendl; - KeepAliveFrameAck keepalive_ack_frame(*this, payload); + KeepAliveFrameAck keepalive_ack_frame(session_stream_handlers, payload); connection->set_last_keepalive_ack(keepalive_ack_frame.timestamp()); ldout(cct, 20) << __func__ << " got KEEPALIVE_ACK" << dendl; @@ -2181,7 +1712,7 @@ CtPtr ProtocolV2::handle_message_ack(ceph::bufferlist &payload) ldout(cct, 20) << __func__ << " payload.length()=" << payload.length() << dendl; - AckFrame ack(*this, payload); + AckFrame ack(session_stream_handlers, payload); handle_message_ack(ack.seq()); return CONTINUE(read_frame); } @@ -2375,7 +1906,7 @@ CtPtr ProtocolV2::send_client_ident() { } } - ClientIdentFrame client_ident(*this, messenger->get_myaddrs(), + ClientIdentFrame client_ident(session_stream_handlers, messenger->get_myaddrs(), connection->target_addr, messenger->get_myname().num(), global_seq, connection->policy.features_supported, @@ -2402,7 +1933,7 @@ CtPtr ProtocolV2::send_client_ident() { CtPtr ProtocolV2::send_reconnect() { ldout(cct, 20) << __func__ << dendl; - ReconnectFrame reconnect(*this, messenger->get_myaddrs(), + ReconnectFrame reconnect(session_stream_handlers, messenger->get_myaddrs(), client_cookie, server_cookie, global_seq, @@ -2425,7 +1956,7 @@ CtPtr ProtocolV2::handle_ident_missing_features(ceph::bufferlist &payload) ldout(cct, 20) << __func__ << " payload.length()=" << payload.length() << dendl; - IdentMissingFeaturesFrame ident_missing(*this, payload); + 
IdentMissingFeaturesFrame ident_missing(session_stream_handlers, payload); lderr(cct) << __func__ << " client does not support all server features: " << std::hex << ident_missing.features() << std::dec << dendl; @@ -2438,7 +1969,7 @@ CtPtr ProtocolV2::handle_session_reset(ceph::bufferlist &payload) ldout(cct, 20) << __func__ << " payload.length()=" << payload.length() << dendl; - ResetFrame reset(*this, payload); + ResetFrame reset(session_stream_handlers, payload); ldout(cct, 1) << __func__ << " received session reset full=" << reset.full() << dendl; @@ -2458,7 +1989,7 @@ CtPtr ProtocolV2::handle_session_retry(ceph::bufferlist &payload) ldout(cct, 20) << __func__ << " payload.length()=" << payload.length() << dendl; - RetryFrame retry(*this, payload); + RetryFrame retry(session_stream_handlers, payload); connect_seq = retry.connect_seq() + 1; ldout(cct, 1) << __func__ @@ -2473,7 +2004,7 @@ CtPtr ProtocolV2::handle_session_retry_global(ceph::bufferlist &payload) ldout(cct, 20) << __func__ << " payload.length()=" << payload.length() << dendl; - RetryGlobalFrame retry(*this, payload); + RetryGlobalFrame retry(session_stream_handlers, payload); global_seq = messenger->get_global_seq(retry.global_seq()); ldout(cct, 1) << __func__ << " received session retry global global_seq=" @@ -2489,7 +2020,7 @@ CtPtr ProtocolV2::handle_wait() { state = WAIT; ceph_assert(rx_segments_data.size() == 1); ceph_assert(rx_segments_desc.size() == 1); - WaitFrame(*this, rx_segments_data[SegmentIndex::Frame::PAYLOAD]); + WaitFrame(session_stream_handlers, rx_segments_data[SegmentIndex::Frame::PAYLOAD]); return _fault(); } @@ -2498,7 +2029,7 @@ CtPtr ProtocolV2::handle_reconnect_ok(ceph::bufferlist &payload) ldout(cct, 20) << __func__ << " payload.length()=" << payload.length() << dendl; - ReconnectOkFrame reconnect_ok(*this, payload); + ReconnectOkFrame reconnect_ok(session_stream_handlers, payload); ldout(cct, 5) << __func__ << " reconnect accepted: sms=" << reconnect_ok.msg_seq() << dendl; 
@@ -2525,7 +2056,7 @@ CtPtr ProtocolV2::handle_server_ident(ceph::bufferlist &payload) ldout(cct, 20) << __func__ << " payload.length()=" << payload.length() << dendl; - ServerIdentFrame server_ident(*this, payload); + ServerIdentFrame server_ident(session_stream_handlers, payload); ldout(cct, 5) << __func__ << " received server identification:" << " addrs=" << server_ident.addrs() << " gid=" << server_ident.gid() @@ -2677,7 +2208,7 @@ CtPtr ProtocolV2::handle_client_ident(ceph::bufferlist &payload) ldout(cct, 20) << __func__ << " payload.length()=" << payload.length() << dendl; - ClientIdentFrame client_ident(*this, payload); + ClientIdentFrame client_ident(session_stream_handlers, payload); ldout(cct, 5) << __func__ << " received client identification:" << " addrs=" << client_ident.addrs() @@ -2717,7 +2248,7 @@ CtPtr ProtocolV2::handle_client_ident(ceph::bufferlist &payload) if (feat_missing) { ldout(cct, 1) << __func__ << " peer missing required features " << std::hex << feat_missing << std::dec << dendl; - IdentMissingFeaturesFrame ident_missing_features(*this, feat_missing); + IdentMissingFeaturesFrame ident_missing_features(session_stream_handlers, feat_missing); bufferlist &bl = ident_missing_features.get_buffer(); return WRITE(bl, "ident missing features", read_frame); @@ -2768,7 +2299,7 @@ CtPtr ProtocolV2::handle_reconnect(ceph::bufferlist &payload) ldout(cct, 20) << __func__ << " payload.length()=" << payload.length() << dendl; - ReconnectFrame reconnect(*this, payload); + ReconnectFrame reconnect(session_stream_handlers, payload); ldout(cct, 5) << __func__ << " received reconnect:" @@ -2813,7 +2344,7 @@ CtPtr ProtocolV2::handle_reconnect(ceph::bufferlist &payload) // session ldout(cct, 0) << __func__ << " no existing connection exists, reseting client" << dendl; - ResetFrame reset(*this, true); + ResetFrame reset(session_stream_handlers, true); return WRITE(reset.get_buffer(), "session reset", read_frame); } @@ -2828,7 +2359,7 @@ CtPtr 
ProtocolV2::handle_reconnect(ceph::bufferlist &payload) if (exproto->state == CLOSED) { ldout(cct, 5) << __func__ << " existing " << existing << " already closed. Reseting client" << dendl; - ResetFrame reset(*this, true); + ResetFrame reset(session_stream_handlers, true); return WRITE(reset.get_buffer(), "session reset", read_frame); } @@ -2836,7 +2367,7 @@ CtPtr ProtocolV2::handle_reconnect(ceph::bufferlist &payload) ldout(cct, 1) << __func__ << " existing racing replace happened while replacing." << " existing=" << existing << dendl; - RetryGlobalFrame retry(*this, exproto->peer_global_seq); + RetryGlobalFrame retry(session_stream_handlers, exproto->peer_global_seq); bufferlist &bl = retry.get_buffer(); return WRITE(bl, "session retry", read_frame); } @@ -2848,7 +2379,7 @@ CtPtr ProtocolV2::handle_reconnect(ceph::bufferlist &payload) << " rcc=" << reconnect.client_cookie() << ", reseting client." << dendl; - ResetFrame reset(*this, connection->policy.resetcheck); + ResetFrame reset(session_stream_handlers, connection->policy.resetcheck); return WRITE(reset.get_buffer(), "session reset", read_frame); } else if (exproto->server_cookie == 0) { // this happens when: @@ -2861,7 +2392,7 @@ CtPtr ProtocolV2::handle_reconnect(ceph::bufferlist &payload) ldout(cct, 1) << __func__ << " I was a client and didn't received the" << " server_ident. 
Asking peer to resume session" << " establishment" << dendl; - ResetFrame reset(*this, false); + ResetFrame reset(session_stream_handlers, false); return WRITE(reset.get_buffer(), "session reset", read_frame); } @@ -2870,7 +2401,7 @@ CtPtr ProtocolV2::handle_reconnect(ceph::bufferlist &payload) << " stale global_seq: sgs=" << exproto->peer_global_seq << " cgs=" << reconnect.global_seq() << ", ask client to retry global" << dendl; - RetryGlobalFrame retry(*this, exproto->peer_global_seq); + RetryGlobalFrame retry(session_stream_handlers, exproto->peer_global_seq); INTERCEPT(18); @@ -2882,7 +2413,7 @@ CtPtr ProtocolV2::handle_reconnect(ceph::bufferlist &payload) << " stale connect_seq scs=" << exproto->connect_seq << " ccs=" << reconnect.connect_seq() << " , ask client to retry" << dendl; - RetryFrame retry(*this, exproto->connect_seq); + RetryFrame retry(session_stream_handlers, exproto->connect_seq); return WRITE(retry.get_buffer(), "session retry", read_frame); } @@ -2897,7 +2428,7 @@ CtPtr ProtocolV2::handle_reconnect(ceph::bufferlist &payload) << " reconnect race detected, this connection loses to existing=" << existing << dendl; - WaitFrame wait(*this); + WaitFrame wait(session_stream_handlers); return WRITE(wait.get_buffer(), "wait", read_frame); } else { // this connection wins @@ -2940,7 +2471,7 @@ CtPtr ProtocolV2::handle_existing_connection(AsyncConnectionRef existing) { ldout(cct, 1) << __func__ << " existing racing replace happened while replacing." << " existing=" << existing << dendl; - WaitFrame wait(*this); + WaitFrame wait(session_stream_handlers); return WRITE(wait.get_buffer(), "wait", read_frame); } @@ -3017,7 +2548,7 @@ CtPtr ProtocolV2::handle_existing_connection(AsyncConnectionRef existing) { // has something to send to us. 
existing->send_keepalive(); existing->lock.lock(); - WaitFrame wait(*this); + WaitFrame wait(session_stream_handlers); bufferlist &bl = wait.get_buffer(); return WRITE(bl, "wait", read_frame); } @@ -3156,7 +2687,7 @@ CtPtr ProtocolV2::send_server_ident() { uint64_t gs = messenger->get_global_seq(); ServerIdentFrame server_ident( - *this, messenger->get_myaddrs(), messenger->get_myname().num(), gs, + session_stream_handlers, messenger->get_myaddrs(), messenger->get_myname().num(), gs, connection->policy.features_supported, connection->policy.features_required | msgr2_required, flags, @@ -3225,7 +2756,7 @@ CtPtr ProtocolV2::send_reconnect_ok() { out_seq = discard_requeued_up_to(out_seq, message_seq); uint64_t ms = in_seq; - ReconnectOkFrame reconnect_ok(*this, ms); + ReconnectOkFrame reconnect_ok(session_stream_handlers, ms); ldout(cct, 5) << __func__ << " sending reconnect_ok: msg_seq=" << ms << dendl; diff --git a/src/msg/async/ProtocolV2.h b/src/msg/async/ProtocolV2.h index 66e4e9752a7..50dbbeecf97 100644 --- a/src/msg/async/ProtocolV2.h +++ b/src/msg/async/ProtocolV2.h @@ -8,6 +8,7 @@ #include "Protocol.h" #include "crypto_onwire.h" +#include "frames_v2.h" class ProtocolV2 : public Protocol { private: @@ -49,28 +50,6 @@ private: } public: - enum class Tag : __u8 { - HELLO = 1, - AUTH_REQUEST, - AUTH_BAD_METHOD, - AUTH_REPLY_MORE, - AUTH_REQUEST_MORE, - AUTH_DONE, - CLIENT_IDENT, - SERVER_IDENT, - IDENT_MISSING_FEATURES, - SESSION_RECONNECT, - SESSION_RESET, - SESSION_RETRY, - SESSION_RETRY_GLOBAL, - SESSION_RECONNECT_OK, - WAIT, - MESSAGE, - KEEPALIVE2, - KEEPALIVE2_ACK, - ACK - }; - // TODO: move into auth_meta? ceph::crypto::onwire::rxtx_t session_stream_handlers; private: @@ -106,16 +85,6 @@ private: uint32_t next_payload_len; public: - struct segment_t { - // TODO: this will be dropped with support for `allocation policies`. - // We need them because of the rx_buffers zero-copy optimization. 
- static constexpr __le16 DEFERRED_ALLOCATION { 0x0000 }; - - static constexpr __le16 DEFAULT_ALIGNMENT = sizeof(void*); - - __le32 length; - __le16 alignment; - } __attribute__((packed)); struct onwire_segment_t { // crypto-processed segment can be expanded on-wire because of: @@ -124,11 +93,9 @@ public: // See RxHandler::get_extra_size_at_final(). __le32 onwire_length; - struct segment_t logical; + struct ceph::msgr::v2::segment_t logical; } __attribute__((packed)); - static constexpr std::size_t MAX_NUM_SEGMENTS = 4; - struct SegmentIndex { struct Msg { static constexpr std::size_t HEADER = 0; @@ -143,13 +110,13 @@ public: }; boost::container::static_vector rx_segments_desc; + ceph::msgr::v2::MAX_NUM_SEGMENTS> rx_segments_desc; boost::container::static_vector rx_segments_data; + ceph::msgr::v2::MAX_NUM_SEGMENTS> rx_segments_data; private: - Tag next_tag; + ceph::msgr::v2::Tag next_tag; ceph_msg_header2 current_header; utime_t backoff; // backoff time utime_t recv_stamp; diff --git a/src/msg/async/frames_v2.h b/src/msg/async/frames_v2.h new file mode 100644 index 00000000000..5b9257734d3 --- /dev/null +++ b/src/msg/async/frames_v2.h @@ -0,0 +1,511 @@ +#ifndef _MSG_ASYNC_FRAMES_V2_ +#define _MSG_ASYNC_FRAMES_V2_ + +#include "include/types.h" +#include "crypto_onwire.h" +#include + +/** + * Protocol V2 Frame Structures + * + * Documentation in: doc/dev/msgr2.rst + **/ + +namespace ceph::msgr::v2 { + +// We require these features from any peer, period, in order to encode +// a entity_addrvec_t. +const uint64_t msgr2_required = CEPH_FEATUREMASK_MSG_ADDR2; + +// We additionally assume the peer has the below features *purely for +// the purpose of encoding the frames themselves*. The only complex +// types in the frames are entity_addr_t and entity_addrvec_t, and we +// specifically want the peer to understand the (new in nautilus) +// TYPE_ANY. 
We narrow this assumption to frames because we +// expect there may be future clients (the kernel) that understand +// msgr v2 and understand this encoding but don't necessarily have +// everything else that SERVER_NAUTILUS implies. Yes, a fresh feature +// bit would be a cleaner approach, but those are scarce these days. +const uint64_t msgr2_frame_assumed = + msgr2_required | + CEPH_FEATUREMASK_SERVER_NAUTILUS; + +enum class Tag : __u8 { + HELLO = 1, + AUTH_REQUEST, + AUTH_BAD_METHOD, + AUTH_REPLY_MORE, + AUTH_REQUEST_MORE, + AUTH_DONE, + CLIENT_IDENT, + SERVER_IDENT, + IDENT_MISSING_FEATURES, + SESSION_RECONNECT, + SESSION_RESET, + SESSION_RETRY, + SESSION_RETRY_GLOBAL, + SESSION_RECONNECT_OK, + WAIT, + MESSAGE, + KEEPALIVE2, + KEEPALIVE2_ACK, + ACK +}; + +struct segment_t { + // TODO: this will be dropped with support for `allocation policies`. + // We need them because of the rx_buffers zero-copy optimization. + static constexpr __le16 DEFERRED_ALLOCATION{0x0000}; + + static constexpr __le16 DEFAULT_ALIGNMENT = sizeof(void *); + + __le32 length; + __le16 alignment; +} __attribute__((packed)); + +static constexpr uint8_t CRYPTO_BLOCK_SIZE { 16 }; + +static constexpr std::size_t MAX_NUM_SEGMENTS = 4; + +// V2 preamble consists of one or more preamble blocks depending on +// the number of segments a particular frame needs. Each block holds +// up to MAX_NUM_SEGMENTS segments and has its own CRC. +// +// XXX: currently the multi-segment facility is NOT implemented. +struct preamble_block_t { + // Tag. For multi-segmented frames the value is the same + // between subsequent preamble blocks. + __u8 tag; + + // Number of segments to go in entire frame. First preamble block has + // set this to just #segments, second #segments - MAX_NUM_SEGMENTS, + // third to #segments - MAX_NUM_SEGMENTS and so on. + __u8 num_segments; + + std::array segments; + __u8 _reserved[2]; + + // CRC32 for this single preamble block. 
+ __le32 crc; +} __attribute__((packed)); +static_assert(sizeof(preamble_block_t) % CRYPTO_BLOCK_SIZE == 0); +static_assert(std::is_standard_layout::value); + + +static constexpr uint32_t FRAME_PREAMBLE_SIZE = sizeof(preamble_block_t); + +template +struct Frame { +protected: + ceph::bufferlist payload; + ceph::bufferlist::contiguous_filler preamble_filler; + + void fill_preamble(const std::initializer_list main_segments) { + ceph_assert(std::size(main_segments) <= MAX_NUM_SEGMENTS); + + // Craft the main preamble. It's always present regardless of the number + // of segments message is composed from. This doesn't apply to extra one + // as it's optional -- if there is up to 2 segments, we'll never transmit + // preamble_extra_t; + { + preamble_block_t main_preamble; + // TODO: we might fill/pad with pseudo-random data. + ::memset(&main_preamble, 0, sizeof(main_preamble)); + + main_preamble.num_segments = std::size(main_segments); + main_preamble.tag = static_cast<__u8>(T::tag); + ceph_assert(main_preamble.tag != 0); + + std::copy(std::cbegin(main_segments), std::cend(main_segments), + std::begin(main_preamble.segments)); + + main_preamble.crc = + ceph_crc32c(0, reinterpret_cast(&main_preamble), + sizeof(main_preamble) - sizeof(main_preamble.crc)); + + preamble_filler.copy_in(sizeof(main_preamble), + reinterpret_cast(&main_preamble)); + } + } + +public: + Frame() : preamble_filler(payload.append_hole(FRAME_PREAMBLE_SIZE)) {} + + ceph::bufferlist &get_buffer() { + fill_preamble({segment_t{payload.length() - FRAME_PREAMBLE_SIZE, + segment_t::DEFAULT_ALIGNMENT}}); + return payload; + } + + void decode_frame(const ceph::bufferlist &bl) { + auto ti = bl.cbegin(); + static_cast(this)->decode_payload(ti); + } + + void decode_payload(bufferlist::const_iterator &ti) {} +}; + +// TODO, FIXME: fix this altogether with the Frame hierarchy rework +struct do_not_encode_tag_t {}; +struct dummy_ctor_conflict_helper {}; + +template +struct PayloadFrame : public Frame { +protected: + 
// this tuple is only used when decoding values from a payload buffer + std::tuple _values; + + // FIXME: for now, we assume specific features for the purposes of encoding + // the frames themselves (*not* messages in message frames!). + uint64_t features = msgr2_frame_assumed; + + template + inline void _encode_payload_each(T &t) { + if constexpr (std::is_same()) { + this->payload.claim_append((bufferlist &)t); + } else if constexpr (std::is_same const>()) { + encode((uint32_t)t.size(), this->payload, features); + for (const auto &elem : t) { + encode(elem, this->payload, features); + } + } else if constexpr (std::is_same()) { + this->payload.append((char *)&t, sizeof(t)); + } else if constexpr (std::is_same()) { + /* NOP, only to discriminate ctors for decode/encode. FIXME. */ + } else { + encode(t, this->payload, features); + } + } + + template + inline void _decode_payload_each(T &t, bufferlist::const_iterator &ti) const { + if constexpr (std::is_same()) { + if (ti.get_remaining()) { + t.append(ti.get_current_ptr()); + } + } else if constexpr (std::is_same>()) { + uint32_t size; + decode(size, ti); + t.resize(size); + for (uint32_t i = 0; i < size; ++i) { + decode(t[i], ti); + } + } else if constexpr (std::is_same()) { + auto ptr = ti.get_current_ptr(); + ti.advance(sizeof(T)); + t = *(T *)ptr.raw_c_str(); + } else if constexpr (std::is_same()) { + /* NOP, only to discriminate ctors for decode/encode. FIXME. */ + } else { + decode(t, ti); + } + } + + template + inline void _decode_payload(bufferlist::const_iterator &ti, + std::index_sequence) const { + (_decode_payload_each((Args &)std::get(_values), ti), ...); + } + + template + inline decltype(auto) get_val() { + return std::get(_values); + } + +public: + PayloadFrame(const Args &... 
args) { (_encode_payload_each(args), ...); } + + PayloadFrame(do_not_encode_tag_t) {} + + PayloadFrame(const ceph::bufferlist &payload) { this->decode_frame(payload); } + + void decode_payload(bufferlist::const_iterator &ti) { + _decode_payload(ti, std::index_sequence_for()); + } +}; + +struct HelloFrame : public PayloadFrame { // peer address + static const Tag tag = Tag::HELLO; + using PayloadFrame::PayloadFrame; + + inline uint8_t &entity_type() { return get_val<0>(); } + inline entity_addr_t &peer_addr() { return get_val<1>(); } +}; + +struct AuthRequestFrame : public PayloadFrame, // preferred modes + bufferlist> { // auth payload + static const Tag tag = Tag::AUTH_REQUEST; + using PayloadFrame::PayloadFrame; + + inline uint32_t &method() { return get_val<0>(); } + inline vector &preferred_modes() { return get_val<1>(); } + inline bufferlist &auth_payload() { return get_val<2>(); } +}; + +struct AuthBadMethodFrame : public PayloadFrame, // allowed methods + std::vector> { // allowed modes + static const Tag tag = Tag::AUTH_BAD_METHOD; + using PayloadFrame::PayloadFrame; + + inline uint32_t &method() { return get_val<0>(); } + inline int32_t &result() { return get_val<1>(); } + inline std::vector &allowed_methods() { return get_val<2>(); } + inline std::vector &allowed_modes() { return get_val<3>(); } +}; + +struct AuthReplyMoreFrame : public PayloadFrame { // auth payload + static const Tag tag = Tag::AUTH_REPLY_MORE; + using PayloadFrame::PayloadFrame; + + inline bufferlist &auth_payload() { return get_val<1>(); } +}; + +struct AuthRequestMoreFrame : public PayloadFrame { // auth payload + static const Tag tag = Tag::AUTH_REQUEST_MORE; + using PayloadFrame::PayloadFrame; + + inline bufferlist &auth_payload() { return get_val<1>(); } +}; + +struct AuthDoneFrame : public PayloadFrame { // auth method payload + static const Tag tag = Tag::AUTH_DONE; + using PayloadFrame::PayloadFrame; + + inline uint64_t &global_id() { return get_val<0>(); } + inline uint32_t 
&con_mode() { return get_val<1>(); } + inline bufferlist &auth_payload() { return get_val<2>(); } +}; + +template +struct SignedEncryptedFrame : public PayloadFrame { + ceph::bufferlist &get_buffer() { + // In contrast to Frame::get_buffer() we don't fill preamble here. + return this->payload; + } + + SignedEncryptedFrame(ceph::crypto::onwire::rxtx_t &session_stream_handlers, + const Args &... args) + : PayloadFrame(args...) { + // FIXME: plainsize -> ciphersize; for AES-GCM they are equal apart + // from auth tag size + this->fill_preamble({segment_t{this->payload.length() - FRAME_PREAMBLE_SIZE, + segment_t::DEFAULT_ALIGNMENT}}); + + if (session_stream_handlers.tx) { + ceph_assert(session_stream_handlers.tx); + session_stream_handlers.tx->reset_tx_handler({this->payload.length()}); + + session_stream_handlers.tx->authenticated_encrypt_update( + std::move(this->payload)); + this->payload = session_stream_handlers.tx->authenticated_encrypt_final(); + } + } + + SignedEncryptedFrame(ceph::crypto::onwire::rxtx_t &session_stream_handlers, + ceph::bufferlist &bl) + : PayloadFrame(do_not_encode_tag_t{}) { + if (!session_stream_handlers.rx) { + this->decode_frame(bl); + return; + } + + const auto length = bl.length(); + ceph::bufferlist plain_bl = + session_stream_handlers.rx->authenticated_decrypt_update_final( + std::move(bl), segment_t::DEFAULT_ALIGNMENT); + ceph_assert(plain_bl.length() == + length - session_stream_handlers.rx->get_extra_size_at_final()); + this->decode_frame(plain_bl); + } +}; + +struct ClientIdentFrame + : public SignedEncryptedFrame { // client cookie + static const Tag tag = Tag::CLIENT_IDENT; + using SignedEncryptedFrame::SignedEncryptedFrame; + + inline entity_addrvec_t &addrs() { return get_val<0>(); } + inline entity_addr_t &target_addr() { return get_val<1>(); } + inline int64_t &gid() { return get_val<2>(); } + inline uint64_t &global_seq() { return get_val<3>(); } + inline uint64_t &supported_features() { return get_val<4>(); } + inline uint64_t &required_features() { return get_val<5>(); } + inline uint64_t &flags() { return get_val<6>(); } + inline 
uint64_t &required_features() { return get_val<5>(); } + inline uint64_t &flags() { return get_val<6>(); } + inline uint64_t &cookie() { return get_val<7>(); } +}; + +struct ServerIdentFrame + : public SignedEncryptedFrame { // server cookie + static const Tag tag = Tag::SERVER_IDENT; + using SignedEncryptedFrame::SignedEncryptedFrame; + + inline entity_addrvec_t &addrs() { return get_val<0>(); } + inline int64_t &gid() { return get_val<1>(); } + inline uint64_t &global_seq() { return get_val<2>(); } + inline uint64_t &supported_features() { return get_val<3>(); } + inline uint64_t &required_features() { return get_val<4>(); } + inline uint64_t &flags() { return get_val<5>(); } + inline uint64_t &cookie() { return get_val<6>(); } +}; + +struct ReconnectFrame + : public SignedEncryptedFrame { // message sequence + static const Tag tag = Tag::SESSION_RECONNECT; + using SignedEncryptedFrame::SignedEncryptedFrame; + + inline entity_addrvec_t &addrs() { return get_val<0>(); } + inline uint64_t &client_cookie() { return get_val<1>(); } + inline uint64_t &server_cookie() { return get_val<2>(); } + inline uint64_t &global_seq() { return get_val<3>(); } + inline uint64_t &connect_seq() { return get_val<4>(); } + inline uint64_t &msg_seq() { return get_val<5>(); } +}; + +struct ResetFrame : public SignedEncryptedFrame { // full reset + static const Tag tag = Tag::SESSION_RESET; + using SignedEncryptedFrame::SignedEncryptedFrame; + + inline bool &full() { return get_val<0>(); } +}; + +struct RetryFrame : public SignedEncryptedFrame { // connection seq + static const Tag tag = Tag::SESSION_RETRY; + using SignedEncryptedFrame::SignedEncryptedFrame; + + inline uint64_t &connect_seq() { return get_val<0>(); } +}; + +struct RetryGlobalFrame : public SignedEncryptedFrame { // global seq + static const Tag tag = Tag::SESSION_RETRY_GLOBAL; + using SignedEncryptedFrame::SignedEncryptedFrame; + + inline uint64_t &global_seq() { return get_val<0>(); } +}; + +struct WaitFrame : public 
SignedEncryptedFrame { + static const Tag tag = Tag::WAIT; + using SignedEncryptedFrame::SignedEncryptedFrame; +}; + +struct ReconnectOkFrame : public SignedEncryptedFrame { // message seq + static const Tag tag = Tag::SESSION_RECONNECT_OK; + using SignedEncryptedFrame::SignedEncryptedFrame; + + inline uint64_t &msg_seq() { return get_val<0>(); } +}; + +struct IdentMissingFeaturesFrame + : public SignedEncryptedFrame { // missing features mask + static const Tag tag = Tag::IDENT_MISSING_FEATURES; + using SignedEncryptedFrame::SignedEncryptedFrame; + + inline uint64_t &features() { return get_val<0>(); } +}; + +struct KeepAliveFrame : public SignedEncryptedFrame { // timestamp + static const Tag tag = Tag::KEEPALIVE2; + using SignedEncryptedFrame::SignedEncryptedFrame; + + KeepAliveFrame(ceph::crypto::onwire::rxtx_t &session_stream_handlers) + : KeepAliveFrame(session_stream_handlers, ceph_clock_now()) {} + + inline utime_t &timestamp() { return get_val<0>(); } +}; + +struct KeepAliveFrameAck : public SignedEncryptedFrame { // ack timestamp + static const Tag tag = Tag::KEEPALIVE2_ACK; + using SignedEncryptedFrame::SignedEncryptedFrame; + + inline utime_t &timestamp() { return get_val<0>(); } +}; + +struct AckFrame : public SignedEncryptedFrame { // message sequence + static const Tag tag = Tag::ACK; + using SignedEncryptedFrame::SignedEncryptedFrame; + + inline uint64_t &seq() { return get_val<0>(); } +}; + +// This class is used for encoding/decoding header of the message frame. +// Body is processed almost independently with the sole junction point +// being the `extra_payload_len` passed to get_buffer(). +struct MessageHeaderFrame + : public PayloadFrame { + static const Tag tag = Tag::MESSAGE; + + ceph::bufferlist &get_buffer() { + // In contrast to Frame::get_buffer() we don't fill preamble here. 
+ return this->payload; + } + + MessageHeaderFrame(const ceph_msg_header2 &msghdr, + const uint32_t front_len, + const uint32_t middle_len, + const uint32_t data_len) + : PayloadFrame(msghdr) + { + // FIXME: plainsize -> ciphersize; for AES-GCM they are equal apart from auth tag size + fill_preamble({ + segment_t{ this->payload.length() - FRAME_PREAMBLE_SIZE, + segment_t::DEFAULT_ALIGNMENT }, + segment_t{ front_len, segment_t::DEFAULT_ALIGNMENT }, + segment_t{ middle_len, segment_t::DEFAULT_ALIGNMENT }, + segment_t{ data_len, segment_t::DEFERRED_ALLOCATION }, + }); + } + + MessageHeaderFrame(ceph::bufferlist&& text) + : PayloadFrame(do_not_encode_tag_t{}) + { + this->decode_frame(text); + } + + inline ceph_msg_header2 &header() { return get_val<0>(); } +}; + +} // namespace ceph::msgr::v2 + +#endif // _MSG_ASYNC_FRAMES_V2_ -- 2.39.5