From 190051986439a84159758a46bd5c4436423b55ab Mon Sep 17 00:00:00 2001 From: Radoslaw Zarzynski Date: Fri, 8 Mar 2019 19:40:37 +0100 Subject: [PATCH] msg/async, v2: limit the num_segments to non-empty segments. Signed-off-by: Radoslaw Zarzynski --- src/msg/async/ProtocolV2.cc | 42 +++++++++++++++---------------------- src/msg/async/ProtocolV2.h | 1 + src/msg/async/frames_v2.h | 24 ++++++++++++++++++++- 3 files changed, 41 insertions(+), 26 deletions(-) diff --git a/src/msg/async/ProtocolV2.cc b/src/msg/async/ProtocolV2.cc index 5c83ea24b9460..35d34fbe052da 100644 --- a/src/msg/async/ProtocolV2.cc +++ b/src/msg/async/ProtocolV2.cc @@ -235,6 +235,16 @@ void ProtocolV2::reset_recv_state() { reset_throttle(); } +size_t ProtocolV2::get_current_msg_size() const { + ceph_assert(!rx_segments_desc.empty()); + size_t sum = 0; + // we don't include SegmentIndex::Msg::HEADER. + for (__u8 idx = 1; idx < rx_segments_desc.size(); idx++) { + sum += rx_segments_desc[idx].length; + } + return sum; +} + void ProtocolV2::reset_throttle() { if (state > THROTTLE_MESSAGE && state <= THROTTLE_DONE && connection->policy.throttler_messages) { @@ -247,11 +257,7 @@ void ProtocolV2::reset_throttle() { } if (state > THROTTLE_BYTES && state <= THROTTLE_DONE) { if (connection->policy.throttler_bytes) { - const uint32_t cur_msg_size = \ - rx_segments_desc[SegmentIndex::Msg::FRONT].length + \ - rx_segments_desc[SegmentIndex::Msg::MIDDLE].length + \ - rx_segments_desc[SegmentIndex::Msg::DATA].length; - + const size_t cur_msg_size = get_current_msg_size(); ldout(cct, 10) << __func__ << " releasing " << cur_msg_size << " bytes to policy throttler " << connection->policy.throttler_bytes->get_current() << "/" @@ -260,11 +266,7 @@ void ProtocolV2::reset_throttle() { } } if (state > THROTTLE_DISPATCH_QUEUE && state <= THROTTLE_DONE) { - const uint32_t cur_msg_size = \ - rx_segments_desc[SegmentIndex::Msg::FRONT].length + \ - rx_segments_desc[SegmentIndex::Msg::MIDDLE].length + \ - rx_segments_desc[SegmentIndex::Msg::DATA].length; - + const size_t cur_msg_size = get_current_msg_size(); ldout(cct, 10) << __func__ << " releasing " << cur_msg_size << " bytes to dispatch_queue throttler " @@ -988,8 +990,9 @@ CtPtr ProtocolV2::handle_read_frame_preamble_main(rx_buffer_t &&buffer, int r) { return _fault(); } - // currently we do support only 4 or 1 segments - if (main_preamble.num_segments != 1 && main_preamble.num_segments != 4) { + // currently we do support between 1 and MAX_NUM_SEGMENTS segments + if (main_preamble.num_segments < 1 || + main_preamble.num_segments > MAX_NUM_SEGMENTS) { ldout(cct, 10) << __func__ << " unsupported num_segments=" << " tx_crc=" << main_preamble.num_segments << dendl; return _fault(); @@ -1281,8 +1284,6 @@ CtPtr ProtocolV2::handle_read_frame_epilogue_main(rx_buffer_t &&buffer, int r) CtPtr ProtocolV2::handle_message() { ldout(cct, 20) << __func__ << dendl; - - ceph_assert(rx_segments_data.size() == 4); ceph_assert(state == THROTTLE_DONE); #if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE) @@ -1499,11 +1500,7 @@ CtPtr ProtocolV2::throttle_message() { CtPtr ProtocolV2::throttle_bytes() { ldout(cct, 20) << __func__ << dendl; - ceph_assert(rx_segments_desc.size() == 4); - uint32_t cur_msg_size = \ - rx_segments_desc[SegmentIndex::Msg::FRONT].length + \ - rx_segments_desc[SegmentIndex::Msg::MIDDLE].length + \ - rx_segments_desc[SegmentIndex::Msg::DATA].length; + const size_t cur_msg_size = get_current_msg_size(); if (cur_msg_size) { if (connection->policy.throttler_bytes) { ldout(cct, 10) << __func__ << " wants " << cur_msg_size @@ -1535,12 +1532,7 @@ CtPtr ProtocolV2::throttle_bytes() { CtPtr ProtocolV2::throttle_dispatch_queue() { ldout(cct, 20) << __func__ << dendl; - ceph_assert(rx_segments_desc.size() == 4); - uint32_t cur_msg_size = - rx_segments_desc[SegmentIndex::Msg::FRONT].length + \ - rx_segments_desc[SegmentIndex::Msg::MIDDLE].length + \ - rx_segments_desc[SegmentIndex::Msg::DATA].length; - + const size_t cur_msg_size = get_current_msg_size(); if (cur_msg_size) { if (!connection->dispatch_queue->dispatch_throttler.get_or_fail( cur_msg_size)) { diff --git a/src/msg/async/ProtocolV2.h b/src/msg/async/ProtocolV2.h index 96fdc4d570cbf..0193d7992a18e 100644 --- a/src/msg/async/ProtocolV2.h +++ b/src/msg/async/ProtocolV2.h @@ -243,6 +243,7 @@ private: uint32_t get_onwire_size(uint32_t logical_size) const; uint32_t get_epilogue_size() const; + size_t get_current_msg_size() const; }; #endif /* _MSG_ASYNC_PROTOCOL_V2_ */ diff --git a/src/msg/async/frames_v2.h b/src/msg/async/frames_v2.h index 2fade004b306d..2ae31f8b77756 100644 --- a/src/msg/async/frames_v2.h +++ b/src/msg/async/frames_v2.h @@ -176,6 +176,18 @@ private: }; ceph::bufferlist::contiguous_filler preamble_filler; + __u8 calc_num_segments( + const std::array& segments) + { + for (__u8 num = SegmentsNumV; num > 0; num--) { + if (segments[num-1].length) { + return num; + } + } + // frame always has at least one segment. + return 1; + } + // craft the main preamble. It's always present regardless of the number // of segments message is composed from. void fill_preamble() { @@ -194,13 +206,17 @@ private: segments.front().length() - FRAME_PREAMBLE_SIZE; main_preamble.segments.front().alignment = alignments.front(); + // there is no business in issuing frame without at least one segment + // filled. if constexpr(SegmentsNumV > 1) { for (__u8 idx = 1; idx < SegmentsNumV; idx++) { main_preamble.segments[idx].length = segments[idx].length(); main_preamble.segments[idx].alignment = alignments[idx]; } } - main_preamble.num_segments = SegmentsNumV; + // calculate the number of non-empty segments. + // TODO: reorder segments to get DATA first + main_preamble.num_segments = calc_num_segments(main_preamble.segments); main_preamble.crc = ceph_crc32c(0, reinterpret_cast(&main_preamble), @@ -649,6 +665,12 @@ struct MessageFrame : public Frame { + struct { + uint32_t front; + uint32_t middle; + uint32_t data; + } len; + static const Tag tag = Tag::MESSAGE; static MessageFrame Encode(const ceph_msg_header2 &msg_header, -- 2.39.5