From fe2608db4d1b838b3368c6e10941b201a6e06c5c Mon Sep 17 00:00:00 2001 From: Yingxin Cheng Date: Fri, 21 Apr 2023 11:47:36 +0800 Subject: [PATCH] crimson/net: hide seastar packet and temporary_buffer inside Socket Signed-off-by: Yingxin Cheng (cherry picked from commit 4dab5cf87ae3561fd67d2538b33eb98b6b7d43de) --- src/crimson/net/FrameAssemblerV2.cc | 28 ++++++++++++------------- src/crimson/net/FrameAssemblerV2.h | 6 +++--- src/crimson/net/ProtocolV2.cc | 21 +++++++++++-------- src/crimson/net/Socket.cc | 32 ++++++++++++++++------------- src/crimson/net/Socket.h | 9 +++----- src/test/crimson/test_socket.cc | 10 +++++---- 6 files changed, 56 insertions(+), 50 deletions(-) diff --git a/src/crimson/net/FrameAssemblerV2.cc b/src/crimson/net/FrameAssemblerV2.cc index e574ba2b2d500..165ae18a1a0b5 100644 --- a/src/crimson/net/FrameAssemblerV2.cc +++ b/src/crimson/net/FrameAssemblerV2.cc @@ -167,15 +167,15 @@ seastar::future<> FrameAssemblerV2::close_shutdown_socket() return socket->close(); } -seastar::future +seastar::future FrameAssemblerV2::read_exactly(std::size_t bytes) { assert(has_socket()); if (unlikely(record_io)) { return socket->read_exactly(bytes - ).then([this](auto bl) { - rxbuf.append(buffer::create(bl.share())); - return bl; + ).then([this](auto bptr) { + rxbuf.append(bptr); + return bptr; }); } else { return socket->read_exactly(bytes); @@ -198,7 +198,7 @@ FrameAssemblerV2::read(std::size_t bytes) } seastar::future<> -FrameAssemblerV2::write(ceph::bufferlist &&buf) +FrameAssemblerV2::write(ceph::bufferlist buf) { assert(has_socket()); if (unlikely(record_io)) { @@ -215,7 +215,7 @@ FrameAssemblerV2::flush() } seastar::future<> -FrameAssemblerV2::write_flush(ceph::bufferlist &&buf) +FrameAssemblerV2::write_flush(ceph::bufferlist buf) { assert(has_socket()); if (unlikely(record_io)) { @@ -229,9 +229,9 @@ FrameAssemblerV2::read_main_preamble() { rx_preamble.clear(); return read_exactly(rx_frame_asm.get_preamble_onwire_len() - ).then([this](auto bl) { + ).then([this](auto bptr) { try { - rx_preamble.append(buffer::create(std::move(bl))); + rx_preamble.append(std::move(bptr)); const Tag tag = rx_frame_asm.disassemble_preamble(rx_preamble); #ifdef UNIT_TESTS_BUILT intercept_frame(tag, false); @@ -263,22 +263,22 @@ FrameAssemblerV2::read_frame_payload() uint32_t onwire_len = rx_frame_asm.get_segment_onwire_len(seg_idx); // TODO: create aligned and contiguous buffer from socket return read_exactly(onwire_len - ).then([this](auto tmp_bl) { + ).then([this](auto bptr) { logger().trace("{} RECV({}) frame segment[{}]", - conn, tmp_bl.size(), rx_segments_data.size()); + conn, bptr.length(), rx_segments_data.size()); bufferlist segment; - segment.append(buffer::create(std::move(tmp_bl))); + segment.append(std::move(bptr)); rx_segments_data.emplace_back(std::move(segment)); }); } ).then([this] { return read_exactly(rx_frame_asm.get_epilogue_onwire_len()); - }).then([this](auto bl) { - logger().trace("{} RECV({}) frame epilogue", conn, bl.size()); + }).then([this](auto bptr) { + logger().trace("{} RECV({}) frame epilogue", conn, bptr.length()); bool ok = false; try { bufferlist rx_epilogue; - rx_epilogue.append(buffer::create(std::move(bl))); + rx_epilogue.append(std::move(bptr)); ok = rx_frame_asm.disassemble_segments(rx_preamble, rx_segments_data.data(), rx_epilogue); } catch (FrameError& e) { logger().error("read_frame_payload: {} {}", conn, e.what()); diff --git a/src/crimson/net/FrameAssemblerV2.h b/src/crimson/net/FrameAssemblerV2.h index 8f0884ebad700..a99b5fce14b50 100644 --- a/src/crimson/net/FrameAssemblerV2.h +++ b/src/crimson/net/FrameAssemblerV2.h @@ -80,15 +80,15 @@ public: * socket read and write interfaces */ - seastar::future read_exactly(std::size_t bytes); + seastar::future read_exactly(std::size_t bytes); seastar::future read(std::size_t bytes); - seastar::future<> write(ceph::bufferlist &&); + seastar::future<> write(ceph::bufferlist); seastar::future<> flush(); - seastar::future<> write_flush(ceph::bufferlist &&); + seastar::future<> write_flush(ceph::bufferlist); /* * frame read and write interfaces diff --git a/src/crimson/net/ProtocolV2.cc b/src/crimson/net/ProtocolV2.cc index 92bb150e80095..543f2581b476d 100644 --- a/src/crimson/net/ProtocolV2.cc +++ b/src/crimson/net/ProtocolV2.cc @@ -366,27 +366,30 @@ ProtocolV2::banner_exchange(bool is_connect) // 2. read peer banner unsigned banner_len = strlen(CEPH_BANNER_V2_PREFIX) + sizeof(ceph_le16); INTERCEPT_CUSTOM(custom_bp_t::BANNER_READ, bp_type_t::READ); - return frame_assembler->read_exactly(banner_len); // or read exactly? - }).then([this] (auto bl) { + return frame_assembler->read_exactly(banner_len); + }).then([this](auto bptr) { // 3. process peer banner and read banner_payload unsigned banner_prefix_len = strlen(CEPH_BANNER_V2_PREFIX); logger().debug("{} RECV({}) banner: \"{}\"", - conn, bl.size(), - std::string((const char*)bl.get(), banner_prefix_len)); + conn, bptr.length(), + std::string(bptr.c_str(), banner_prefix_len)); - if (memcmp(bl.get(), CEPH_BANNER_V2_PREFIX, banner_prefix_len) != 0) { - if (memcmp(bl.get(), CEPH_BANNER, strlen(CEPH_BANNER)) == 0) { + if (memcmp(bptr.c_str(), CEPH_BANNER_V2_PREFIX, banner_prefix_len) != 0) { + if (memcmp(bptr.c_str(), CEPH_BANNER, strlen(CEPH_BANNER)) == 0) { logger().warn("{} peer is using V1 protocol", conn); } else { logger().warn("{} peer sent bad banner", conn); } abort_in_fault(); } - bl.trim_front(banner_prefix_len); + + bptr.set_offset(bptr.offset() + banner_prefix_len); + bptr.set_length(bptr.length() - banner_prefix_len); + assert(bptr.length() == sizeof(ceph_le16)); uint16_t payload_len; bufferlist buf; - buf.append(buffer::create(std::move(bl))); + buf.append(std::move(bptr)); auto ti = buf.cbegin(); try { decode(payload_len, ti); @@ -1886,7 +1889,7 @@ void ProtocolV2::execute_server_wait() trigger_state(state_t::SERVER_WAIT, io_state_t::none, false); gated_execute("execute_server_wait", conn, [this] { return frame_assembler->read_exactly(1 - ).then([this](auto bl) { + ).then([this](auto bptr) { logger().warn("{} SERVER_WAIT got read, abort", conn); abort_in_fault(); }).handle_exception([this](std::exception_ptr eptr) { diff --git a/src/crimson/net/Socket.cc b/src/crimson/net/Socket.cc index a28211911c865..342053a3615db 100644 --- a/src/crimson/net/Socket.cc +++ b/src/crimson/net/Socket.cc @@ -5,6 +5,7 @@ #include #include +#include #include "crimson/common/log.h" #include "Errors.h" @@ -19,8 +20,8 @@ seastar::logger& logger() { return crimson::get_logger(ceph_subsys_ms); } -using tmp_buf = Socket::tmp_buf; -using packet = Socket::packet; +using tmp_buf = seastar::temporary_buffer; +using packet = seastar::net::packet; // an input_stream consumer that reads buffer segments into a bufferlist up to // the given number of remaining bytes @@ -141,36 +142,37 @@ Socket::read(size_t bytes) #endif } -seastar::future> +seastar::future Socket::read_exactly(size_t bytes) { #ifdef UNIT_TESTS_BUILT return try_trap_pre(next_trap_read).then([bytes, this] { #endif if (bytes == 0) { - return seastar::make_ready_future>(); + return seastar::make_ready_future(); } return in.read_exactly(bytes).then([bytes](auto buf) { - if (buf.size() < bytes) { + bufferptr ptr(buffer::create(buf.share())); + if (ptr.length() < bytes) { throw std::system_error(make_error_code(error::read_eof)); } inject_failure(); return inject_delay( - ).then([buf = std::move(buf)]() mutable { - return seastar::make_ready_future(std::move(buf)); + ).then([ptr = std::move(ptr)]() mutable { + return seastar::make_ready_future(std::move(ptr)); }); }); #ifdef UNIT_TESTS_BUILT - }).then([this](auto buf) { + }).then([this](auto ptr) { return try_trap_post(next_trap_read - ).then([buf = std::move(buf)]() mutable { - return std::move(buf); + ).then([ptr = std::move(ptr)]() mutable { + return std::move(ptr); }); }); #endif } seastar::future<> -Socket::write(packet &&buf) +Socket::write(bufferlist buf) { #ifdef UNIT_TESTS_BUILT return try_trap_pre(next_trap_write @@ -179,7 +181,8 @@ Socket::write(packet &&buf) inject_failure(); return inject_delay( ).then([buf = std::move(buf), this]() mutable { - return out.write(std::move(buf)); + packet p(std::move(buf)); + return out.write(std::move(p)); }); #ifdef UNIT_TESTS_BUILT }).then([this] { @@ -198,7 +201,7 @@ Socket::flush() } seastar::future<> -Socket::write_flush(packet &&buf) +Socket::write_flush(bufferlist buf) { #ifdef UNIT_TESTS_BUILT return try_trap_pre(next_trap_write @@ -207,7 +210,8 @@ Socket::write_flush(packet &&buf) inject_failure(); return inject_delay( ).then([buf = std::move(buf), this]() mutable { - return out.write(std::move(buf) + packet p(std::move(buf)); + return out.write(std::move(p) ).then([this] { return out.flush(); }); diff --git a/src/crimson/net/Socket.h b/src/crimson/net/Socket.h index beb66c08bfb56..58a4484aa87c1 100644 --- a/src/crimson/net/Socket.h +++ b/src/crimson/net/Socket.h @@ -6,7 +6,6 @@ #include #include #include -#include #include "include/buffer.h" @@ -67,15 +66,13 @@ public: /// read the requested number of bytes into a bufferlist seastar::future read(size_t bytes); - using tmp_buf = seastar::temporary_buffer; - using packet = seastar::net::packet; - seastar::future read_exactly(size_t bytes); + seastar::future read_exactly(size_t bytes); - seastar::future<> write(packet &&buf); + seastar::future<> write(bufferlist); seastar::future<> flush(); - seastar::future<> write_flush(packet &&buf); + seastar::future<> write_flush(bufferlist); // preemptively disable further reads or writes, can only be shutdown once. void shutdown(); diff --git a/src/test/crimson/test_socket.cc b/src/test/crimson/test_socket.cc index 423ae0cf2554d..4ca75c6e96191 100644 --- a/src/test/crimson/test_socket.cc +++ b/src/test/crimson/test_socket.cc @@ -291,8 +291,10 @@ class Connection { }); } else { data[0] = write_count; - return socket->write(seastar::net::packet( - reinterpret_cast(&data), sizeof(data)) + bufferlist bl; + bl.append(buffer::copy( + reinterpret_cast(&data), sizeof(data))); + return socket->write(bl ).then([this] { return socket->flush(); }).then([this] { @@ -348,9 +350,9 @@ class Connection { }); } else { return socket->read_exactly(DATA_SIZE * sizeof(uint64_t) - ).then([this](auto buf) { + ).then([this](auto bptr) { uint64_t read_data[DATA_SIZE]; - std::memcpy(read_data, buf.get(), DATA_SIZE * sizeof(uint64_t)); + std::memcpy(read_data, bptr.c_str(), DATA_SIZE * sizeof(uint64_t)); verify_data_read(read_data); }); } -- 2.39.5