From: Radoslaw Zarzynski Date: Fri, 1 Mar 2019 17:33:25 +0000 (+0100) Subject: msg/async, v2: drop support for the buggy rx_buffers mechanism. X-Git-Tag: v14.1.1~70^2~8 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=dace9061000d9a8ec7951dd3ced52b99ee3f6f08;p=ceph-ci.git msg/async, v2: drop support for the buggy rx_buffers mechanism. See: * https://github.com/ceph/ceph/pull/26696, * http://tracker.ceph.com/issues/22480. Signed-off-by: Radoslaw Zarzynski --- diff --git a/src/msg/async/ProtocolV2.cc b/src/msg/async/ProtocolV2.cc index ebf0aa308cc..20b3f8d151c 100644 --- a/src/msg/async/ProtocolV2.cc +++ b/src/msg/async/ProtocolV2.cc @@ -1103,8 +1103,7 @@ CtPtr ProtocolV2::handle_read_frame_dispatch() { case Tag::WAIT: return handle_frame_payload(); case Tag::MESSAGE: - // see the comment in ::read_frame_segment(). - return handle_message_complete(); + return handle_message(); default: { lderr(cct) << __func__ << " received unknown tag=" << static_cast(next_tag) @@ -1122,35 +1121,6 @@ CtPtr ProtocolV2::read_frame_segment() { // description of current segment to read const auto& cur_rx_desc = rx_segments_desc.at(rx_segments_data.size()); - - if (cur_rx_desc.alignment == segment_t::DEFERRED_ALLOCATION) { - // This is a special case for the rx_buffers zero-copy optimization - // used for Message's data field. It might be dangerous and will be - // ultimately replaced by `allocation policies`. - rx_segments_data.emplace_back(ceph::bufferlist{}); - - // XXX: for the sake of unified epilogue handling this becomes even - // uglier. We are doing early dispatch of Messages now. The overall - // idea is to: - // 1. parse ceph_msg_header2 which let us know tid, and thus pick - // up appropriate rx_buffer (early dispatch aka pre-dispatch). - // 2. Read data field into selected rx_buffer. - // 3. REUNIFY WITH THE MAIN FLOW: read and handle frame epilogue. - // 4. Do ::handle_read_frame_dispatch() as for any kind of frame. - // For messages it wll call ::handle_message_complete(). -#if 0 - ceph_assert_always(next_tag == Tag::MESSAGE); -#else - if (next_tag != Tag::MESSAGE) { - ldout(cct, 20) << __func__ - << " only message can use DEFERRED_ALLOCATION" - << dendl; - return _fault(); - } -#endif - return handle_message(); - } - std::unique_ptr rx_buffer; try { rx_buffer = ceph::buffer::ptr_node::create(buffer::create_aligned( @@ -1285,93 +1255,6 @@ CtPtr ProtocolV2::ready() { return CONTINUE(read_frame); } -CtPtr ProtocolV2::handle_message() { - ldout(cct, 20) << __func__ << dendl; - - ceph_assert(rx_segments_data.size() == 4); - ceph_assert(state == THROTTLE_DONE); - -#if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE) - ltt_recv_stamp = ceph_clock_now(); -#endif - recv_stamp = ceph_clock_now(); - - auto header_frame = MessageHeaderFrame::Decode( - std::move(rx_segments_data[SegmentIndex::Msg::HEADER])); - ceph_msg_header2 &header = header_frame.header(); - - ldout(cct, 20) << __func__ - << " got envelope type=" << header.type - << " src " << peer_name - << " off " << header.data_off - << dendl; - - INTERCEPT(16); - - // Reset state - data_buf.clear(); - - current_header = header; - return read_message_data_prepare(); -} - -CtPtr ProtocolV2::read_message_data_prepare() { - ldout(cct, 20) << __func__ << dendl; - - const auto data_len = \ - rx_segments_desc[SegmentIndex::Msg::DATA].length; - const unsigned data_off = le32_to_cpu(current_header.data_off); - - if (data_len) { - // get a buffer - map>::iterator p = - connection->rx_buffers.find(current_header.tid); - if (p != connection->rx_buffers.end()) { - ldout(cct, 10) << __func__ << " seleting rx buffer v " << p->second.second - << " at offset " << data_off << " len " - << p->second.first.length() << dendl; - data_buf = p->second.first; - // make sure it's big enough - if (data_buf.length() < data_len) - data_buf.push_back(buffer::create(data_len - data_buf.length())); - data_blp = data_buf.begin(); - } else { - ldout(cct, 20) << __func__ << " allocating new rx buffer at offset " - << data_off << dendl; - alloc_aligned_buffer(data_buf, data_len, data_off); - data_blp = data_buf.begin(); - } - } - - msg_left = data_len; - - return CONTINUE(read_message_data); -} - -CtPtr ProtocolV2::read_message_data() { - ldout(cct, 20) << __func__ << " msg_left=" << msg_left << dendl; - - if (msg_left > 0) { - bufferptr bp = data_blp.get_current_ptr(); - unsigned read_len = std::min(bp.length(), msg_left); - - return READB(read_len, bp.c_str(), handle_message_data); - } - - // FIXME: if (auth_meta->is_mode_secure()) { - // ceph_assert(session_stream_handlers.rx); - if (session_stream_handlers.rx && \ - rx_segments_data[SegmentIndex::Msg::DATA].length()) { - rx_segments_data[SegmentIndex::Msg::DATA] = - session_stream_handlers.rx->authenticated_decrypt_update( - std::move(rx_segments_data[SegmentIndex::Msg::DATA]), - segment_t::DEFAULT_ALIGNMENT); - } - - state = READ_MESSAGE_COMPLETE; - return READ(FRAME_EPILOGUE_SIZE, handle_read_frame_epilogue_main); -} - CtPtr ProtocolV2::handle_read_frame_epilogue_main(char *buffer, int r) { ldout(cct, 20) << __func__ << " r=" << r << dendl; @@ -1404,6 +1287,7 @@ CtPtr ProtocolV2::handle_read_frame_epilogue_main(char *buffer, int r) } catch (ceph::crypto::onwire::MsgAuthError &e) { ldout(cct, 5) << __func__ << " message authentication failed: " << e.what() << dendl; + ceph_assert("oops" == nullptr); return _fault(); } } else { @@ -1428,26 +1312,30 @@ CtPtr ProtocolV2::handle_read_frame_epilogue_main(char *buffer, int r) return handle_read_frame_dispatch(); } -CtPtr ProtocolV2::handle_message_data(char *buffer, int r) { - ldout(cct, 20) << __func__ << " r=" << r << dendl; +CtPtr ProtocolV2::handle_message() { + ldout(cct, 20) << __func__ << dendl; - if (r < 0) { - ldout(cct, 1) << __func__ << " read data error " << dendl; - return _fault(); - } + ceph_assert(rx_segments_data.size() == 4); + ceph_assert(state == THROTTLE_DONE); - bufferptr bp = data_blp.get_current_ptr(); - unsigned read_len = std::min(bp.length(), msg_left); - ceph_assert(read_len < std::numeric_limits::max()); - data_blp.advance(read_len); - rx_segments_data[SegmentIndex::Msg::DATA].append(bp, 0, read_len); - msg_left -= read_len; +#if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE) + ltt_recv_stamp = ceph_clock_now(); +#endif + recv_stamp = ceph_clock_now(); + state = READ_MESSAGE_COMPLETE; - return CONTINUE(read_message_data); -} + auto header_frame = MessageHeaderFrame::Decode( + std::move(rx_segments_data[SegmentIndex::Msg::HEADER])); + // XXX: paranoid copy just to avoid oops + ceph_msg_header2 current_header = header_frame.header(); -CtPtr ProtocolV2::handle_message_complete() { - ldout(cct, 20) << __func__ << dendl; + ldout(cct, 20) << __func__ + << " got envelope type=" << current_header.type + << " src " << peer_name + << " off " << current_header.data_off + << dendl; + + INTERCEPT(16); const auto front_len = \ rx_segments_desc[SegmentIndex::Msg::FRONT].length; @@ -1596,9 +1484,6 @@ CtPtr ProtocolV2::handle_message_complete() { handle_message_ack(current_header.ack_seq); - // clean up local buffer references - data_buf.clear(); - // we might have been reused by another connection // let's check if that is the case if (state != READY) { diff --git a/src/msg/async/ProtocolV2.h b/src/msg/async/ProtocolV2.h index 3ba55fb595a..b6acd2be4db 100644 --- a/src/msg/async/ProtocolV2.h +++ b/src/msg/async/ProtocolV2.h @@ -94,13 +94,9 @@ private: ceph::msgr::v2::Tag sent_tag; ceph::msgr::v2::Tag next_tag; - ceph_msg_header2 current_header; utime_t backoff; // backoff time utime_t recv_stamp; utime_t throttle_stamp; - unsigned msg_left; - bufferlist data_buf; - bufferlist::iterator data_blp; bool keepalive; @@ -149,8 +145,6 @@ private: CONTINUATION_DECL(ProtocolV2, throttle_message); CONTINUATION_DECL(ProtocolV2, throttle_bytes); CONTINUATION_DECL(ProtocolV2, throttle_dispatch_queue); - CONTINUATION_DECL(ProtocolV2, read_message_data); - READ_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_message_data); Ct *read_frame(); Ct *handle_read_frame_preamble_main(char *buffer, int r); @@ -167,9 +161,6 @@ private: Ct *throttle_bytes(); Ct *throttle_dispatch_queue(); Ct *read_message_data_prepare(); - Ct *read_message_data(); - Ct *handle_message_data(char *buffer, int r); - Ct *handle_message_complete(); Ct *handle_keepalive2(ceph::bufferlist &payload); Ct *handle_keepalive2_ack(ceph::bufferlist &payload); diff --git a/src/msg/async/frames_v2.h b/src/msg/async/frames_v2.h index 85f333f5d55..507dc471a8a 100644 --- a/src/msg/async/frames_v2.h +++ b/src/msg/async/frames_v2.h @@ -55,7 +55,7 @@ enum class Tag : __u8 { struct segment_t { // TODO: this will be dropped with support for `allocation policies`. // We need them because of the rx_buffers zero-copy optimization. - static constexpr __le16 DEFERRED_ALLOCATION{0x0000}; + static constexpr __le16 PAGE_SIZE_ALIGNMENT{4096}; static constexpr __le16 DEFAULT_ALIGNMENT = sizeof(void *); @@ -604,7 +604,7 @@ struct MessageHeaderFrame segment_t::DEFAULT_ALIGNMENT }, segment_t{ front.length(), segment_t::DEFAULT_ALIGNMENT }, segment_t{ middle.length(), segment_t::DEFAULT_ALIGNMENT }, - segment_t{ data.length(), segment_t::DEFERRED_ALLOCATION }, + segment_t{ data.length(), segment_t::PAGE_SIZE_ALIGNMENT }, }); // FIXME: plainsize -> ciphersize; for AES-GCM they are equall apart from auth tag size