From 7b5f8b945f80e81ea1812b5824a657d73f94e5ae Mon Sep 17 00:00:00 2001 From: Radoslaw Zarzynski Date: Fri, 8 Mar 2019 13:13:04 +0100 Subject: [PATCH] msg/async, v2: switch to rx_buffer_t entirely. Signed-off-by: Radoslaw Zarzynski --- src/msg/async/ProtocolV2.cc | 45 ++++++++++++------------------------- src/msg/async/ProtocolV2.h | 18 +++++++-------- 2 files changed, 22 insertions(+), 41 deletions(-) diff --git a/src/msg/async/ProtocolV2.cc b/src/msg/async/ProtocolV2.cc index 9d7ce00facd..aa3b80865b2 100644 --- a/src/msg/async/ProtocolV2.cc +++ b/src/msg/async/ProtocolV2.cc @@ -57,7 +57,7 @@ void ProtocolV2::run_continuation(CtRef continuation) { #define WRITE(B, D, C) write(D, CONTINUATION(C), B) -#define READ(L, C) read(CONTINUATION(C), L) +#define READ(L, C) read(CONTINUATION(C), buffer::ptr_node::create(buffer::create(L))) #define READ_RXBUF(B, C) read(CONTINUATION(C), B) @@ -698,23 +698,6 @@ uint32_t ProtocolV2::get_epilogue_size() const { } } -CtPtr ProtocolV2::read(CONTINUATION_RX_TYPE &next, - int len, char *buffer) { - if (!buffer) { - buffer = temp_buffer; - } - [&next, this](char *buffer, int r) { - next.setParams(buffer, r); - run_continuation(next); - }); - if (r <= 0) { - next.setParams(buffer, r); - return &next; - } - - return nullptr; -} - CtPtr ProtocolV2::read(CONTINUATION_RXBPTR_TYPE &next, rx_buffer_t &&buffer) { const auto len = buffer->length(); @@ -792,7 +775,7 @@ CtPtr ProtocolV2::_wait_for_peer_banner() { return READ(banner_len, _handle_peer_banner); } -CtPtr ProtocolV2::_handle_peer_banner(char *buffer, int r) { +CtPtr ProtocolV2::_handle_peer_banner(rx_buffer_t &&buffer, int r) { ldout(cct, 20) << __func__ << " r=" << r << dendl; if (r < 0) { @@ -803,8 +786,8 @@ CtPtr ProtocolV2::_handle_peer_banner(char *buffer, int r) { unsigned banner_prefix_len = strlen(CEPH_BANNER_V2_PREFIX); - if (memcmp(buffer, CEPH_BANNER_V2_PREFIX, banner_prefix_len)) { - if (memcmp(buffer, CEPH_BANNER, strlen(CEPH_BANNER)) == 0) { + if (memcmp(buffer->c_str(), CEPH_BANNER_V2_PREFIX, banner_prefix_len)) { + if (memcmp(buffer->c_str(), CEPH_BANNER, strlen(CEPH_BANNER)) == 0) { lderr(cct) << __func__ << " peer " << *connection->peer_addrs << " is using msgr V1 protocol" << dendl; return _fault(); @@ -815,8 +798,9 @@ CtPtr ProtocolV2::_handle_peer_banner(char *buffer, int r) { uint16_t payload_len; bufferlist bl; - bl.push_back( - buffer::create_static(sizeof(__le16), buffer + banner_prefix_len)); + buffer->set_offset(banner_prefix_len); + buffer->set_length(sizeof(__le16)); + bl.push_back(std::move(buffer)); auto ti = bl.cbegin(); try { decode(payload_len, ti); @@ -835,7 +819,7 @@ CtPtr ProtocolV2::_handle_peer_banner(char *buffer, int r) { return READ(next_payload_len, _handle_peer_banner_payload); } -CtPtr ProtocolV2::_handle_peer_banner_payload(char *buffer, int r) { +CtPtr ProtocolV2::_handle_peer_banner_payload(rx_buffer_t &&buffer, int r) { ldout(cct, 20) << __func__ << " r=" << r << dendl; if (r < 0) { @@ -848,7 +832,7 @@ CtPtr ProtocolV2::_handle_peer_banner_payload(char *buffer, int r) { uint64_t peer_required_features; bufferlist bl; - bl.push_back(buffer::create_static(next_payload_len, buffer)); + bl.push_back(std::move(buffer)); auto ti = bl.cbegin(); try { decode(peer_supported_features, ti); @@ -965,7 +949,7 @@ CtPtr ProtocolV2::read_frame() { return READ(FRAME_PREAMBLE_SIZE, handle_read_frame_preamble_main); } -CtPtr ProtocolV2::handle_read_frame_preamble_main(char *buffer, int r) { +CtPtr ProtocolV2::handle_read_frame_preamble_main(rx_buffer_t &&buffer, int r) { ldout(cct, 20) << __func__ << " r=" << r << dendl; if (r < 0) { @@ -975,7 +959,7 @@ CtPtr ProtocolV2::handle_read_frame_preamble_main(char *buffer, int r) { } ceph::bufferlist preamble; - preamble.push_back(buffer::create_static(FRAME_PREAMBLE_SIZE, buffer)); + preamble.push_back(std::move(buffer)); ldout(cct, 30) << __func__ << " preamble\n"; preamble.hexdump(*_dout); @@ -1238,7 +1222,7 @@ CtPtr ProtocolV2::ready() { return CONTINUE(read_frame); } -CtPtr ProtocolV2::handle_read_frame_epilogue_main(char *buffer, int r) +CtPtr ProtocolV2::handle_read_frame_epilogue_main(rx_buffer_t &&buffer, int r) { ldout(cct, 20) << __func__ << " r=" << r << dendl; @@ -1257,8 +1241,7 @@ CtPtr ProtocolV2::handle_read_frame_epilogue_main(char *buffer, int r) // decrypt epilogue and authenticate entire frame. ceph::bufferlist epilogue_bl; { - epilogue_bl.push_back(buffer::create_static(get_epilogue_size(), - buffer)); + epilogue_bl.push_back(std::move(buffer)); try { epilogue_bl = session_stream_handlers.rx->authenticated_decrypt_update_final( @@ -1273,7 +1256,7 @@ CtPtr ProtocolV2::handle_read_frame_epilogue_main(char *buffer, int r) reinterpret_cast(*epilogue_bl.c_str()); late_flags = epilogue.late_flags; } else { - auto& epilogue = reinterpret_cast(*buffer); + auto& epilogue = reinterpret_cast(*buffer->c_str()); for (std::uint8_t idx = 0; idx < rx_segments_data.size(); idx++) { const __u32 expected_crc = epilogue.crc_values[idx]; diff --git a/src/msg/async/ProtocolV2.h b/src/msg/async/ProtocolV2.h index 592cc2b1a8f..f3e42368a10 100644 --- a/src/msg/async/ProtocolV2.h +++ b/src/msg/async/ProtocolV2.h @@ -115,8 +115,6 @@ private: void run_continuation(Ct *pcontinuation); void run_continuation(Ct &continuation); - Ct *read(CONTINUATION_RX_TYPE &next, - int len, char *buffer = nullptr); Ct *read(CONTINUATION_RXBPTR_TYPE &next, rx_buffer_t&& buffer); template @@ -142,30 +140,30 @@ private: void handle_message_ack(uint64_t seq); CONTINUATION_DECL(ProtocolV2, _wait_for_peer_banner); - READ_HANDLER_CONTINUATION_DECL(ProtocolV2, _handle_peer_banner); - READ_HANDLER_CONTINUATION_DECL(ProtocolV2, _handle_peer_banner_payload); + READ_BPTR_HANDLER_CONTINUATION_DECL(ProtocolV2, _handle_peer_banner); + READ_BPTR_HANDLER_CONTINUATION_DECL(ProtocolV2, _handle_peer_banner_payload); Ct *_banner_exchange(Ct &callback); Ct *_wait_for_peer_banner(); - Ct *_handle_peer_banner(char *buffer, int r); - Ct *_handle_peer_banner_payload(char *buffer, int r); + Ct *_handle_peer_banner(rx_buffer_t &&buffer, int r); + Ct *_handle_peer_banner_payload(rx_buffer_t &&buffer, int r); Ct *handle_hello(ceph::bufferlist &payload); CONTINUATION_DECL(ProtocolV2, read_frame); CONTINUATION_DECL(ProtocolV2, finish_auth); - READ_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_read_frame_preamble_main); + READ_BPTR_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_read_frame_preamble_main); READ_BPTR_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_read_frame_segment); - READ_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_read_frame_epilogue_main); + READ_BPTR_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_read_frame_epilogue_main); CONTINUATION_DECL(ProtocolV2, throttle_message); CONTINUATION_DECL(ProtocolV2, throttle_bytes); CONTINUATION_DECL(ProtocolV2, throttle_dispatch_queue); Ct *read_frame(); Ct *finish_auth(); - Ct *handle_read_frame_preamble_main(char *buffer, int r); + Ct *handle_read_frame_preamble_main(rx_buffer_t &&buffer, int r); Ct *read_frame_segment(); Ct *handle_read_frame_segment(rx_buffer_t &&rx_buffer, int r); - Ct *handle_read_frame_epilogue_main(char *buffer, int r); + Ct *handle_read_frame_epilogue_main(rx_buffer_t &&buffer, int r); Ct *handle_read_frame_dispatch(); Ct *handle_frame_payload(); -- 2.39.5