From 265fcdd2ed92884eef7e6feacf409bb621abdae2 Mon Sep 17 00:00:00 2001 From: Yingxin Date: Mon, 15 Oct 2018 17:46:55 +0800 Subject: [PATCH] crimson/net: clean seastar-msgr class dependencies Remove protocol-specific interfaces from Messenger/Connection classes, and let SocketMessenger manage SocketConnection instead of Connection. Signed-off-by: Yingxin --- src/crimson/net/Connection.h | 47 ++------------------ src/crimson/net/Fwd.h | 5 --- src/crimson/net/Messenger.h | 2 - src/crimson/net/SocketConnection.cc | 58 +++++++++++++----------- src/crimson/net/SocketConnection.h | 69 ++++++++++++++++++++--------- src/crimson/net/SocketMessenger.cc | 23 +++++----- src/crimson/net/SocketMessenger.h | 20 ++++++--- 7 files changed, 108 insertions(+), 116 deletions(-) diff --git a/src/crimson/net/Connection.h b/src/crimson/net/Connection.h index e5e2a11f92e55..6aa600d3b5e2e 100644 --- a/src/crimson/net/Connection.h +++ b/src/crimson/net/Connection.h @@ -27,18 +27,16 @@ using seq_num_t = uint64_t; class Connection : public boost::intrusive_ref_counter { protected: - Messenger *const messenger; entity_addr_t my_addr; entity_addr_t peer_addr; public: - Connection(Messenger *messenger, const entity_addr_t& my_addr, + Connection(const entity_addr_t& my_addr, const entity_addr_t& peer_addr) - : messenger(messenger), my_addr(my_addr), peer_addr(peer_addr) {} + : my_addr(my_addr), peer_addr(peer_addr) {} virtual ~Connection() {} - Messenger* get_messenger() const { return messenger; } - + virtual Messenger* get_messenger() const = 0; const entity_addr_t& get_my_addr() const { return my_addr; } const entity_addr_t& get_peer_addr() const { return peer_addr; } virtual int get_peer_type() const = 0; @@ -46,16 +44,6 @@ class Connection : public boost::intrusive_ref_counter client_handshake(entity_type_t peer_type, - entity_type_t host_type) = 0; - - /// complete a handshake from the server's perspective - virtual seastar::future<> server_handshake() = 0; - - /// read a message from a connection that has completed its handshake - virtual seastar::future read_message() = 0; - /// send a message over a connection that has completed its handshake virtual seastar::future<> send(MessageRef msg) = 0; @@ -65,35 +53,6 @@ class Connection : public boost::intrusive_ref_counter close() = 0; - - /// move all messages in the sent list back into the queue - virtual void requeue_sent() = 0; - - /// get all messages in the out queue - virtual std::tuple> get_out_queue() = 0; - -public: - enum class state_t { - none, - open, - standby, - closed, - wait - }; - /// the number of connections initiated in this session, increment when a - /// new connection is established - virtual uint32_t connect_seq() const = 0; - - /// the client side should connect us with a gseq. it will be reset with a - /// the one of exsting connection if it's greater. - virtual uint32_t peer_global_seq() const = 0; - - virtual seq_num_t rx_seq_num() const = 0; - - /// current state of connection - virtual state_t get_state() const = 0; - virtual bool is_server_side() const = 0; - virtual bool is_lossy() const = 0; }; } // namespace ceph::net diff --git a/src/crimson/net/Fwd.h b/src/crimson/net/Fwd.h index d234048a35c32..5aa04812d6021 100644 --- a/src/crimson/net/Fwd.h +++ b/src/crimson/net/Fwd.h @@ -16,16 +16,12 @@ #include -#include "Errors.h" #include "msg/msg_types.h" #include "msg/Message.h" using peer_type_t = int; using auth_proto_t = int; -class Message; -using MessageRef = boost::intrusive_ptr; - namespace ceph::net { using msgr_tag_t = uint8_t; @@ -38,4 +34,3 @@ class Dispatcher; class Messenger; } // namespace ceph::net - diff --git a/src/crimson/net/Messenger.h b/src/crimson/net/Messenger.h index e67b304f366ed..149be598975fb 100644 --- a/src/crimson/net/Messenger.h +++ b/src/crimson/net/Messenger.h @@ -60,8 +60,6 @@ class Messenger { } return ++global_seq; } - virtual ConnectionRef lookup_conn(const entity_addr_t&) = 0; - virtual void unregister_conn(ConnectionRef) = 0; // @returns a tuple of virtual seastar::future #include #include #include -#include "Config.h" -#include "Messenger.h" -#include "SocketConnection.h" - #include "include/msgr.h" #include "include/random.h" #include "auth/Auth.h" #include "auth/AuthSessionHandler.h" -#include "msg/Message.h" #include "crimson/common/log.h" +#include "Config.h" +#include "Errors.h" +#include "SocketMessenger.h" using namespace ceph::net; @@ -42,11 +42,12 @@ namespace { } } -SocketConnection::SocketConnection(Messenger *messenger, +SocketConnection::SocketConnection(SocketMessenger& messenger, const entity_addr_t& my_addr, const entity_addr_t& peer_addr, seastar::connected_socket&& fd) - : Connection(messenger, my_addr, peer_addr), + : Connection(my_addr, peer_addr), + messenger(messenger), socket(std::move(fd)), in(socket.input()), out(socket.output()), @@ -61,6 +62,11 @@ SocketConnection::~SocketConnection() send_ready.ignore_ready_future(); } +ceph::net::Messenger* +SocketConnection::get_messenger() const { + return &messenger; +} + bool SocketConnection::is_connected() { return !send_ready.failed(); @@ -284,7 +290,7 @@ bool SocketConnection::update_rx_seq(seq_num_t seq) seastar::future<> SocketConnection::write_message(MessageRef msg) { msg->set_seq(++out_seq); - msg->encode(features, get_messenger()->get_crc_flags()); + msg->encode(features, messenger.get_crc_flags()); bufferlist bl; bl.append(CEPH_MSGR_TAG_MSG); auto& header = msg->get_header(); @@ -297,13 +303,13 @@ seastar::future<> SocketConnection::write_message(MessageRef msg) bl.append((const char*)&footer, sizeof(footer)); } else { ceph_msg_footer_old old_footer; - if (get_messenger()->get_crc_flags() & MSG_CRC_HEADER) { + if (messenger.get_crc_flags() & MSG_CRC_HEADER) { old_footer.front_crc = footer.front_crc; old_footer.middle_crc = footer.middle_crc; } else { old_footer.front_crc = old_footer.middle_crc = 0; } - if (get_messenger()->get_crc_flags() & MSG_CRC_DATA) { + if (messenger.get_crc_flags() & MSG_CRC_DATA) { old_footer.data_crc = footer.data_crc; } else { old_footer.data_crc = 0; @@ -359,9 +365,9 @@ seastar::future<> SocketConnection::close() state = state_t::closed; // unregister_conn() drops a reference, so hold another until completion - auto cleanup = [conn = ConnectionRef(this)] {}; + auto cleanup = [conn = SocketConnectionRef(this)] {}; - get_messenger()->unregister_conn(this); + messenger.unregister_conn(this); // close_ready become valid only after state is state_t::closed assert(!close_ready.valid()); @@ -503,15 +509,15 @@ seastar::future<> SocketConnection::handle_connect() return seastar::make_ready_future( CEPH_MSGR_TAG_FEATURES, bufferlist{}); } - return get_messenger()->verify_authorizer(get_peer_type(), - h.connect.authorizer_protocol, - authorizer); + return messenger.verify_authorizer(get_peer_type(), + h.connect.authorizer_protocol, + authorizer); }).then([this] (ceph::net::msgr_tag_t tag, bufferlist&& authorizer_reply) { memset(&h.reply, 0, sizeof(h.reply)); if (tag) { return send_connect_reply(tag, std::move(authorizer_reply)); } - if (auto existing = get_messenger()->lookup_conn(peer_addr); existing) { + if (auto existing = messenger.lookup_conn(peer_addr); existing) { return handle_connect_with_existing(existing, std::move(authorizer_reply)); } else if (h.connect.connect_seq > 0) { return send_connect_reply(CEPH_MSGR_TAG_RESETSESSION, @@ -546,7 +552,7 @@ seastar::future<> SocketConnection::send_connect_reply_ready(msgr_tag_t tag, bufferlist&& authorizer_reply) { - h.global_seq = get_messenger()->get_global_seq(); + h.global_seq = messenger.get_global_seq(); h.reply.tag = tag; h.reply.features = policy.features_supported; h.reply.global_seq = h.global_seq; @@ -607,7 +613,7 @@ SocketConnection::handle_keepalive2_ack() } seastar::future<> -SocketConnection::handle_connect_with_existing(ConnectionRef existing, bufferlist&& authorizer_reply) +SocketConnection::handle_connect_with_existing(SocketConnectionRef existing, bufferlist&& authorizer_reply) { if (h.connect.global_seq < existing->peer_global_seq()) { h.reply.global_seq = existing->peer_global_seq(); @@ -633,7 +639,7 @@ SocketConnection::handle_connect_with_existing(ConnectionRef existing, bufferlis h.reply.connect_seq = existing->connect_seq() + 1; return send_connect_reply(CEPH_MSGR_TAG_RETRY_SESSION); } - } else if (get_peer_addr() < get_my_addr() || + } else if (peer_addr < my_addr || existing->is_server_side()) { // incoming wins return replace_existing(existing, std::move(authorizer_reply)); @@ -648,7 +654,7 @@ SocketConnection::handle_connect_with_existing(ConnectionRef existing, bufferlis } } -seastar::future<> SocketConnection::replace_existing(ConnectionRef existing, +seastar::future<> SocketConnection::replace_existing(SocketConnectionRef existing, bufferlist&& authorizer_reply, bool is_reset_from_peer) { @@ -659,7 +665,7 @@ seastar::future<> SocketConnection::replace_existing(ConnectionRef existing, } else { reply_tag = CEPH_MSGR_TAG_READY; } - get_messenger()->unregister_conn(existing); + messenger.unregister_conn(existing); if (!existing->is_lossy()) { // reset the in_seq if this is a hard reset from peer, // otherwise we respect our original connection's value @@ -685,7 +691,7 @@ seastar::future<> SocketConnection::handle_connect_reply(msgr_tag_t tag) } h.got_bad_auth = true; // try harder - return get_messenger()->get_authorizer(h.peer_type, true) + return messenger.get_authorizer(h.peer_type, true) .then([this](auto&& auth) { h.authorizer = std::move(auth); return seastar::now(); @@ -694,7 +700,7 @@ seastar::future<> SocketConnection::handle_connect_reply(msgr_tag_t tag) reset_session(); return seastar::now(); case CEPH_MSGR_TAG_RETRY_GLOBAL: - h.global_seq = get_messenger()->get_global_seq(h.reply.global_seq); + h.global_seq = messenger.get_global_seq(h.reply.global_seq); return seastar::now(); case CEPH_MSGR_TAG_RETRY_SESSION: ceph_assert(h.reply.connect_seq > h.connect_seq); @@ -779,7 +785,7 @@ seastar::future<> SocketConnection::connect(entity_type_t peer_type, // this is fyi, actually, server decides! h.connect.flags = policy.lossy ? CEPH_MSG_CONNECT_LOSSY : 0; - return get_messenger()->get_authorizer(peer_type, false) + return messenger.get_authorizer(peer_type, false) .then([this](auto&& auth) { h.authorizer = std::move(auth); bufferlist bl; @@ -839,7 +845,7 @@ seastar::future<> SocketConnection::client_handshake(entity_type_t peer_type, bufferlist bl; bl.append(buffer::create_static(banner_size, banner)); ::encode(my_addr, bl, 0); - h.global_seq = get_messenger()->get_global_seq(); + h.global_seq = messenger.get_global_seq(); return out.write(std::move(bl)).then([this] { return out.flush(); }); }).then([=] { return seastar::do_until([=] { return state == state_t::open; }, @@ -889,7 +895,7 @@ seastar::future<> SocketConnection::server_handshake() seastar::future<> SocketConnection::fault() { if (policy.lossy) { - get_messenger()->unregister_conn(this); + messenger.unregister_conn(this); } if (h.backoff.count()) { h.backoff += h.backoff; diff --git a/src/crimson/net/SocketConnection.h b/src/crimson/net/SocketConnection.h index 974c082ddcd26..b53f7e1a96211 100644 --- a/src/crimson/net/SocketConnection.h +++ b/src/crimson/net/SocketConnection.h @@ -21,15 +21,28 @@ #include "Connection.h" #include "crimson/thread/Throttle.h" +class AuthAuthorizer; class AuthSessionHandler; namespace ceph::net { +class SocketMessenger; +class SocketConnection; +using SocketConnectionRef = boost::intrusive_ptr; + class SocketConnection : public Connection { + SocketMessenger& messenger; seastar::connected_socket socket; seastar::input_stream in; seastar::output_stream out; + enum class state_t { + none, + open, + standby, + closed, + wait + }; state_t state = state_t::none; /// become valid only when state is state_t::closed @@ -60,9 +73,9 @@ class SocketConnection : public Connection { /// server side of handshake negotiation seastar::future<> handle_connect(); - seastar::future<> handle_connect_with_existing(ConnectionRef existing, + seastar::future<> handle_connect_with_existing(SocketConnectionRef existing, bufferlist&& authorizer_reply); - seastar::future<> replace_existing(ConnectionRef existing, + seastar::future<> replace_existing(SocketConnectionRef existing, bufferlist&& authorizer_reply, bool is_reset_from_peer = false); seastar::future<> send_connect_reply(ceph::net::msgr_tag_t tag, @@ -74,9 +87,6 @@ class SocketConnection : public Connection { seastar::future<> handle_keepalive2_ack(); bool require_auth_feature() const; - int get_peer_type() const override { - return h.connect.host_type; - } uint32_t get_proto_version(entity_type_t peer_type, bool connec) const; /// client side of handshake negotiation seastar::future<> connect(entity_type_t peer_type, entity_type_t host_type); @@ -148,20 +158,19 @@ class SocketConnection : public Connection { seastar::future<> fault(); public: - SocketConnection(Messenger *messenger, + SocketConnection(SocketMessenger& messenger, const entity_addr_t& my_addr, const entity_addr_t& peer_addr, seastar::connected_socket&& socket); ~SocketConnection(); - bool is_connected() override; - - seastar::future<> client_handshake(entity_type_t peer_type, - entity_type_t host_type) override; + Messenger* get_messenger() const override; - seastar::future<> server_handshake() override; + int get_peer_type() const override { + return h.connect.host_type; + } - seastar::future read_message() override; + bool is_connected() override; seastar::future<> send(MessageRef msg) override; @@ -169,31 +178,49 @@ class SocketConnection : public Connection { seastar::future<> close() override; - uint32_t connect_seq() const override { + public: + /// complete a handshake from the client's perspective + seastar::future<> client_handshake(entity_type_t peer_type, + entity_type_t host_type); + + /// complete a handshake from the server's perspective + seastar::future<> server_handshake(); + + /// read a message from a connection that has completed its handshake + seastar::future read_message(); + + /// the number of connections initiated in this session, increment when a + /// new connection is established + uint32_t connect_seq() const { return h.connect_seq; } - uint32_t peer_global_seq() const override { + + /// the client side should connect us with a gseq. it will be reset with + /// the one of exsting connection if it's greater. + uint32_t peer_global_seq() const { return h.peer_global_seq; } seq_num_t rx_seq_num() const { return in_seq; } - state_t get_state() const override { + + /// current state of connection + state_t get_state() const { return state; } - bool is_server_side() const override { + bool is_server_side() const { return policy.server; } - bool is_lossy() const override { + bool is_lossy() const { return policy.lossy; } -private: - void requeue_sent() override; - std::tuple> get_out_queue() override { + /// move all messages in the sent list back into the queue + void requeue_sent(); + + std::tuple> get_out_queue() { return {out_seq, std::move(out_q)}; } - }; } // namespace ceph::net diff --git a/src/crimson/net/SocketMessenger.cc b/src/crimson/net/SocketMessenger.cc index f86a52fb752ba..1fefd4e1dd200 100644 --- a/src/crimson/net/SocketMessenger.cc +++ b/src/crimson/net/SocketMessenger.cc @@ -12,12 +12,13 @@ * */ +#include "SocketMessenger.h" + #include + #include "auth/Auth.h" -#include "SocketMessenger.h" -#include "SocketConnection.h" +#include "Errors.h" #include "Dispatcher.h" -#include "msg/Message.h" using namespace ceph::net; @@ -39,7 +40,7 @@ void SocketMessenger::bind(const entity_addr_t& addr) listener = seastar::listen(address, lo); } -seastar::future<> SocketMessenger::dispatch(ConnectionRef conn) +seastar::future<> SocketMessenger::dispatch(SocketConnectionRef conn) { auto [i, added] = connections.emplace(conn->get_peer_addr(), conn); std::ignore = i; @@ -79,8 +80,8 @@ seastar::future<> SocketMessenger::accept(seastar::connected_socket socket, entity_addr_t peer_addr; peer_addr.set_type(entity_addr_t::TYPE_DEFAULT); peer_addr.set_sockaddr(&paddr.as_posix_sockaddr()); - ConnectionRef conn = new SocketConnection(this, get_myaddr(), - peer_addr, std::move(socket)); + SocketConnectionRef conn = new SocketConnection(*this, get_myaddr(), + peer_addr, std::move(socket)); // initiate the handshake return conn->server_handshake() .then([=] { @@ -133,8 +134,8 @@ SocketMessenger::connect(const entity_addr_t& addr, entity_type_t peer_type) } return seastar::connect(addr.in4_addr()) .then([=] (seastar::connected_socket socket) { - ConnectionRef conn = new SocketConnection(this, get_myaddr(), addr, - std::move(socket)); + SocketConnectionRef conn = new SocketConnection(*this, get_myaddr(), addr, + std::move(socket)); // complete the handshake before returning to the caller return conn->client_handshake(peer_type, get_myname().type()) .then([=] { @@ -151,7 +152,7 @@ SocketMessenger::connect(const entity_addr_t& addr, entity_type_t peer_type) // dispatch replies on this connection dispatch(conn) .handle_exception([] (std::exception_ptr eptr) {}); - return conn; + return ConnectionRef(conn); }); }); } @@ -191,7 +192,7 @@ void SocketMessenger::set_policy_throttler(entity_type_t peer_type, policy_set.set_throttlers(peer_type, throttle, nullptr); } -ceph::net::ConnectionRef SocketMessenger::lookup_conn(const entity_addr_t& addr) +ceph::net::SocketConnectionRef SocketMessenger::lookup_conn(const entity_addr_t& addr) { if (auto found = connections.find(addr); found != connections.end()) { @@ -201,7 +202,7 @@ ceph::net::ConnectionRef SocketMessenger::lookup_conn(const entity_addr_t& addr) } } -void SocketMessenger::unregister_conn(ConnectionRef conn) +void SocketMessenger::unregister_conn(SocketConnectionRef conn) { ceph_assert(conn); auto found = connections.find(conn->get_peer_addr()); diff --git a/src/crimson/net/SocketMessenger.h b/src/crimson/net/SocketMessenger.h index 9297b37087f75..e025daa733e1e 100644 --- a/src/crimson/net/SocketMessenger.h +++ b/src/crimson/net/SocketMessenger.h @@ -21,6 +21,7 @@ #include "msg/Policy.h" #include "Messenger.h" +#include "SocketConnection.h" #include "crimson/thread/Throttle.h" namespace ceph::net { @@ -30,12 +31,12 @@ using SocketPolicy = ceph::net::Policy; class SocketMessenger final : public Messenger { std::optional listener; Dispatcher *dispatcher = nullptr; - std::map connections; + std::map connections; using Throttle = ceph::thread::Throttle; ceph::net::PolicySet policy_set; seastar::gate pending_dispatch; - seastar::future<> dispatch(ConnectionRef conn); + seastar::future<> dispatch(SocketConnectionRef conn); seastar::future<> accept(seastar::connected_socket socket, seastar::socket_address paddr); @@ -51,18 +52,23 @@ class SocketMessenger final : public Messenger { entity_type_t peer_type) override; seastar::future<> shutdown() override; - void set_default_policy(const SocketPolicy& p); - void set_policy(entity_type_t peer_type, const SocketPolicy& p); - void set_policy_throttler(entity_type_t peer_type, Throttle* throttle); - ConnectionRef lookup_conn(const entity_addr_t& addr) override; - void unregister_conn(ConnectionRef) override; + seastar::future verify_authorizer(peer_type_t peer_type, auth_proto_t protocol, bufferlist& auth) override; + seastar::future> get_authorizer(peer_type_t peer_type, bool force_new) override; + + public: + void set_default_policy(const SocketPolicy& p); + void set_policy(entity_type_t peer_type, const SocketPolicy& p); + void set_policy_throttler(entity_type_t peer_type, Throttle* throttle); + + SocketConnectionRef lookup_conn(const entity_addr_t& addr); + void unregister_conn(SocketConnectionRef); }; } // namespace ceph::net -- 2.39.5