From: Radoslaw Zarzynski Date: Thu, 14 Feb 2019 00:39:26 +0000 (+0100) Subject: msg/async: V2 uses segments instead of next_payload_len, part 1. X-Git-Tag: v14.1.1~157^2~33 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=430abcfd4873f5bda0e4fa3bacb1714e85e0e2e3;p=ceph.git msg/async: V2 uses segments instead of next_payload_len, part 1. Signed-off-by: Radoslaw Zarzynski --- diff --git a/src/msg/async/ProtocolV2.cc b/src/msg/async/ProtocolV2.cc index ade751432da4..e8fedf729ef3 100644 --- a/src/msg/async/ProtocolV2.cc +++ b/src/msg/async/ProtocolV2.cc @@ -118,10 +118,7 @@ static void alloc_aligned_buffer(bufferlist &data, unsigned len, unsigned off) { static constexpr uint8_t CRYPTO_BLOCK_SIZE { 16 }; -struct segment_t { - __le32 length; - __le16 alignment; -} __attribute__((packed)); +using segment_t = ProtocolV2::segment_t; struct preamble_main_t { static constexpr std::size_t MAX_NUM_SEGMENTS = 2; @@ -129,14 +126,14 @@ struct preamble_main_t { __le16 crc; __u8 tag; __u8 num_segments; - std::array segments; + std::array segments; } __attribute__((packed)); static_assert(sizeof(preamble_main_t) == CRYPTO_BLOCK_SIZE); static_assert(std::is_standard_layout::value); struct preamble_extra_t { static constexpr std::size_t MAX_NUM_SEGMENTS = 2; - std::array segments; + std::array segments; std::array<__u8, 4> always_padding; } __attribute__((packed)); static_assert(sizeof(preamble_extra_t) == CRYPTO_BLOCK_SIZE); @@ -1452,6 +1449,19 @@ CtPtr ProtocolV2::handle_read_frame_preamble_main(char *buffer, int r) { // everything through ::decode is unnecessary. next_payload_len = main_preamble.segments[0].length; + rx_segments_todo_rev.clear(); + for (std::uint8_t idx = main_preamble.num_segments; + idx <= rx_segments_todo_rev.capacity() && idx > 0; + /* NOP */) + { + --idx; + ldout(cct, 10) << __func__ << " got new segment:" + << " len=" << main_preamble.segments[idx].length + << " align=" << main_preamble.segments[idx].alignment + << dendl; + rx_segments_todo_rev.emplace_back(main_preamble.segments[idx]); + } + // TODO: move this ugliness into dedicated procedure const auto rx_crc = ceph_crc16c(0, reinterpret_cast(&main_preamble) + sizeof(main_preamble.crc), @@ -1489,7 +1499,8 @@ CtPtr ProtocolV2::handle_read_frame_dispatch() { case Tag::KEEPALIVE2: case Tag::KEEPALIVE2_ACK: case Tag::ACK: - return READ(next_payload_len, handle_frame_payload); + rx_segments_data.clear(); + return read_frame_segment(); case Tag::WAIT: return handle_wait(); case Tag::MESSAGE: @@ -1505,6 +1516,44 @@ CtPtr ProtocolV2::handle_read_frame_dispatch() { return nullptr; } +CtPtr ProtocolV2::read_frame_segment() { + ldout(cct, 20) << __func__ << dendl; + ceph_assert(!rx_segments_todo_rev.empty()); + +#if 0 + auto rx_buffer = ceph::buffer::ptr_node::create( + buffer::create_aligned(rx_segments_todo_rev.back().length, + rx_segments_todo_rev.back().alignment)); +#else + auto rx_buffer = ceph::buffer::ptr_node::create( + buffer::create(rx_segments_todo_rev.back().length)); +#endif + + rx_segments_todo_rev.pop_back(); + rx_segments_data.emplace_back(); + rx_segments_data.back().push_back(std::move(rx_buffer)); + return READB(rx_segments_data.back().length(), + rx_segments_data.back().c_str(), + handle_frame_segment); +} + +CtPtr ProtocolV2::handle_frame_segment(char *buffer, int r) { + ldout(cct, 20) << __func__ << " r=" << r << dendl; + + if (r < 0) { + ldout(cct, 1) << __func__ << " read frame segment failed r=" << r << " (" + << cpp_strerror(r) << ")" << dendl; + return _fault(); + } + + if (!rx_segments_todo_rev.empty()) { + return read_frame_segment(); + } else { + // TODO: for makeshift only. This will be more generic and throttled + return handle_frame_payload(buffer, r); + } +} + CtPtr ProtocolV2::handle_frame_payload(char *buffer, int r) { ldout(cct, 20) << __func__ << " r=" << r << dendl; @@ -1514,47 +1563,50 @@ CtPtr ProtocolV2::handle_frame_payload(char *buffer, int r) { return _fault(); } + ceph_assert(!rx_segments_data.empty()); + const auto this_payload_len = rx_segments_data.back().length(); + ldout(cct, 30) << __func__ << "\n"; bufferlist bl; - bl.append(buffer, next_payload_len); + bl.append(buffer, this_payload_len); bl.hexdump(*_dout); *_dout << dendl; switch (next_tag) { case Tag::HELLO: - return handle_hello(buffer, next_payload_len); + return handle_hello(buffer, this_payload_len); case Tag::AUTH_REQUEST: - return handle_auth_request(buffer, next_payload_len); + return handle_auth_request(buffer, this_payload_len); case Tag::AUTH_BAD_METHOD: - return handle_auth_bad_method(buffer, next_payload_len); + return handle_auth_bad_method(buffer, this_payload_len); case Tag::AUTH_REPLY_MORE: - return handle_auth_reply_more(buffer, next_payload_len); + return handle_auth_reply_more(buffer, this_payload_len); case Tag::AUTH_REQUEST_MORE: - return handle_auth_request_more(buffer, next_payload_len); + return handle_auth_request_more(buffer, this_payload_len); case Tag::AUTH_DONE: - return handle_auth_done(buffer, next_payload_len); + return handle_auth_done(buffer, this_payload_len); case Tag::CLIENT_IDENT: - return handle_client_ident(buffer, next_payload_len); + return handle_client_ident(buffer, this_payload_len); case Tag::SERVER_IDENT: - return handle_server_ident(buffer, next_payload_len); + return handle_server_ident(buffer, this_payload_len); case Tag::IDENT_MISSING_FEATURES: - return handle_ident_missing_features(buffer, next_payload_len); + return handle_ident_missing_features(buffer, this_payload_len); case Tag::SESSION_RECONNECT: - return handle_reconnect(buffer, next_payload_len); + return handle_reconnect(buffer, this_payload_len); case Tag::SESSION_RESET: - return handle_session_reset(buffer, next_payload_len); + return handle_session_reset(buffer, this_payload_len); case Tag::SESSION_RETRY: - return handle_session_retry(buffer, next_payload_len); + return handle_session_retry(buffer, this_payload_len); case Tag::SESSION_RETRY_GLOBAL: - return handle_session_retry_global(buffer, next_payload_len); + return handle_session_retry_global(buffer, this_payload_len); case Tag::SESSION_RECONNECT_OK: - return handle_reconnect_ok(buffer, next_payload_len); + return handle_reconnect_ok(buffer, this_payload_len); case Tag::KEEPALIVE2: - return handle_keepalive2(buffer, next_payload_len); + return handle_keepalive2(buffer, this_payload_len); case Tag::KEEPALIVE2_ACK: - return handle_keepalive2_ack(buffer, next_payload_len); + return handle_keepalive2_ack(buffer, this_payload_len); case Tag::ACK: - return handle_message_ack(buffer, next_payload_len); + return handle_message_ack(buffer, this_payload_len); default: ceph_abort(); } diff --git a/src/msg/async/ProtocolV2.h b/src/msg/async/ProtocolV2.h index 1514e199b60f..38c7cee04bdb 100644 --- a/src/msg/async/ProtocolV2.h +++ b/src/msg/async/ProtocolV2.h @@ -4,6 +4,8 @@ #ifndef _MSG_ASYNC_PROTOCOL_V2_ #define _MSG_ASYNC_PROTOCOL_V2_ +#include + #include "Protocol.h" #include "crypto_onwire.h" @@ -102,6 +104,24 @@ private: Ct *bannerExchangeCallback; uint32_t next_payload_len; + +public: + struct segment_t { + __le32 length; + __le16 alignment; + } __attribute__((packed)); + +private: + static constexpr std::size_t MAX_NUM_SEGMENTS = 4; + // segment descriptors are stored in reversed order. This is because + // vectors don't support ::pop_front. We might want to exchange + // the container to slightly tuned one in the future. + boost::container::static_vector rx_segments_todo_rev; + boost::container::static_vector rx_segments_data; + + Tag next_tag; ceph_msg_header2 current_header; utime_t backoff; // backoff time @@ -148,7 +168,7 @@ private: CONTINUATION_DECL(ProtocolV2, read_frame); READ_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_read_frame_preamble_main); - READ_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_frame_payload); + READ_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_frame_segment); READ_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_message_header); CONTINUATION_DECL(ProtocolV2, throttle_message); CONTINUATION_DECL(ProtocolV2, throttle_bytes); @@ -162,6 +182,8 @@ private: Ct *read_frame(); Ct *handle_read_frame_preamble_main(char *buffer, int r); Ct *handle_read_frame_dispatch(); + Ct *read_frame_segment(); + Ct *handle_frame_segment(char *buffer, int r); Ct *handle_frame_payload(char *buffer, int r); Ct *ready();