]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/net: hide seastar packet and temporary_buffer inside Socket
authorYingxin Cheng <yingxin.cheng@intel.com>
Fri, 21 Apr 2023 03:47:36 +0000 (11:47 +0800)
committerMatan Breizman <mbreizma@redhat.com>
Wed, 11 Oct 2023 11:38:30 +0000 (11:38 +0000)
Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
(cherry picked from commit 4dab5cf87ae3561fd67d2538b33eb98b6b7d43de)

src/crimson/net/FrameAssemblerV2.cc
src/crimson/net/FrameAssemblerV2.h
src/crimson/net/ProtocolV2.cc
src/crimson/net/Socket.cc
src/crimson/net/Socket.h
src/test/crimson/test_socket.cc

index e574ba2b2d500562cacbb163705392205b4f7bf8..165ae18a1a0b57bcc451836d47d0dd535939f990 100644 (file)
@@ -167,15 +167,15 @@ seastar::future<> FrameAssemblerV2::close_shutdown_socket()
   return socket->close();
 }
 
-seastar::future<Socket::tmp_buf>
+seastar::future<ceph::bufferptr>
 FrameAssemblerV2::read_exactly(std::size_t bytes)
 {
   assert(has_socket());
   if (unlikely(record_io)) {
     return socket->read_exactly(bytes
-    ).then([this](auto bl) {
-      rxbuf.append(buffer::create(bl.share()));
-      return bl;
+    ).then([this](auto bptr) {
+      rxbuf.append(bptr);
+      return bptr;
     });
   } else {
     return socket->read_exactly(bytes);
@@ -198,7 +198,7 @@ FrameAssemblerV2::read(std::size_t bytes)
 }
 
 seastar::future<>
-FrameAssemblerV2::write(ceph::bufferlist &&buf)
+FrameAssemblerV2::write(ceph::bufferlist buf)
 {
   assert(has_socket());
   if (unlikely(record_io)) {
@@ -215,7 +215,7 @@ FrameAssemblerV2::flush()
 }
 
 seastar::future<>
-FrameAssemblerV2::write_flush(ceph::bufferlist &&buf)
+FrameAssemblerV2::write_flush(ceph::bufferlist buf)
 {
   assert(has_socket());
   if (unlikely(record_io)) {
@@ -229,9 +229,9 @@ FrameAssemblerV2::read_main_preamble()
 {
   rx_preamble.clear();
   return read_exactly(rx_frame_asm.get_preamble_onwire_len()
-  ).then([this](auto bl) {
+  ).then([this](auto bptr) {
     try {
-      rx_preamble.append(buffer::create(std::move(bl)));
+      rx_preamble.append(std::move(bptr));
       const Tag tag = rx_frame_asm.disassemble_preamble(rx_preamble);
 #ifdef UNIT_TESTS_BUILT
       intercept_frame(tag, false);
@@ -263,22 +263,22 @@ FrameAssemblerV2::read_frame_payload()
       uint32_t onwire_len = rx_frame_asm.get_segment_onwire_len(seg_idx);
       // TODO: create aligned and contiguous buffer from socket
       return read_exactly(onwire_len
-      ).then([this](auto tmp_bl) {
+      ).then([this](auto bptr) {
         logger().trace("{} RECV({}) frame segment[{}]",
-                       conn, tmp_bl.size(), rx_segments_data.size());
+                       conn, bptr.length(), rx_segments_data.size());
         bufferlist segment;
-        segment.append(buffer::create(std::move(tmp_bl)));
+        segment.append(std::move(bptr));
         rx_segments_data.emplace_back(std::move(segment));
       });
     }
   ).then([this] {
     return read_exactly(rx_frame_asm.get_epilogue_onwire_len());
-  }).then([this](auto bl) {
-    logger().trace("{} RECV({}) frame epilogue", conn, bl.size());
+  }).then([this](auto bptr) {
+    logger().trace("{} RECV({}) frame epilogue", conn, bptr.length());
     bool ok = false;
     try {
       bufferlist rx_epilogue;
-      rx_epilogue.append(buffer::create(std::move(bl)));
+      rx_epilogue.append(std::move(bptr));
       ok = rx_frame_asm.disassemble_segments(rx_preamble, rx_segments_data.data(), rx_epilogue);
     } catch (FrameError& e) {
       logger().error("read_frame_payload: {} {}", conn, e.what());
index 8f0884ebad700c0c1f4ea6170b7d6940f5fdcfbb..a99b5fce14b5086058ca3ac67bbf90979fcaf45c 100644 (file)
@@ -80,15 +80,15 @@ public:
    * socket read and write interfaces
    */
 
-  seastar::future<Socket::tmp_buf> read_exactly(std::size_t bytes);
+  seastar::future<ceph::bufferptr> read_exactly(std::size_t bytes);
 
   seastar::future<ceph::bufferlist> read(std::size_t bytes);
 
-  seastar::future<> write(ceph::bufferlist &&);
+  seastar::future<> write(ceph::bufferlist);
 
   seastar::future<> flush();
 
-  seastar::future<> write_flush(ceph::bufferlist &&);
+  seastar::future<> write_flush(ceph::bufferlist);
 
   /*
    * frame read and write interfaces
index 92bb150e800958a89bce96c37ac7442adfe5a5f7..543f2581b476d0d5b5210ca754190316a8d3d865 100644 (file)
@@ -366,27 +366,30 @@ ProtocolV2::banner_exchange(bool is_connect)
       // 2. read peer banner
       unsigned banner_len = strlen(CEPH_BANNER_V2_PREFIX) + sizeof(ceph_le16);
       INTERCEPT_CUSTOM(custom_bp_t::BANNER_READ, bp_type_t::READ);
-      return frame_assembler->read_exactly(banner_len); // or read exactly?
-    }).then([this] (auto bl) {
+      return frame_assembler->read_exactly(banner_len);
+    }).then([this](auto bptr) {
       // 3. process peer banner and read banner_payload
       unsigned banner_prefix_len = strlen(CEPH_BANNER_V2_PREFIX);
       logger().debug("{} RECV({}) banner: \"{}\"",
-                     conn, bl.size(),
-                     std::string((const char*)bl.get(), banner_prefix_len));
+                     conn, bptr.length(),
+                     std::string(bptr.c_str(), banner_prefix_len));
 
-      if (memcmp(bl.get(), CEPH_BANNER_V2_PREFIX, banner_prefix_len) != 0) {
-        if (memcmp(bl.get(), CEPH_BANNER, strlen(CEPH_BANNER)) == 0) {
+      if (memcmp(bptr.c_str(), CEPH_BANNER_V2_PREFIX, banner_prefix_len) != 0) {
+        if (memcmp(bptr.c_str(), CEPH_BANNER, strlen(CEPH_BANNER)) == 0) {
           logger().warn("{} peer is using V1 protocol", conn);
         } else {
           logger().warn("{} peer sent bad banner", conn);
         }
         abort_in_fault();
       }
-      bl.trim_front(banner_prefix_len);
+
+      bptr.set_offset(bptr.offset() + banner_prefix_len);
+      bptr.set_length(bptr.length() - banner_prefix_len);
+      assert(bptr.length() == sizeof(ceph_le16));
 
       uint16_t payload_len;
       bufferlist buf;
-      buf.append(buffer::create(std::move(bl)));
+      buf.append(std::move(bptr));
       auto ti = buf.cbegin();
       try {
         decode(payload_len, ti);
@@ -1886,7 +1889,7 @@ void ProtocolV2::execute_server_wait()
   trigger_state(state_t::SERVER_WAIT, io_state_t::none, false);
   gated_execute("execute_server_wait", conn, [this] {
     return frame_assembler->read_exactly(1
-    ).then([this](auto bl) {
+    ).then([this](auto bptr) {
       logger().warn("{} SERVER_WAIT got read, abort", conn);
       abort_in_fault();
     }).handle_exception([this](std::exception_ptr eptr) {
index a28211911c865b5503c4b10511e1355ec87bb8f1..342053a3615dbb5dc5ee388c7a55f4bea677b5e9 100644 (file)
@@ -5,6 +5,7 @@
 
 #include <seastar/core/sleep.hh>
 #include <seastar/core/when_all.hh>
+#include <seastar/net/packet.hh>
 
 #include "crimson/common/log.h"
 #include "Errors.h"
@@ -19,8 +20,8 @@ seastar::logger& logger() {
   return crimson::get_logger(ceph_subsys_ms);
 }
 
-using tmp_buf = Socket::tmp_buf;
-using packet = Socket::packet;
+using tmp_buf = seastar::temporary_buffer<char>;
+using packet = seastar::net::packet;
 
 // an input_stream consumer that reads buffer segments into a bufferlist up to
 // the given number of remaining bytes
@@ -141,36 +142,37 @@ Socket::read(size_t bytes)
 #endif
 }
 
-seastar::future<seastar::temporary_buffer<char>>
+seastar::future<bufferptr>
 Socket::read_exactly(size_t bytes) {
 #ifdef UNIT_TESTS_BUILT
   return try_trap_pre(next_trap_read).then([bytes, this] {
 #endif
     if (bytes == 0) {
-      return seastar::make_ready_future<seastar::temporary_buffer<char>>();
+      return seastar::make_ready_future<bufferptr>();
     }
     return in.read_exactly(bytes).then([bytes](auto buf) {
-      if (buf.size() < bytes) {
+      bufferptr ptr(buffer::create(buf.share()));
+      if (ptr.length() < bytes) {
         throw std::system_error(make_error_code(error::read_eof));
       }
       inject_failure();
       return inject_delay(
-      ).then([buf = std::move(buf)]() mutable {
-        return seastar::make_ready_future<tmp_buf>(std::move(buf));
+      ).then([ptr = std::move(ptr)]() mutable {
+        return seastar::make_ready_future<bufferptr>(std::move(ptr));
       });
     });
 #ifdef UNIT_TESTS_BUILT
-  }).then([this](auto buf) {
+  }).then([this](auto ptr) {
     return try_trap_post(next_trap_read
-    ).then([buf = std::move(buf)]() mutable {
-      return std::move(buf);
+    ).then([ptr = std::move(ptr)]() mutable {
+      return std::move(ptr);
     });
   });
 #endif
 }
 
 seastar::future<>
-Socket::write(packet &&buf)
+Socket::write(bufferlist buf)
 {
 #ifdef UNIT_TESTS_BUILT
   return try_trap_pre(next_trap_write
@@ -179,7 +181,8 @@ Socket::write(packet &&buf)
     inject_failure();
     return inject_delay(
     ).then([buf = std::move(buf), this]() mutable {
-      return out.write(std::move(buf));
+      packet p(std::move(buf));
+      return out.write(std::move(p));
     });
 #ifdef UNIT_TESTS_BUILT
   }).then([this] {
@@ -198,7 +201,7 @@ Socket::flush()
 }
 
 seastar::future<>
-Socket::write_flush(packet &&buf)
+Socket::write_flush(bufferlist buf)
 {
 #ifdef UNIT_TESTS_BUILT
   return try_trap_pre(next_trap_write
@@ -207,7 +210,8 @@ Socket::write_flush(packet &&buf)
     inject_failure();
     return inject_delay(
     ).then([buf = std::move(buf), this]() mutable {
-      return out.write(std::move(buf)
+      packet p(std::move(buf));
+      return out.write(std::move(p)
       ).then([this] {
         return out.flush();
       });
index beb66c08bfb56b026f56f207fc964378e3181c19..58a4484aa87c1252be4507f73610e0577f38431c 100644 (file)
@@ -6,7 +6,6 @@
 #include <seastar/core/gate.hh>
 #include <seastar/core/reactor.hh>
 #include <seastar/core/sharded.hh>
-#include <seastar/net/packet.hh>
 
 #include "include/buffer.h"
 
@@ -67,15 +66,13 @@ public:
   /// read the requested number of bytes into a bufferlist
   seastar::future<bufferlist> read(size_t bytes);
 
-  using tmp_buf = seastar::temporary_buffer<char>;
-  using packet = seastar::net::packet;
-  seastar::future<tmp_buf> read_exactly(size_t bytes);
+  seastar::future<bufferptr> read_exactly(size_t bytes);
 
-  seastar::future<> write(packet &&buf);
+  seastar::future<> write(bufferlist);
 
   seastar::future<> flush();
 
-  seastar::future<> write_flush(packet &&buf);
+  seastar::future<> write_flush(bufferlist);
 
   // preemptively disable further reads or writes, can only be shutdown once.
   void shutdown();
index 423ae0cf2554dabe41747dc2c86fe474aa4415b6..4ca75c6e961911be6d21aae2e55fa6c36286d696 100644 (file)
@@ -291,8 +291,10 @@ class Connection {
         });
       } else {
         data[0] = write_count;
-        return socket->write(seastar::net::packet(
-            reinterpret_cast<const char*>(&data), sizeof(data))
+        bufferlist bl;
+        bl.append(buffer::copy(
+          reinterpret_cast<const char*>(&data), sizeof(data)));
+        return socket->write(bl
         ).then([this] {
           return socket->flush();
         }).then([this] {
@@ -348,9 +350,9 @@ class Connection {
             });
           } else {
             return socket->read_exactly(DATA_SIZE * sizeof(uint64_t)
-            ).then([this](auto buf) {
+            ).then([this](auto bptr) {
               uint64_t read_data[DATA_SIZE];
-              std::memcpy(read_data, buf.get(), DATA_SIZE * sizeof(uint64_t));
+              std::memcpy(read_data, bptr.c_str(), DATA_SIZE * sizeof(uint64_t));
               verify_data_read(read_data);
             });
           }