case Tag::WAIT:
return handle_frame_payload();
case Tag::MESSAGE:
- // see the comment in ::read_frame_segment().
- return handle_message_complete();
+ return handle_message();
default: {
lderr(cct) << __func__
<< " received unknown tag=" << static_cast<uint32_t>(next_tag)
// description of current segment to read
const auto& cur_rx_desc = rx_segments_desc.at(rx_segments_data.size());
-
- if (cur_rx_desc.alignment == segment_t::DEFERRED_ALLOCATION) {
- // This is a special case for the rx_buffers zero-copy optimization
- // used for Message's data field. It might be dangerous and will be
- // ultimately replaced by `allocation policies`.
- rx_segments_data.emplace_back(ceph::bufferlist{});
-
- // XXX: for the sake of unified epilogue handling this becomes even
- // uglier. We are doing early dispatch of Messages now. The overall
- // idea is to:
- // 1. parse ceph_msg_header2 which let us know tid, and thus pick
- // up appropriate rx_buffer (early dispatch aka pre-dispatch).
- // 2. Read data field into selected rx_buffer.
- // 3. REUNIFY WITH THE MAIN FLOW: read and handle frame epilogue.
- // 4. Do ::handle_read_frame_dispatch() as for any kind of frame.
- // For messages it wll call ::handle_message_complete().
-#if 0
- ceph_assert_always(next_tag == Tag::MESSAGE);
-#else
- if (next_tag != Tag::MESSAGE) {
- ldout(cct, 20) << __func__
- << " only message can use DEFERRED_ALLOCATION"
- << dendl;
- return _fault();
- }
-#endif
- return handle_message();
- }
-
std::unique_ptr<buffer::ptr_node, buffer::ptr_node::disposer> rx_buffer;
try {
rx_buffer = ceph::buffer::ptr_node::create(buffer::create_aligned(
return CONTINUE(read_frame);
}
-CtPtr ProtocolV2::handle_message() {
- ldout(cct, 20) << __func__ << dendl;
-
- ceph_assert(rx_segments_data.size() == 4);
- ceph_assert(state == THROTTLE_DONE);
-
-#if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
- ltt_recv_stamp = ceph_clock_now();
-#endif
- recv_stamp = ceph_clock_now();
-
- auto header_frame = MessageHeaderFrame::Decode(
- std::move(rx_segments_data[SegmentIndex::Msg::HEADER]));
- ceph_msg_header2 &header = header_frame.header();
-
- ldout(cct, 20) << __func__
- << " got envelope type=" << header.type
- << " src " << peer_name
- << " off " << header.data_off
- << dendl;
-
- INTERCEPT(16);
-
- // Reset state
- data_buf.clear();
-
- current_header = header;
- return read_message_data_prepare();
-}
-
-CtPtr ProtocolV2::read_message_data_prepare() {
- ldout(cct, 20) << __func__ << dendl;
-
- const auto data_len = \
- rx_segments_desc[SegmentIndex::Msg::DATA].length;
- const unsigned data_off = le32_to_cpu(current_header.data_off);
-
- if (data_len) {
- // get a buffer
- map<ceph_tid_t, pair<bufferlist, int>>::iterator p =
- connection->rx_buffers.find(current_header.tid);
- if (p != connection->rx_buffers.end()) {
- ldout(cct, 10) << __func__ << " seleting rx buffer v " << p->second.second
- << " at offset " << data_off << " len "
- << p->second.first.length() << dendl;
- data_buf = p->second.first;
- // make sure it's big enough
- if (data_buf.length() < data_len)
- data_buf.push_back(buffer::create(data_len - data_buf.length()));
- data_blp = data_buf.begin();
- } else {
- ldout(cct, 20) << __func__ << " allocating new rx buffer at offset "
- << data_off << dendl;
- alloc_aligned_buffer(data_buf, data_len, data_off);
- data_blp = data_buf.begin();
- }
- }
-
- msg_left = data_len;
-
- return CONTINUE(read_message_data);
-}
-
-CtPtr ProtocolV2::read_message_data() {
- ldout(cct, 20) << __func__ << " msg_left=" << msg_left << dendl;
-
- if (msg_left > 0) {
- bufferptr bp = data_blp.get_current_ptr();
- unsigned read_len = std::min(bp.length(), msg_left);
-
- return READB(read_len, bp.c_str(), handle_message_data);
- }
-
- // FIXME: if (auth_meta->is_mode_secure()) {
- // ceph_assert(session_stream_handlers.rx);
- if (session_stream_handlers.rx && \
- rx_segments_data[SegmentIndex::Msg::DATA].length()) {
- rx_segments_data[SegmentIndex::Msg::DATA] =
- session_stream_handlers.rx->authenticated_decrypt_update(
- std::move(rx_segments_data[SegmentIndex::Msg::DATA]),
- segment_t::DEFAULT_ALIGNMENT);
- }
-
- state = READ_MESSAGE_COMPLETE;
- return READ(FRAME_EPILOGUE_SIZE, handle_read_frame_epilogue_main);
-}
-
CtPtr ProtocolV2::handle_read_frame_epilogue_main(char *buffer, int r)
{
ldout(cct, 20) << __func__ << " r=" << r << dendl;
} catch (ceph::crypto::onwire::MsgAuthError &e) {
ldout(cct, 5) << __func__ << " message authentication failed: "
<< e.what() << dendl;
+ ceph_assert("oops" == nullptr);
return _fault();
}
} else {
return handle_read_frame_dispatch();
}
-CtPtr ProtocolV2::handle_message_data(char *buffer, int r) {
- ldout(cct, 20) << __func__ << " r=" << r << dendl;
+CtPtr ProtocolV2::handle_message() {
+ ldout(cct, 20) << __func__ << dendl;
- if (r < 0) {
- ldout(cct, 1) << __func__ << " read data error " << dendl;
- return _fault();
- }
+ ceph_assert(rx_segments_data.size() == 4);
+ ceph_assert(state == THROTTLE_DONE);
- bufferptr bp = data_blp.get_current_ptr();
- unsigned read_len = std::min(bp.length(), msg_left);
- ceph_assert(read_len < std::numeric_limits<int>::max());
- data_blp.advance(read_len);
- rx_segments_data[SegmentIndex::Msg::DATA].append(bp, 0, read_len);
- msg_left -= read_len;
+#if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
+ ltt_recv_stamp = ceph_clock_now();
+#endif
+ recv_stamp = ceph_clock_now();
+ state = READ_MESSAGE_COMPLETE;
- return CONTINUE(read_message_data);
-}
+ auto header_frame = MessageHeaderFrame::Decode(
+ std::move(rx_segments_data[SegmentIndex::Msg::HEADER]));
+ // XXX: paranoid copy just to avoid oops
+ ceph_msg_header2 current_header = header_frame.header();
-CtPtr ProtocolV2::handle_message_complete() {
- ldout(cct, 20) << __func__ << dendl;
+ ldout(cct, 20) << __func__
+ << " got envelope type=" << current_header.type
+ << " src " << peer_name
+ << " off " << current_header.data_off
+ << dendl;
+
+ INTERCEPT(16);
const auto front_len = \
rx_segments_desc[SegmentIndex::Msg::FRONT].length;
handle_message_ack(current_header.ack_seq);
- // clean up local buffer references
- data_buf.clear();
-
// we might have been reused by another connection
// let's check if that is the case
if (state != READY) {