From: Radoslaw Zarzynski Date: Tue, 19 Feb 2019 20:56:25 +0000 (+0100) Subject: msg/async, v2: decouple onwire segment length from logical length. X-Git-Tag: v14.1.1~157^2~10 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=813a8e1ba3d12e87733ee81a286746c9aba2e28a;p=ceph-ci.git msg/async, v2: decouple onwire segment length from logical length. Signed-off-by: Radoslaw Zarzynski --- diff --git a/src/msg/async/ProtocolV2.cc b/src/msg/async/ProtocolV2.cc index 4b657f49c0f..485f0ef68bc 100644 --- a/src/msg/async/ProtocolV2.cc +++ b/src/msg/async/ProtocolV2.cc @@ -706,9 +706,9 @@ void ProtocolV2::reset_recv_state() { connection->pendingReadLen.reset(); connection->writeCallback.reset(); - uint32_t cur_msg_size = rx_segments_todo_rev[2].length + \ - rx_segments_todo_rev[1].length + \ - rx_segments_todo_rev[0].length; + uint32_t cur_msg_size = rx_segments_desc[1].logical.length + \ + rx_segments_desc[2].logical.length + \ + rx_segments_desc[3].logical.length; if (state > THROTTLE_MESSAGE && state <= THROTTLE_DONE && connection->policy.throttler_messages) { @@ -1191,6 +1191,10 @@ bool ProtocolV2::is_queued() { return !out_queue.empty() || connection->is_queued(); } +uint32_t ProtocolV2::get_onwire_size(uint32_t logical_size) const { + return logical_size; +} + CtPtr ProtocolV2::read(CONTINUATION_PARAM(next, ProtocolV2, char *, int), int len, char *buffer) { if (!buffer) { @@ -1440,6 +1444,8 @@ CtPtr ProtocolV2::handle_read_frame_preamble_main(char *buffer, int r) { } { + // I expect ceph_le32 will make the endian conversion for me. Passing + // everything through ::decode is unnecessary. const auto& main_preamble = \ reinterpret_cast(*preamble.c_str()); next_tag = static_cast(main_preamble.tag); @@ -1448,25 +1454,30 @@ CtPtr ProtocolV2::handle_read_frame_preamble_main(char *buffer, int r) { ceph_assert_always(main_preamble.num_segments == 1 || main_preamble.num_segments == 4); - // I expect ceph_le32 will make the endian conversion for me. 
Passing - // everything through ::decode is unnecessary. - rx_segments_todo_rev.clear(); + rx_segments_desc.clear(); + rx_segments_data.clear(); next_payload_len = 0; - for (std::uint8_t idx = main_preamble.num_segments; - idx <= rx_segments_todo_rev.capacity() && idx > 0; - /* NOP */) - { - --idx; + + if (main_preamble.num_segments > MAX_NUM_SEGMENTS) { + ldout(cct, 30) << __func__ + << " num_segments=" << main_preamble.num_segments + << " is too much" << dendl; + return _fault(); + } + for (std::uint8_t idx = 0; idx < main_preamble.num_segments; idx++) { ldout(cct, 10) << __func__ << " got new segment:" << " len=" << main_preamble.segments[idx].length << " align=" << main_preamble.segments[idx].alignment << dendl; - rx_segments_todo_rev.emplace_back(main_preamble.segments[idx]); - next_payload_len += main_preamble.segments[idx].length; + rx_segments_desc.emplace_back(onwire_segment_t{ + get_onwire_size(main_preamble.segments[idx].length), + main_preamble.segments[idx] + }); + next_payload_len += rx_segments_desc.back().onwire_length; } if (session_stream_handlers.rx) { - rx_segments_todo_rev.front().length += \ + rx_segments_desc.back().onwire_length += \ session_stream_handlers.rx->get_extra_size_at_final(); next_payload_len += \ session_stream_handlers.rx->get_extra_size_at_final(); @@ -1489,7 +1500,6 @@ CtPtr ProtocolV2::handle_read_frame_preamble_main(char *buffer, int r) { state = THROTTLE_MESSAGE; return CONTINUE(throttle_message); } else { - rx_segments_data.clear(); return read_frame_segment(); } } @@ -1534,28 +1544,36 @@ CtPtr ProtocolV2::handle_read_frame_dispatch() { CtPtr ProtocolV2::read_frame_segment() { ldout(cct, 20) << __func__ << dendl; - ceph_assert(!rx_segments_todo_rev.empty()); + ceph_assert(!rx_segments_desc.empty()); -#if 0 - auto rx_buffer = ceph::buffer::ptr_node::create( - buffer::create_aligned(rx_segments_todo_rev.back().length, - rx_segments_todo_rev.back().alignment)); -#else - auto rx_buffer = ceph::buffer::ptr_node::create( - 
buffer::create(rx_segments_todo_rev.back().length)); -#endif + // description of current segment to read + const auto& cur_rx_desc = rx_segments_desc.at(rx_segments_data.size()); - if (rx_segments_todo_rev.back().alignment == 0) { - rx_segments_data.emplace_back(); + if (cur_rx_desc.logical.alignment == segment_t::DEFERRED_ALLOCATION) { + // This is a special case for the rx_buffers zero-copy optimization + // used for Message's data field. It might be dangerous and will be + // ultimately replaced by `allocation policies`. + rx_segments_data.emplace_back(ceph::bufferlist{}); return handle_read_frame_dispatch(); } - rx_segments_todo_rev.pop_back(); + std::unique_ptr rx_buffer; + try { + rx_buffer = ceph::buffer::ptr_node::create(buffer::create_aligned( + cur_rx_desc.onwire_length, cur_rx_desc.logical.alignment)); + } catch (std::bad_alloc&) { + // Catching because of potential issues with satisfying alignment. + ldout(cct, 20) << __func__ << " can't allocate aligned rx_buffer " + << " len=" << cur_rx_desc.onwire_length + << " align=" << cur_rx_desc.logical.alignment + << dendl; + return _fault(); + } + rx_segments_data.emplace_back(); rx_segments_data.back().push_back(std::move(rx_buffer)); - return READB(rx_segments_data.back().length(), - rx_segments_data.back().c_str(), - handle_read_frame_segment); + return READB(rx_segments_data.back().length(), rx_segments_data.back().c_str(), + handle_read_frame_segment); } CtPtr ProtocolV2::handle_read_frame_segment(char *buffer, int r) { @@ -1567,11 +1585,12 @@ CtPtr ProtocolV2::handle_read_frame_segment(char *buffer, int r) { return _fault(); } - if (!rx_segments_todo_rev.empty()) { - return read_frame_segment(); + if (rx_segments_desc.size() == rx_segments_data.size()) { + // OK, all segments planned to read are read. Can go with dispatch. + return handle_read_frame_dispatch(); } else { // TODO: for makeshift only. 
This will be more generic and throttled - return handle_read_frame_dispatch(); + return read_frame_segment(); } } @@ -2028,10 +2047,10 @@ CtPtr ProtocolV2::throttle_message() { CtPtr ProtocolV2::throttle_bytes() { ldout(cct, 20) << __func__ << dendl; - ceph_assert(rx_segments_todo_rev.size() == 4); - uint32_t cur_msg_size = rx_segments_todo_rev[2].length + \ - rx_segments_todo_rev[1].length + \ - rx_segments_todo_rev[0].length; + ceph_assert(rx_segments_desc.size() == 4); + uint32_t cur_msg_size = rx_segments_desc[1].logical.length + \ + rx_segments_desc[2].logical.length + \ + rx_segments_desc[3].logical.length; if (cur_msg_size) { if (connection->policy.throttler_bytes) { ldout(cct, 10) << __func__ << " wants " << cur_msg_size @@ -2063,10 +2082,10 @@ CtPtr ProtocolV2::throttle_bytes() { CtPtr ProtocolV2::throttle_dispatch_queue() { ldout(cct, 20) << __func__ << dendl; - ceph_assert(rx_segments_todo_rev.size() == 4); - uint32_t cur_msg_size = rx_segments_todo_rev[2].length + \ - rx_segments_todo_rev[1].length + \ - rx_segments_todo_rev[0].length; + ceph_assert(rx_segments_desc.size() == 4); + uint32_t cur_msg_size = rx_segments_desc[1].logical.length + \ + rx_segments_desc[2].logical.length + \ + rx_segments_desc[3].logical.length; if (cur_msg_size) { if (!connection->dispatch_queue->dispatch_throttler.get_or_fail( @@ -2091,7 +2110,6 @@ CtPtr ProtocolV2::throttle_dispatch_queue() { throttle_stamp = ceph_clock_now(); state = THROTTLE_DONE; - rx_segments_data.clear(); return read_frame_segment(); } diff --git a/src/msg/async/ProtocolV2.h b/src/msg/async/ProtocolV2.h index 00b8e3bdd5e..8c8e4340df0 100644 --- a/src/msg/async/ProtocolV2.h +++ b/src/msg/async/ProtocolV2.h @@ -117,13 +117,21 @@ public: __le16 alignment; } __attribute__((packed)); + struct onwire_segment_t { + // crypto-processed segment can be expanded on-wire because of: + // * padding to achieve CRYPTO_BLOCK_SIZE alignment, + // * authentication tag. It's appended at the end of message. 
+ // See RxHandler::get_extra_size_at_final(). + __le32 onwire_length; + + struct segment_t logical; + } __attribute__((packed)); + private: static constexpr std::size_t MAX_NUM_SEGMENTS = 4; - // segment descriptors are stored in reversed order. This is because - // vectors don't support ::pop_front. We might want to exchange - // the container to slightly tuned one in the future. - boost::container::static_vector<segment_t, MAX_NUM_SEGMENTS> rx_segments_todo_rev; + + boost::container::static_vector<onwire_segment_t, MAX_NUM_SEGMENTS> rx_segments_desc; boost::container::static_vector<ceph::bufferlist, MAX_NUM_SEGMENTS> rx_segments_data; @@ -269,6 +277,8 @@ private: Ct *send_server_ident(); Ct *send_reconnect_ok(); Ct *server_ready(); + + uint32_t get_onwire_size(uint32_t logical_size) const; }; #endif /* _MSG_ASYNC_PROTOCOL_V2_ */