From a4bd568a5eb87d1167174637ebf3d72c4b18bb57 Mon Sep 17 00:00:00 2001 From: Radoslaw Zarzynski Date: Fri, 8 Mar 2019 22:21:16 +0100 Subject: [PATCH] msg/async, v2: rework decoding of MessageFrame. Signed-off-by: Radoslaw Zarzynski --- src/msg/async/ProtocolV2.cc | 53 ++++++++++++++----------------------- src/msg/async/frames_v2.h | 35 ++++++++++++++++++++++-- 2 files changed, 53 insertions(+), 35 deletions(-) diff --git a/src/msg/async/ProtocolV2.cc b/src/msg/async/ProtocolV2.cc index 35d34fbe052..48fcca695c2 100644 --- a/src/msg/async/ProtocolV2.cc +++ b/src/msg/async/ProtocolV2.cc @@ -1291,40 +1291,32 @@ CtPtr ProtocolV2::handle_message() { #endif recv_stamp = ceph_clock_now(); - auto header_frame = MessageFrame::Decode( - std::move(rx_segments_data[SegmentIndex::Msg::HEADER])); - // XXX: paranoid copy just to avoid oops - ceph_msg_header2 current_header = header_frame.header(); - - ldout(cct, 20) << __func__ - << " got envelope type=" << current_header.type - << " src " << peer_name - << " off " << current_header.data_off - << dendl; - - INTERCEPT(16); + // we need to get the size before std::moving segments data + const size_t cur_msg_size = get_current_msg_size(); + auto msg_frame = MessageFrame::Decode(std::move(rx_segments_data)); - const auto front_len = \ - rx_segments_desc[SegmentIndex::Msg::FRONT].length; - const auto middle_len = \ - rx_segments_desc[SegmentIndex::Msg::MIDDLE].length; - const auto data_len = \ - rx_segments_desc[SegmentIndex::Msg::DATA].length; + // XXX: paranoid copy just to avoid oops + ceph_msg_header2 current_header = msg_frame.header(); ldout(cct, 5) << __func__ - << " got " << front_len - << " + " << middle_len - << " + " << data_len - << " byte message" << dendl; + << " got " << msg_frame.front_len() + << " + " << msg_frame.middle_len() + << " + " << msg_frame.data_len() + << " byte message." + << " envelope type=" << current_header.type + << " src " << peer_name + << " off " << current_header.data_off + << dendl; + INTERCEPT(16); ceph_msg_header header{current_header.seq, current_header.tid, current_header.type, current_header.priority, current_header.version, - front_len, - middle_len, - data_len, + msg_frame.front_len(), + msg_frame.middle_len(), + msg_frame.data_len(), current_header.data_off, peer_name, current_header.compat_version, @@ -1333,9 +1325,9 @@ CtPtr ProtocolV2::handle_message() { ceph_msg_footer footer{0, 0, 0, 0, current_header.flags}; Message *message = decode_message(cct, 0, header, footer, - rx_segments_data[SegmentIndex::Msg::FRONT], - rx_segments_data[SegmentIndex::Msg::MIDDLE], - rx_segments_data[SegmentIndex::Msg::DATA], + msg_frame.front(), + msg_frame.middle(), + msg_frame.data(), connection); if (!message) { ldout(cct, 1) << __func__ << " decode message failed " << dendl; @@ -1349,11 +1341,6 @@ CtPtr ProtocolV2::handle_message() { message->set_byte_throttler(connection->policy.throttler_bytes); message->set_message_throttler(connection->policy.throttler_messages); - const uint32_t cur_msg_size = \ - rx_segments_desc[SegmentIndex::Msg::FRONT].length + \ - rx_segments_desc[SegmentIndex::Msg::MIDDLE].length + \ - rx_segments_desc[SegmentIndex::Msg::DATA].length; - // store reservation size in message, so we don't get confused // by messages entering the dispatch queue through other paths. message->set_dispatch_throttle_size(cur_msg_size); diff --git a/src/msg/async/frames_v2.h b/src/msg/async/frames_v2.h index 2ae31f8b777..e979743dc24 100644 --- a/src/msg/async/frames_v2.h +++ b/src/msg/async/frames_v2.h @@ -688,9 +688,16 @@ struct MessageFrame : public Frame; + static MessageFrame Decode(rx_segments_t &&recv_segments) { MessageFrame f; - f.segments[SegmentIndex::Msg::HEADER] = std::move(msg_header); + // transfer segments' bufferlists. If a MessageFrame contains less + // SegmentsNumV segments, the missing ones will be seen as zeroed. + for (__u8 idx = 0; idx < std::size(recv_segments); idx++) { + f.segments[idx] = std::move(recv_segments[idx]); + } return f; } @@ -699,6 +706,30 @@ struct MessageFrame : public Frame(*hdrbl.c_str()); } + ceph::bufferlist &front() { + return segments[SegmentIndex::Msg::FRONT]; + } + + ceph::bufferlist &middle() { + return segments[SegmentIndex::Msg::MIDDLE]; + } + + ceph::bufferlist &data() { + return segments[SegmentIndex::Msg::DATA]; + } + + uint32_t front_len() const { + return segments[SegmentIndex::Msg::FRONT].length(); + } + + uint32_t middle_len() const { + return segments[SegmentIndex::Msg::MIDDLE].length(); + } + + uint32_t data_len() const { + return segments[SegmentIndex::Msg::DATA].length(); + } + protected: using Frame::Frame; }; -- 2.39.5