]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/net: move socket from Protocol to SocketConnection
authorYingxin Cheng <yingxin.cheng@intel.com>
Mon, 31 Oct 2022 09:31:23 +0000 (17:31 +0800)
committerYingxin Cheng <yingxin.cheng@intel.com>
Wed, 8 Feb 2023 06:07:41 +0000 (14:07 +0800)
Protocol class will be removed.

Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
src/crimson/net/Protocol.cc
src/crimson/net/Protocol.h
src/crimson/net/ProtocolV2.cc
src/crimson/net/SocketConnection.cc
src/crimson/net/SocketConnection.h

index 95878692f0293c2bb6f66ac7d16d0b3e532c6629..734af8eda5382239df3ddbcc6b44e48b2007cc10 100644 (file)
@@ -8,7 +8,6 @@
 #include "crimson/common/log.h"
 #include "crimson/net/Errors.h"
 #include "crimson/net/chained_dispatchers.h"
-#include "crimson/net/Socket.h"
 #include "crimson/net/SocketConnection.h"
 #include "msg/Message.h"
 
@@ -51,8 +50,8 @@ void Protocol::close(bool dispatch_reset,
   if (f_accept_new) {
     (*f_accept_new)();
   }
-  if (socket) {
-    socket->shutdown();
+  if (conn.socket) {
+    conn.socket->shutdown();
   }
   set_write_state(write_state_t::drop);
   assert(!gate.is_closed());
@@ -67,8 +66,8 @@ void Protocol::close(bool dispatch_reset,
   // asynchronous operations
   assert(!close_ready.valid());
   close_ready = std::move(gate_closed).then([this] {
-    if (socket) {
-      return socket->close();
+    if (conn.socket) {
+      return conn.socket->close();
     } else {
       return seastar::now();
     }
@@ -208,7 +207,7 @@ void Protocol::ack_writes(seq_num_t seq)
 
 seastar::future<stop_t> Protocol::try_exit_sweep() {
   assert(!is_queued());
-  return socket->flush().then([this] {
+  return conn.socket->flush().then([this] {
     if (!is_queued()) {
       // still nothing pending to send after flush,
       // the dispatching can ONLY stop now
@@ -242,7 +241,7 @@ seastar::future<> Protocol::do_write_dispatch_sweep()
       auto acked = ack_left;
       assert(acked == 0 || conn.in_seq > 0);
       // sweep all pending writes with the concrete Protocol
-      return socket->write(sweep_messages_and_move_to_sent(
+      return conn.socket->write(sweep_messages_and_move_to_sent(
           num_msgs, need_keepalive, keepalive_ack, acked > 0)
       ).then([this, prv_keepalive_ack=keepalive_ack, acked] {
         need_keepalive = false;
@@ -292,7 +291,7 @@ seastar::future<> Protocol::do_write_dispatch_sweep()
                      conn, write_state, e);
       ceph_abort();
     }
-    socket->shutdown();
+    conn.socket->shutdown();
     if (write_state == write_state_t::open) {
       logger().info("{} write_event(): fault at {}, going to delay -- {}",
                     conn, write_state, e);
index 782b357b9df33bdc6036d217f3eba386b217f9da..f819c4692083a6b2ca6f0a98e8c8e7eebc94194b 100644 (file)
@@ -75,9 +75,6 @@ class Protocol {
       std::optional<utime_t> keepalive_ack,
       bool require_ack); 
 
- public:
-  SocketRef socket;
-
  protected:
   ChainedDispatchers& dispatchers;
   SocketConnection &conn;
index edcc7de594739b22ea52804e870a6308d6dc89b1..caa19da1e5f61c3c9d8240002b94fc106da3f7cc 100644 (file)
@@ -16,7 +16,6 @@
 
 #include "chained_dispatchers.h"
 #include "Errors.h"
-#include "Socket.h"
 #include "SocketConnection.h"
 #include "SocketMessenger.h"
 
@@ -121,11 +120,11 @@ void intercept(Breakpoint bp, bp_type_t type,
 }
 
 #define INTERCEPT_CUSTOM(bp, type)       \
-intercept({bp}, type, conn, socket)
+intercept({bp}, type, conn, conn.socket)
 
 #define INTERCEPT_FRAME(tag, type)       \
 intercept({static_cast<Tag>(tag), type}, \
-          type, conn, socket)
+          type, conn, conn.socket)
 
 #define INTERCEPT_N_RW(bp)                               \
 if (conn.interceptor) {                                  \
@@ -178,7 +177,7 @@ void ProtocolV2::start_connect(const entity_addr_t& _peer_addr,
                                const entity_name_t& _peer_name)
 {
   ceph_assert(state == state_t::NONE);
-  ceph_assert(!socket);
+  ceph_assert(!conn.socket);
   ceph_assert(!gate.is_closed());
   conn.peer_addr = _peer_addr;
   conn.target_addr = _peer_addr;
@@ -199,10 +198,10 @@ void ProtocolV2::start_accept(SocketRef&& sock,
                               const entity_addr_t& _peer_addr)
 {
   ceph_assert(state == state_t::NONE);
-  ceph_assert(!socket);
+  ceph_assert(!conn.socket);
   // until we know better
   conn.target_addr = _peer_addr;
-  socket = std::move(sock);
+  conn.socket = std::move(sock);
   logger().info("{} ProtocolV2::start_accept(): target_addr={}", conn, _peer_addr);
   messenger.accept_conn(
     seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
@@ -221,26 +220,26 @@ void ProtocolV2::enable_recording()
 seastar::future<Socket::tmp_buf> ProtocolV2::read_exactly(size_t bytes)
 {
   if (unlikely(record_io)) {
-    return socket->read_exactly(bytes)
-    .then([this] (auto bl) {
+    return conn.socket->read_exactly(bytes
+    ).then([this] (auto bl) {
       rxbuf.append(buffer::create(bl.share()));
       return bl;
     });
   } else {
-    return socket->read_exactly(bytes);
+    return conn.socket->read_exactly(bytes);
   };
 }
 
 seastar::future<bufferlist> ProtocolV2::read(size_t bytes)
 {
   if (unlikely(record_io)) {
-    return socket->read(bytes)
-    .then([this] (auto buf) {
+    return conn.socket->read(bytes
+    ).then([this] (auto buf) {
       rxbuf.append(buf);
       return buf;
     });
   } else {
-    return socket->read(bytes);
+    return conn.socket->read(bytes);
   }
 }
 
@@ -249,7 +248,7 @@ seastar::future<> ProtocolV2::write(bufferlist&& buf)
   if (unlikely(record_io)) {
     txbuf.append(buf);
   }
-  return socket->write(std::move(buf));
+  return conn.socket->write(std::move(buf));
 }
 
 seastar::future<> ProtocolV2::write_flush(bufferlist&& buf)
@@ -257,7 +256,7 @@ seastar::future<> ProtocolV2::write_flush(bufferlist&& buf)
   if (unlikely(record_io)) {
     txbuf.append(buf);
   }
-  return socket->write_flush(std::move(buf));
+  return conn.socket->write_flush(std::move(buf));
 }
 
 size_t ProtocolV2::get_current_msg_size() const
@@ -811,8 +810,8 @@ ProtocolV2::client_reconnect()
 void ProtocolV2::execute_connecting()
 {
   trigger_state(state_t::CONNECTING, write_state_t::delay, false);
-  if (socket) {
-    socket->shutdown();
+  if (conn.socket) {
+    conn.socket->shutdown();
   }
   gated_execute("execute_connecting", [this] {
       global_seq = messenger.get_global_seq();
@@ -832,9 +831,9 @@ void ProtocolV2::execute_connecting()
                            conn, get_state_name(state));
             abort_protocol();
           }
-          if (socket) {
+          if (conn.socket) {
             gate.dispatch_in_background("close_sockect_connecting", *this,
-                           [sock = std::move(socket)] () mutable {
+                           [sock = std::move(conn.socket)] () mutable {
               return sock->close().then([sock = std::move(sock)] {});
             });
           }
@@ -849,7 +848,7 @@ void ProtocolV2::execute_connecting()
               abort_protocol();
             });
           }
-          socket = std::move(sock);
+          conn.socket = std::move(sock);
           return seastar::now();
         }).then([this] {
           auth_meta = seastar::make_lw_shared<AuthConnectionMeta>();
@@ -869,7 +868,7 @@ void ProtocolV2::execute_connecting()
                            conn, get_state_name(state));
             abort_protocol();
           }
-          socket->learn_ephemeral_port_as_connector(_my_addr_from_peer.get_port());
+          conn.socket->learn_ephemeral_port_as_connector(_my_addr_from_peer.get_port());
           if (unlikely(_my_addr_from_peer.is_legacy())) {
             logger().warn("{} peer sent a legacy address for me: {}",
                           conn, _my_addr_from_peer);
@@ -1071,7 +1070,7 @@ ProtocolV2::reuse_connection(
 {
   existing_proto->trigger_replacing(reconnect,
                                     do_reset,
-                                    std::move(socket),
+                                    std::move(conn.socket),
                                     std::move(auth_meta),
                                     std::move(session_stream_handlers),
                                     peer_global_seq,
@@ -1682,8 +1681,8 @@ void ProtocolV2::trigger_replacing(bool reconnect,
                                    uint64_t new_msg_seq)
 {
   trigger_state(state_t::REPLACING, write_state_t::delay, false);
-  if (socket) {
-    socket->shutdown();
+  if (conn.socket) {
+    conn.socket->shutdown();
   }
   dispatchers.ms_handle_accept(
       seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
@@ -1719,13 +1718,13 @@ void ProtocolV2::trigger_replacing(bool reconnect,
         });
       }
 
-      if (socket) {
+      if (conn.socket) {
         gate.dispatch_in_background("close_socket_replacing", *this,
-                       [sock = std::move(socket)] () mutable {
+                       [sock = std::move(conn.socket)] () mutable {
           return sock->close().then([sock = std::move(sock)] {});
         });
       }
-      socket = std::move(new_socket);
+      conn.socket = std::move(new_socket);
       auth_meta = std::move(new_auth_meta);
       session_stream_handlers = std::move(new_rxtx);
       record_io = false;
@@ -2021,8 +2020,8 @@ void ProtocolV2::execute_ready(bool dispatch_connect)
 void ProtocolV2::execute_standby()
 {
   trigger_state(state_t::STANDBY, write_state_t::delay, false);
-  if (socket) {
-    socket->shutdown();
+  if (conn.socket) {
+    conn.socket->shutdown();
   }
 }
 
@@ -2040,8 +2039,8 @@ void ProtocolV2::notify_write()
 void ProtocolV2::execute_wait(bool max_backoff)
 {
   trigger_state(state_t::WAIT, write_state_t::delay, false);
-  if (socket) {
-    socket->shutdown();
+  if (conn.socket) {
+    conn.socket->shutdown();
   }
   gated_execute("execute_wait", [this, max_backoff] {
     double backoff = protocol_timer.last_dur();
index cc6abdacf262595183f749e95a7f0875bda4d192..5ba2ea5c5666fe521892b41a3ae3de94e144c6c2 100644 (file)
@@ -134,19 +134,19 @@ seastar::shard_id SocketConnection::shard_id() const {
 }
 
 seastar::socket_address SocketConnection::get_local_address() const {
-  return protocol->socket->get_local_address();
+  return socket->get_local_address();
 }
 
 void SocketConnection::print(ostream& out) const {
     out << (void*)this << " ";
     messenger.print(out);
-    if (!protocol->socket) {
+    if (!socket) {
       out << " >> " << get_peer_name() << " " << peer_addr;
-    } else if (protocol->socket->get_side() == Socket::side_t::acceptor) {
+    } else if (socket->get_side() == Socket::side_t::acceptor) {
       out << " >> " << get_peer_name() << " " << peer_addr
-          << "@" << protocol->socket->get_ephemeral_port();
-    } else { // protocol->socket->get_side() == Socket::side_t::connector
-      out << "@" << protocol->socket->get_ephemeral_port()
+          << "@" << socket->get_ephemeral_port();
+    } else { // socket->get_side() == Socket::side_t::connector
+      out << "@" << socket->get_ephemeral_port()
           << " >> " << get_peer_name() << " " << peer_addr;
     }
 }
index c93eccf3ac0205caa6b36a977e7bca9e694daa22..d4cbca463d30bbb1220bea1806465b7baef330ab 100644 (file)
@@ -37,6 +37,8 @@ class SocketConnection : public Connection {
   SocketMessenger& messenger;
   std::unique_ptr<Protocol> protocol;
 
+  SocketRef socket;
+
   entity_name_t peer_name = {0, entity_name_t::NEW};
 
   entity_addr_t peer_addr;