#endif
recv_stamp = ceph_clock_now();
- auto header_frame = MessageFrame::Decode(
- std::move(rx_segments_data[SegmentIndex::Msg::HEADER]));
- // XXX: paranoid copy just to avoid oops
- ceph_msg_header2 current_header = header_frame.header();
-
- ldout(cct, 20) << __func__
- << " got envelope type=" << current_header.type
- << " src " << peer_name
- << " off " << current_header.data_off
- << dendl;
-
- INTERCEPT(16);
+ // we need to get the size before std::moving segments data
+ const size_t cur_msg_size = get_current_msg_size();
+ auto msg_frame = MessageFrame::Decode(std::move(rx_segments_data));
- const auto front_len = \
- rx_segments_desc[SegmentIndex::Msg::FRONT].length;
- const auto middle_len = \
- rx_segments_desc[SegmentIndex::Msg::MIDDLE].length;
- const auto data_len = \
- rx_segments_desc[SegmentIndex::Msg::DATA].length;
+ // XXX: paranoid copy just to avoid oops
+ ceph_msg_header2 current_header = msg_frame.header();
ldout(cct, 5) << __func__
- << " got " << front_len
- << " + " << middle_len
- << " + " << data_len
- << " byte message" << dendl;
+ << " got " << msg_frame.front_len()
+ << " + " << msg_frame.middle_len()
+ << " + " << msg_frame.data_len()
+ << " byte message."
+ << " envelope type=" << current_header.type
+ << " src " << peer_name
+ << " off " << current_header.data_off
+ << dendl;
+ INTERCEPT(16);
ceph_msg_header header{current_header.seq,
current_header.tid,
current_header.type,
current_header.priority,
current_header.version,
- front_len,
- middle_len,
- data_len,
+ msg_frame.front_len(),
+ msg_frame.middle_len(),
+ msg_frame.data_len(),
current_header.data_off,
peer_name,
current_header.compat_version,
ceph_msg_footer footer{0, 0, 0, 0, current_header.flags};
Message *message = decode_message(cct, 0, header, footer,
- rx_segments_data[SegmentIndex::Msg::FRONT],
- rx_segments_data[SegmentIndex::Msg::MIDDLE],
- rx_segments_data[SegmentIndex::Msg::DATA],
+ msg_frame.front(),
+ msg_frame.middle(),
+ msg_frame.data(),
connection);
if (!message) {
ldout(cct, 1) << __func__ << " decode message failed " << dendl;
message->set_byte_throttler(connection->policy.throttler_bytes);
message->set_message_throttler(connection->policy.throttler_messages);
- const uint32_t cur_msg_size = \
- rx_segments_desc[SegmentIndex::Msg::FRONT].length + \
- rx_segments_desc[SegmentIndex::Msg::MIDDLE].length + \
- rx_segments_desc[SegmentIndex::Msg::DATA].length;
-
// store reservation size in message, so we don't get confused
// by messages entering the dispatch queue through other paths.
message->set_dispatch_throttle_size(cur_msg_size);
return f;
}
- static MessageFrame Decode(ceph::bufferlist &&msg_header) {
+ using rx_segments_t =
+ boost::container::static_vector<ceph::bufferlist,
+ ceph::msgr::v2::MAX_NUM_SEGMENTS>;
+ static MessageFrame Decode(rx_segments_t &&recv_segments) {
MessageFrame f;
- f.segments[SegmentIndex::Msg::HEADER] = std::move(msg_header);
+ // transfer segments' bufferlists. If a MessageFrame contains less
+ // SegmentsNumV segments, the missing ones will be seen as zeroed.
+ for (__u8 idx = 0; idx < std::size(recv_segments); idx++) {
+ f.segments[idx] = std::move(recv_segments[idx]);
+ }
return f;
}
return reinterpret_cast<const ceph_msg_header2&>(*hdrbl.c_str());
}
+ ceph::bufferlist &front() {
+ return segments[SegmentIndex::Msg::FRONT];
+ }
+
+ ceph::bufferlist &middle() {
+ return segments[SegmentIndex::Msg::MIDDLE];
+ }
+
+ ceph::bufferlist &data() {
+ return segments[SegmentIndex::Msg::DATA];
+ }
+
+ uint32_t front_len() const {
+ return segments[SegmentIndex::Msg::FRONT].length();
+ }
+
+ uint32_t middle_len() const {
+ return segments[SegmentIndex::Msg::MIDDLE].length();
+ }
+
+ uint32_t data_len() const {
+ return segments[SegmentIndex::Msg::DATA].length();
+ }
+
protected:
using Frame::Frame;
};