From 2ac211c352b4e5595ea968991a00550c4469d647 Mon Sep 17 00:00:00 2001 From: Yingxin Cheng Date: Mon, 31 Oct 2022 17:31:23 +0800 Subject: [PATCH] crimson/net: move socket from Protocol to SocketConnection Protocol class will be removed. Signed-off-by: Yingxin Cheng --- src/crimson/net/Protocol.cc | 15 ++++---- src/crimson/net/Protocol.h | 3 -- src/crimson/net/ProtocolV2.cc | 59 ++++++++++++++--------------- src/crimson/net/SocketConnection.cc | 12 +++--- src/crimson/net/SocketConnection.h | 2 + 5 files changed, 44 insertions(+), 47 deletions(-) diff --git a/src/crimson/net/Protocol.cc b/src/crimson/net/Protocol.cc index 95878692f02..734af8eda53 100644 --- a/src/crimson/net/Protocol.cc +++ b/src/crimson/net/Protocol.cc @@ -8,7 +8,6 @@ #include "crimson/common/log.h" #include "crimson/net/Errors.h" #include "crimson/net/chained_dispatchers.h" -#include "crimson/net/Socket.h" #include "crimson/net/SocketConnection.h" #include "msg/Message.h" @@ -51,8 +50,8 @@ void Protocol::close(bool dispatch_reset, if (f_accept_new) { (*f_accept_new)(); } - if (socket) { - socket->shutdown(); + if (conn.socket) { + conn.socket->shutdown(); } set_write_state(write_state_t::drop); assert(!gate.is_closed()); @@ -67,8 +66,8 @@ void Protocol::close(bool dispatch_reset, // asynchronous operations assert(!close_ready.valid()); close_ready = std::move(gate_closed).then([this] { - if (socket) { - return socket->close(); + if (conn.socket) { + return conn.socket->close(); } else { return seastar::now(); } @@ -208,7 +207,7 @@ void Protocol::ack_writes(seq_num_t seq) seastar::future Protocol::try_exit_sweep() { assert(!is_queued()); - return socket->flush().then([this] { + return conn.socket->flush().then([this] { if (!is_queued()) { // still nothing pending to send after flush, // the dispatching can ONLY stop now @@ -242,7 +241,7 @@ seastar::future<> Protocol::do_write_dispatch_sweep() auto acked = ack_left; assert(acked == 0 || conn.in_seq > 0); // sweep all pending writes with the concrete Protocol - return socket->write(sweep_messages_and_move_to_sent( + return conn.socket->write(sweep_messages_and_move_to_sent( num_msgs, need_keepalive, keepalive_ack, acked > 0) ).then([this, prv_keepalive_ack=keepalive_ack, acked] { need_keepalive = false; @@ -292,7 +291,7 @@ seastar::future<> Protocol::do_write_dispatch_sweep() conn, write_state, e); ceph_abort(); } - socket->shutdown(); + conn.socket->shutdown(); if (write_state == write_state_t::open) { logger().info("{} write_event(): fault at {}, going to delay -- {}", conn, write_state, e); diff --git a/src/crimson/net/Protocol.h b/src/crimson/net/Protocol.h index 782b357b9df..f819c469208 100644 --- a/src/crimson/net/Protocol.h +++ b/src/crimson/net/Protocol.h @@ -75,9 +75,6 @@ class Protocol { std::optional keepalive_ack, bool require_ack); - public: - SocketRef socket; - protected: ChainedDispatchers& dispatchers; SocketConnection &conn; diff --git a/src/crimson/net/ProtocolV2.cc b/src/crimson/net/ProtocolV2.cc index edcc7de5947..caa19da1e5f 100644 --- a/src/crimson/net/ProtocolV2.cc +++ b/src/crimson/net/ProtocolV2.cc @@ -16,7 +16,6 @@ #include "chained_dispatchers.h" #include "Errors.h" -#include "Socket.h" #include "SocketConnection.h" #include "SocketMessenger.h" @@ -121,11 +120,11 @@ void intercept(Breakpoint bp, bp_type_t type, } #define INTERCEPT_CUSTOM(bp, type) \ -intercept({bp}, type, conn, socket) +intercept({bp}, type, conn, conn.socket) #define INTERCEPT_FRAME(tag, type) \ intercept({static_cast(tag), type}, \ - type, conn, socket) + type, conn, conn.socket) #define INTERCEPT_N_RW(bp) \ if (conn.interceptor) { \ @@ -178,7 +177,7 @@ void ProtocolV2::start_connect(const entity_addr_t& _peer_addr, const entity_name_t& _peer_name) { ceph_assert(state == state_t::NONE); - ceph_assert(!socket); + ceph_assert(!conn.socket); ceph_assert(!gate.is_closed()); conn.peer_addr = _peer_addr; conn.target_addr = _peer_addr; @@ -199,10 +198,10 @@ void ProtocolV2::start_accept(SocketRef&& sock, const entity_addr_t& _peer_addr) { ceph_assert(state == state_t::NONE); - ceph_assert(!socket); + ceph_assert(!conn.socket); // until we know better conn.target_addr = _peer_addr; - socket = std::move(sock); + conn.socket = std::move(sock); logger().info("{} ProtocolV2::start_accept(): target_addr={}", conn, _peer_addr); messenger.accept_conn( seastar::static_pointer_cast(conn.shared_from_this())); @@ -221,26 +220,26 @@ void ProtocolV2::enable_recording() seastar::future ProtocolV2::read_exactly(size_t bytes) { if (unlikely(record_io)) { - return socket->read_exactly(bytes) - .then([this] (auto bl) { + return conn.socket->read_exactly(bytes + ).then([this] (auto bl) { rxbuf.append(buffer::create(bl.share())); return bl; }); } else { - return socket->read_exactly(bytes); + return conn.socket->read_exactly(bytes); }; } seastar::future ProtocolV2::read(size_t bytes) { if (unlikely(record_io)) { - return socket->read(bytes) - .then([this] (auto buf) { + return conn.socket->read(bytes + ).then([this] (auto buf) { rxbuf.append(buf); return buf; }); } else { - return socket->read(bytes); + return conn.socket->read(bytes); } } @@ -249,7 +248,7 @@ seastar::future<> ProtocolV2::write(bufferlist&& buf) if (unlikely(record_io)) { txbuf.append(buf); } - return socket->write(std::move(buf)); + return conn.socket->write(std::move(buf)); } seastar::future<> ProtocolV2::write_flush(bufferlist&& buf) @@ -257,7 +256,7 @@ seastar::future<> ProtocolV2::write_flush(bufferlist&& buf) if (unlikely(record_io)) { txbuf.append(buf); } - return socket->write_flush(std::move(buf)); + return conn.socket->write_flush(std::move(buf)); } size_t ProtocolV2::get_current_msg_size() const @@ -811,8 +810,8 @@ ProtocolV2::client_reconnect() void ProtocolV2::execute_connecting() { trigger_state(state_t::CONNECTING, write_state_t::delay, false); - if (socket) { - socket->shutdown(); + if (conn.socket) { + conn.socket->shutdown(); } gated_execute("execute_connecting", [this] { global_seq = messenger.get_global_seq(); @@ -832,9 +831,9 @@ void ProtocolV2::execute_connecting() conn, get_state_name(state)); abort_protocol(); } - if (socket) { + if (conn.socket) { gate.dispatch_in_background("close_sockect_connecting", *this, - [sock = std::move(socket)] () mutable { + [sock = std::move(conn.socket)] () mutable { return sock->close().then([sock = std::move(sock)] {}); }); } @@ -849,7 +848,7 @@ void ProtocolV2::execute_connecting() abort_protocol(); }); } - socket = std::move(sock); + conn.socket = std::move(sock); return seastar::now(); }).then([this] { auth_meta = seastar::make_lw_shared(); @@ -869,7 +868,7 @@ void ProtocolV2::execute_connecting() conn, get_state_name(state)); abort_protocol(); } - socket->learn_ephemeral_port_as_connector(_my_addr_from_peer.get_port()); + conn.socket->learn_ephemeral_port_as_connector(_my_addr_from_peer.get_port()); if (unlikely(_my_addr_from_peer.is_legacy())) { logger().warn("{} peer sent a legacy address for me: {}", conn, _my_addr_from_peer); @@ -1071,7 +1070,7 @@ ProtocolV2::reuse_connection( { existing_proto->trigger_replacing(reconnect, do_reset, - std::move(socket), + std::move(conn.socket), std::move(auth_meta), std::move(session_stream_handlers), peer_global_seq, @@ -1682,8 +1681,8 @@ void ProtocolV2::trigger_replacing(bool reconnect, uint64_t new_msg_seq) { trigger_state(state_t::REPLACING, write_state_t::delay, false); - if (socket) { - socket->shutdown(); + if (conn.socket) { + conn.socket->shutdown(); } dispatchers.ms_handle_accept( seastar::static_pointer_cast(conn.shared_from_this())); @@ -1719,13 +1718,13 @@ void ProtocolV2::trigger_replacing(bool reconnect, }); } - if (socket) { + if (conn.socket) { gate.dispatch_in_background("close_socket_replacing", *this, - [sock = std::move(socket)] () mutable { + [sock = std::move(conn.socket)] () mutable { return sock->close().then([sock = std::move(sock)] {}); }); } - socket = std::move(new_socket); + conn.socket = std::move(new_socket); auth_meta = std::move(new_auth_meta); session_stream_handlers = std::move(new_rxtx); record_io = false; @@ -2021,8 +2020,8 @@ void ProtocolV2::execute_ready(bool dispatch_connect) void ProtocolV2::execute_standby() { trigger_state(state_t::STANDBY, write_state_t::delay, false); - if (socket) { - socket->shutdown(); + if (conn.socket) { + conn.socket->shutdown(); } } @@ -2040,8 +2039,8 @@ void ProtocolV2::notify_write() void ProtocolV2::execute_wait(bool max_backoff) { trigger_state(state_t::WAIT, write_state_t::delay, false); - if (socket) { - socket->shutdown(); + if (conn.socket) { + conn.socket->shutdown(); } gated_execute("execute_wait", [this, max_backoff] { double backoff = protocol_timer.last_dur(); diff --git a/src/crimson/net/SocketConnection.cc b/src/crimson/net/SocketConnection.cc index cc6abdacf26..5ba2ea5c566 100644 --- a/src/crimson/net/SocketConnection.cc +++ b/src/crimson/net/SocketConnection.cc @@ -134,19 +134,19 @@ seastar::shard_id SocketConnection::shard_id() const { } seastar::socket_address SocketConnection::get_local_address() const { - return protocol->socket->get_local_address(); + return socket->get_local_address(); } void SocketConnection::print(ostream& out) const { out << (void*)this << " "; messenger.print(out); - if (!protocol->socket) { + if (!socket) { out << " >> " << get_peer_name() << " " << peer_addr; - } else if (protocol->socket->get_side() == Socket::side_t::acceptor) { + } else if (socket->get_side() == Socket::side_t::acceptor) { out << " >> " << get_peer_name() << " " << peer_addr - << "@" << protocol->socket->get_ephemeral_port(); - } else { // protocol->socket->get_side() == Socket::side_t::connector - out << "@" << protocol->socket->get_ephemeral_port() + << "@" << socket->get_ephemeral_port(); + } else { // socket->get_side() == Socket::side_t::connector + out << "@" << socket->get_ephemeral_port() << " >> " << get_peer_name() << " " << peer_addr; } } diff --git a/src/crimson/net/SocketConnection.h b/src/crimson/net/SocketConnection.h index c93eccf3ac0..d4cbca463d3 100644 --- a/src/crimson/net/SocketConnection.h +++ b/src/crimson/net/SocketConnection.h @@ -37,6 +37,8 @@ class SocketConnection : public Connection { SocketMessenger& messenger; std::unique_ptr protocol; + SocketRef socket; + entity_name_t peer_name = {0, entity_name_t::NEW}; entity_addr_t peer_addr; -- 2.39.5