From: Yingxin Cheng Date: Tue, 12 Feb 2019 05:11:16 +0000 (+0800) Subject: crimson/net: enable connections on all cores X-Git-Tag: v14.1.0~155^2~7 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=7ac5fb013bfa1d3666849b5e73a3bbbe0d4f1813;p=ceph.git crimson/net: enable connections on all cores Implement the sharded crimson-messenger: * Sharded Messenger: provides shared-nothing Messenger for each shard, it's interfaces are symmetric to be called, any modifications will be applied to all shards. * Sharded/non-sharded Dispatcher interface: allow connections to be dispatched, and related resources (such as Session) to be managed in its own shard or not. * Sharded Connection: A connection only lives at one dedicated core during its lifecycle. It's sharded by its peer_IP in this PoC, because peer port and nonce are not available when a socket is accepted. While its interfaces are safe to be called from all shards. * Replace `boost::intrusive_ptr` by seastar native smart ptrs for `Connection` and `SocketConnection`, because they need to be destructed from its original core. * Unit test: establish multiple connections on both client and server sides, they runs concurrently and creates sessions that are also following shared-nothing design. Signed-off-by: Yingxin Cheng --- diff --git a/src/crimson/CMakeLists.txt b/src/crimson/CMakeLists.txt index da1ebb13d2a7..3294d634f6e4 100644 --- a/src/crimson/CMakeLists.txt +++ b/src/crimson/CMakeLists.txt @@ -116,6 +116,7 @@ set(crimson_mon_srcs set(crimson_net_srcs net/Dispatcher.cc net/Errors.cc + net/Messenger.cc net/SocketConnection.cc net/SocketMessenger.cc net/Socket.cc) @@ -124,7 +125,8 @@ set(crimson_thread_srcs thread/Throttle.cc) add_library(crimson STATIC ${crimson_auth_srcs} - ${crimson_mon_srcs} + # TODO: fix crimson_mon_client with the new design + # ${crimson_mon_srcs} ${crimson_net_srcs} ${crimson_thread_srcs} ${CMAKE_SOURCE_DIR}/src/common/buffer_seastar.cc) diff --git a/src/crimson/net/Connection.h b/src/crimson/net/Connection.h index d8eb656fd729..b1b72c746325 100644 --- a/src/crimson/net/Connection.h +++ b/src/crimson/net/Connection.h @@ -15,8 +15,8 @@ #pragma once #include -#include #include +#include #include "Fwd.h" @@ -24,8 +24,7 @@ namespace ceph::net { using seq_num_t = uint64_t; -class Connection : public boost::intrusive_ref_counter { +class Connection : public seastar::enable_shared_from_this { protected: entity_addr_t peer_addr; peer_type_t peer_type = -1; @@ -39,7 +38,7 @@ class Connection : public boost::intrusive_ref_counter is_connected() = 0; /// send a message over a connection that has completed its handshake virtual seastar::future<> send(MessageRef msg) = 0; @@ -51,6 +50,9 @@ class Connection : public boost::intrusive_ref_counter close() = 0; + /// which shard id the connection lives + virtual seastar::shard_id shard_id() const = 0; + virtual void print(ostream& out) const = 0; }; diff --git a/src/crimson/net/Dispatcher.h b/src/crimson/net/Dispatcher.h index 94d6613e269a..cbde15499286 100644 --- a/src/crimson/net/Dispatcher.h +++ b/src/crimson/net/Dispatcher.h @@ -15,6 +15,7 @@ #pragma once #include +#include #include "Fwd.h" @@ -54,6 +55,11 @@ class Dispatcher { } virtual seastar::future> ms_get_authorizer(peer_type_t); + + // get the local dispatcher shard if it is accessed by another core + virtual Dispatcher* get_local_shard() { + return this; + } }; } // namespace ceph::net diff --git a/src/crimson/net/Fwd.h b/src/crimson/net/Fwd.h index 5aa04812d602..8a0a1c96f22c 100644 --- a/src/crimson/net/Fwd.h +++ b/src/crimson/net/Fwd.h @@ -14,7 +14,8 @@ #pragma once -#include +#include +#include #include "msg/msg_types.h" #include "msg/Message.h" @@ -27,10 +28,25 @@ namespace ceph::net { using msgr_tag_t = uint8_t; class Connection; -using ConnectionRef = boost::intrusive_ptr; +using ConnectionRef = seastar::shared_ptr; +// NOTE: ConnectionXRef should only be used in seastar world, because +// lw_shared_ptr<> is not safe to be accessed by unpinned alien threads. +using ConnectionXRef = seastar::lw_shared_ptr>; class Dispatcher; class Messenger; +template +seastar::future create_sharded(Args... args) { + auto sharded_obj = seastar::make_lw_shared>(); + return sharded_obj->start(args...).then([sharded_obj]() { + auto ret = &sharded_obj->local(); + seastar::engine().at_exit([sharded_obj]() { + return sharded_obj->stop().finally([sharded_obj] {}); + }); + return ret; + }); +} + } // namespace ceph::net diff --git a/src/crimson/net/Messenger.cc b/src/crimson/net/Messenger.cc new file mode 100644 index 000000000000..bd6c48b40224 --- /dev/null +++ b/src/crimson/net/Messenger.cc @@ -0,0 +1,20 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "Messenger.h" +#include "SocketMessenger.h" + +namespace ceph::net { + +seastar::future +Messenger::create(const entity_name_t& name, + const std::string& lname, + const uint64_t nonce) +{ + return create_sharded(name, lname, nonce) + .then([](Messenger *msgr) { + return msgr; + }); +} + +} // namespace ceph::net diff --git a/src/crimson/net/Messenger.h b/src/crimson/net/Messenger.h index 6b77e3f5718c..a8669f4fb0be 100644 --- a/src/crimson/net/Messenger.h +++ b/src/crimson/net/Messenger.h @@ -37,24 +37,29 @@ class Messenger { const entity_name_t& get_myname() const { return my_name; } const entity_addrvec_t& get_myaddrs() const { return my_addrs; } entity_addr_t get_myaddr() const { return my_addrs.front(); } - virtual void set_myaddrs(const entity_addrvec_t& addrs) { + virtual seastar::future<> set_myaddrs(const entity_addrvec_t& addrs) { my_addrs = addrs; + return seastar::now(); } /// bind to the given address - virtual void bind(const entity_addrvec_t& addr) = 0; + virtual seastar::future<> bind(const entity_addrvec_t& addr) = 0; /// try to bind to the first unused port of given address - virtual void try_bind(const entity_addrvec_t& addr, - uint32_t min_port, uint32_t max_port) = 0; + virtual seastar::future<> try_bind(const entity_addrvec_t& addr, + uint32_t min_port, uint32_t max_port) = 0; /// start the messenger virtual seastar::future<> start(Dispatcher *dispatcher) = 0; /// either return an existing connection to the peer, /// or a new pending connection - virtual ConnectionRef connect(const entity_addr_t& peer_addr, - const entity_type_t& peer_type) = 0; + virtual seastar::future + connect(const entity_addr_t& peer_addr, + const entity_type_t& peer_type) = 0; + + // wait for messenger shutdown + virtual seastar::future<> wait() = 0; /// stop listenening and wait for all connections to close. safe to destruct /// after this future becomes available @@ -77,7 +82,15 @@ class Messenger { crc_flags |= MSG_CRC_HEADER; } + // get the local messenger shard if it is accessed by another core + virtual Messenger* get_local_shard() { + return this; + } + virtual void print(ostream& out) const = 0; + + static seastar::future + create(const entity_name_t& name, const std::string& lname, const uint64_t nonce); }; inline ostream& operator<<(ostream& out, const Messenger& msgr) { diff --git a/src/crimson/net/Socket.h b/src/crimson/net/Socket.h index 3c0411355088..c1a2ed59a4ce 100644 --- a/src/crimson/net/Socket.h +++ b/src/crimson/net/Socket.h @@ -12,6 +12,7 @@ namespace ceph::net { class Socket { + const seastar::shard_id sid; seastar::connected_socket socket; seastar::input_stream in; seastar::output_stream out; @@ -24,10 +25,11 @@ class Socket public: explicit Socket(seastar::connected_socket&& _socket) - : socket(std::move(_socket)), + : sid{seastar::engine().cpu_id()}, + socket(std::move(_socket)), in(socket.input()), out(socket.output()) {} - Socket(Socket&& o) = default; + Socket(Socket&& o) = delete; /// read the requested number of bytes into a bufferlist seastar::future read(size_t bytes); @@ -47,7 +49,10 @@ class Socket /// Socket can only be closed once. seastar::future<> close() { - return seastar::when_all(in.close(), out.close()).discard_result(); + return seastar::smp::submit_to(sid, [this] { + return seastar::when_all( + in.close(), out.close()).discard_result(); + }); } }; diff --git a/src/crimson/net/SocketConnection.cc b/src/crimson/net/SocketConnection.cc index 6e2d1a023072..1cc8963bd4bd 100644 --- a/src/crimson/net/SocketConnection.cc +++ b/src/crimson/net/SocketConnection.cc @@ -49,6 +49,7 @@ SocketConnection::SocketConnection(SocketMessenger& messenger, dispatcher(dispatcher), send_ready(h.promise.get_future()) { + ceph_assert(&messenger.container().local() == &messenger); } SocketConnection::~SocketConnection() @@ -61,37 +62,50 @@ SocketConnection::get_messenger() const { return &messenger; } -bool SocketConnection::is_connected() +seastar::future SocketConnection::is_connected() { - return !send_ready.failed(); + return seastar::smp::submit_to(shard_id(), [this] { + return !send_ready.failed(); + }); } seastar::future<> SocketConnection::send(MessageRef msg) { - if (state == state_t::closing) - return seastar::now(); - return seastar::with_gate(pending_dispatch, [this, msg=std::move(msg)] { - return do_send(std::move(msg)) - .handle_exception([this] (std::exception_ptr eptr) { - logger().warn("{} send fault: {}", *this, eptr); - close(); + return seastar::smp::submit_to(shard_id(), [this, msg=std::move(msg)] { + if (state == state_t::closing) + return seastar::now(); + return seastar::with_gate(pending_dispatch, [this, msg=std::move(msg)] { + return do_send(std::move(msg)) + .handle_exception([this] (std::exception_ptr eptr) { + logger().warn("{} send fault: {}", *this, eptr); + close(); + }); }); }); } seastar::future<> SocketConnection::keepalive() { - if (state == state_t::closing) - return seastar::now(); - return seastar::with_gate(pending_dispatch, [this] { - return do_keepalive() - .handle_exception([this] (std::exception_ptr eptr) { - logger().warn("{} keepalive fault: {}", *this, eptr); - close(); + return seastar::smp::submit_to(shard_id(), [this] { + if (state == state_t::closing) + return seastar::now(); + return seastar::with_gate(pending_dispatch, [this] { + return do_keepalive() + .handle_exception([this] (std::exception_ptr eptr) { + logger().warn("{} keepalive fault: {}", *this, eptr); + close(); + }); }); }); } +seastar::future<> SocketConnection::close() +{ + return seastar::smp::submit_to(shard_id(), [this] { + return do_close(); + }); +} + seastar::future<> SocketConnection::handle_tags() { return seastar::keep_doing([this] { @@ -196,10 +210,13 @@ seastar::future<> SocketConnection::read_message() } constexpr bool add_ref = false; // Message starts with 1 ref + // TODO: change MessageRef with foreign_ptr auto msg_ref = MessageRef{msg, add_ref}; // start dispatch, ignoring exceptions from the application layer seastar::with_gate(pending_dispatch, [this, msg = std::move(msg_ref)] { - return dispatcher.ms_dispatch(this, std::move(msg)) + return dispatcher.ms_dispatch( + seastar::static_pointer_cast(shared_from_this()), + std::move(msg)) .handle_exception([this] (std::exception_ptr eptr) { logger().error("{} ms_dispatch caught exception: {}", *this, eptr); ceph_assert(false); @@ -298,7 +315,7 @@ seastar::future<> SocketConnection::do_keepalive() return f.get_future(); } -seastar::future<> SocketConnection::close() +seastar::future<> SocketConnection::do_close() { if (state == state_t::closing) { // already closing @@ -307,12 +324,14 @@ seastar::future<> SocketConnection::close() } // unregister_conn() drops a reference, so hold another until completion - auto cleanup = [conn = SocketConnectionRef(this)] {}; + auto cleanup = [conn_ref = shared_from_this(), this] { + logger().debug("{} closed!", *this); + }; if (state == state_t::accepting) { - messenger.unaccept_conn(this); + messenger.unaccept_conn(seastar::static_pointer_cast(shared_from_this())); } else if (state >= state_t::connecting && state < state_t::closing) { - messenger.unregister_conn(this); + messenger.unregister_conn(seastar::static_pointer_cast(shared_from_this())); } else { // cannot happen ceph_assert(false); @@ -785,7 +804,7 @@ SocketConnection::start_connect(const entity_addr_t& _peer_addr, ceph_assert(!socket); peer_addr = _peer_addr; peer_type = _peer_type; - messenger.register_conn(this); + messenger.register_conn(seastar::static_pointer_cast(shared_from_this())); logger().debug("{} trigger connecting, was {}", *this, static_cast(state)); state = state_t::connecting; seastar::with_gate(pending_dispatch, [this] { @@ -796,7 +815,7 @@ SocketConnection::start_connect(const entity_addr_t& _peer_addr, fd.shutdown_output(); throw std::system_error(make_error_code(error::connection_aborted)); } - socket.emplace(std::move(fd)); + socket = seastar::make_foreign(std::make_unique(std::move(fd))); // read server's handshake header return socket->read(server_header_size); }).then([this] (bufferlist headerbl) { @@ -810,8 +829,8 @@ SocketConnection::start_connect(const entity_addr_t& _peer_addr, side = side_t::connector; socket_port = caddr.get_port(); - messenger.learned_addr(caddr); - + return messenger.learned_addr(caddr); + }).then([this] { // encode/send client's handshake header bufferlist bl; bl.append(buffer::create_static(banner_size, banner)); @@ -824,7 +843,7 @@ SocketConnection::start_connect(const entity_addr_t& _peer_addr, }); }).then([this] { // notify the dispatcher and allow them to reject the connection - return dispatcher.ms_handle_connect(this); + return dispatcher.ms_handle_connect(seastar::static_pointer_cast(shared_from_this())); }).then([this] { execute_open(); }).handle_exception([this] (std::exception_ptr eptr) { @@ -837,7 +856,7 @@ SocketConnection::start_connect(const entity_addr_t& _peer_addr, } void -SocketConnection::start_accept(seastar::connected_socket&& fd, +SocketConnection::start_accept(seastar::foreign_ptr>&& sock, const entity_addr_t& _peer_addr) { ceph_assert(state == state_t::none); @@ -846,8 +865,8 @@ SocketConnection::start_accept(seastar::connected_socket&& fd, peer_addr.set_port(0); side = side_t::acceptor; socket_port = _peer_addr.get_port(); - socket.emplace(std::move(fd)); - messenger.accept_conn(this); + socket = std::move(sock); + messenger.accept_conn(seastar::static_pointer_cast(shared_from_this())); logger().debug("{} trigger accepting, was {}", *this, static_cast(state)); state = state_t::accepting; seastar::with_gate(pending_dispatch, [this, _peer_addr] { @@ -874,10 +893,10 @@ SocketConnection::start_accept(seastar::connected_socket&& fd, }); }).then([this] { // notify the dispatcher and allow them to reject the connection - return dispatcher.ms_handle_accept(this); + return dispatcher.ms_handle_accept(seastar::static_pointer_cast(shared_from_this())); }).then([this] { - messenger.register_conn(this); - messenger.unaccept_conn(this); + messenger.register_conn(seastar::static_pointer_cast(shared_from_this())); + messenger.unaccept_conn(seastar::static_pointer_cast(shared_from_this())); execute_open(); }).handle_exception([this] (std::exception_ptr eptr) { // TODO: handle fault in the accepting state @@ -902,12 +921,12 @@ SocketConnection::execute_open() logger().warn("{} open fault: {}", *this, e); if (e.code() == error::connection_aborted || e.code() == error::connection_reset) { - return dispatcher.ms_handle_reset(this) + return dispatcher.ms_handle_reset(seastar::static_pointer_cast(shared_from_this())) .then([this] { close(); }); } else if (e.code() == error::read_eof) { - return dispatcher.ms_handle_remote_reset(this) + return dispatcher.ms_handle_remote_reset(seastar::static_pointer_cast(shared_from_this())) .then([this] { close(); }); @@ -925,7 +944,7 @@ SocketConnection::execute_open() seastar::future<> SocketConnection::fault() { if (policy.lossy) { - messenger.unregister_conn(this); + messenger.unregister_conn(seastar::static_pointer_cast(shared_from_this())); } if (h.backoff.count()) { h.backoff += h.backoff; @@ -938,6 +957,10 @@ seastar::future<> SocketConnection::fault() return seastar::sleep(h.backoff); } +seastar::shard_id SocketConnection::shard_id() const { + return messenger.shard_id(); +} + void SocketConnection::print(ostream& out) const { messenger.print(out); if (side == side_t::none) { diff --git a/src/crimson/net/SocketConnection.h b/src/crimson/net/SocketConnection.h index 6d6ea5176275..62cc77d53477 100644 --- a/src/crimson/net/SocketConnection.h +++ b/src/crimson/net/SocketConnection.h @@ -17,6 +17,7 @@ #include #include #include +#include #include "msg/Policy.h" #include "Connection.h" @@ -32,11 +33,11 @@ using stop_t = seastar::stop_iteration; class SocketMessenger; class SocketConnection; -using SocketConnectionRef = boost::intrusive_ptr; +using SocketConnectionRef = seastar::shared_ptr; class SocketConnection : public Connection { SocketMessenger& messenger; - std::optional socket; + seastar::foreign_ptr> socket; Dispatcher& dispatcher; seastar::gate pending_dispatch; @@ -162,6 +163,7 @@ class SocketConnection : public Connection { seastar::future<> do_send(MessageRef msg); seastar::future<> do_keepalive(); + seastar::future<> do_close(); public: SocketConnection(SocketMessenger& messenger, @@ -174,7 +176,7 @@ class SocketConnection : public Connection { return peer_type; } - bool is_connected() override; + seastar::future is_connected() override; seastar::future<> send(MessageRef msg) override; @@ -182,6 +184,8 @@ class SocketConnection : public Connection { seastar::future<> close() override; + seastar::shard_id shard_id() const override; + void print(ostream& out) const override; public: @@ -191,7 +195,7 @@ class SocketConnection : public Connection { const entity_type_t& peer_type); /// start a handshake from the server's perspective, /// only call when SocketConnection first construct - void start_accept(seastar::connected_socket&& socket, + void start_accept(seastar::foreign_ptr>&& socket, const entity_addr_t& peer_addr); /// the number of connections initiated in this session, increment when a diff --git a/src/crimson/net/SocketMessenger.cc b/src/crimson/net/SocketMessenger.cc index c0ad81cd3b68..c49729e8c374 100644 --- a/src/crimson/net/SocketMessenger.cc +++ b/src/crimson/net/SocketMessenger.cc @@ -15,10 +15,12 @@ #include "SocketMessenger.h" #include +#include #include "auth/Auth.h" #include "Errors.h" #include "Dispatcher.h" +#include "Socket.h" using namespace ceph::net; @@ -31,59 +33,106 @@ namespace { SocketMessenger::SocketMessenger(const entity_name_t& myname, const std::string& logic_name, uint32_t nonce) - : Messenger{myname}, logic_name{logic_name}, nonce{nonce} + : Messenger{myname}, + sid{seastar::engine().cpu_id()}, + logic_name{logic_name}, + nonce{nonce} {} -void SocketMessenger::set_myaddrs(const entity_addrvec_t& addrs) +seastar::future<> SocketMessenger::set_myaddrs(const entity_addrvec_t& addrs) { auto my_addrs = addrs; for (auto& addr : my_addrs.v) { addr.nonce = nonce; } - // TODO: propagate to all the cores of the Messenger - Messenger::set_myaddrs(my_addrs); + return container().invoke_on_all([my_addrs](auto& msgr) { + return msgr.Messenger::set_myaddrs(my_addrs); + }); } -void SocketMessenger::bind(const entity_addrvec_t& addrs) +seastar::future<> SocketMessenger::bind(const entity_addrvec_t& addrs) { - // TODO: v2: listen on multiple addresses - auto addr = addrs.legacy_addr(); - - if (addr.get_family() != AF_INET) { - throw std::system_error(EAFNOSUPPORT, std::generic_category()); + ceph_assert(addrs.legacy_addr().get_family() == AF_INET); + auto my_addrs = addrs; + for (auto& addr : my_addrs.v) { + addr.nonce = nonce; } - - set_myaddrs(addrs); - logger().info("listening on {}", addr); - seastar::socket_address address(addr.in4_addr()); - seastar::listen_options lo; - lo.reuse_address = true; - listener = seastar::listen(address, lo); + logger().info("listening on {}", my_addrs.legacy_addr().in4_addr()); + return container().invoke_on_all([my_addrs](auto& msgr) { + msgr.do_bind(my_addrs); + }); } -void SocketMessenger::try_bind(const entity_addrvec_t& addrs, - uint32_t min_port, uint32_t max_port) +seastar::future<> +SocketMessenger::try_bind(const entity_addrvec_t& addrs, + uint32_t min_port, uint32_t max_port) { auto addr = addrs.legacy_or_front_addr(); if (addr.get_port() != 0) { return bind(addrs); } - for (auto port = min_port; port <= max_port; port++) { - try { - addr.set_port(port); - bind(entity_addrvec_t{addr}); - logger().info("{}: try_bind: done", *this); - return; - } catch (const std::system_error& e) { - logger().debug("{}: try_bind: {} already used", *this, port); - if (port == max_port) { - throw; - } - } - } + ceph_assert(min_port <= max_port); + return seastar::do_with(uint32_t(min_port), + [this, max_port, addr] (auto& port) { + return seastar::repeat([this, max_port, addr, &port] { + auto to_bind = addr; + to_bind.set_port(port); + return bind(entity_addrvec_t{to_bind}) + .then([this] { + logger().info("{}: try_bind: done", *this); + return stop_t::yes; + }).handle_exception_type([this, max_port, &port] (const std::system_error& e) { + logger().debug("{}: try_bind: {} already used", *this, port); + if (port == max_port) { + throw e; + } + ++port; + return stop_t::no; + }); + }); + }); +} + +seastar::future<> SocketMessenger::start(Dispatcher *disp) { + return container().invoke_on_all([disp](auto& msgr) { + return msgr.do_start(disp->get_local_shard()); + }); +} + +seastar::future +SocketMessenger::connect(const entity_addr_t& peer_addr, const entity_type_t& peer_type) +{ + auto shard = locate_shard(peer_addr); + return container().invoke_on(shard, [peer_addr, peer_type](auto& msgr) { + return msgr.do_connect(peer_addr, peer_type); + }).then([](seastar::foreign_ptr&& conn) { + return seastar::make_lw_shared>(std::move(conn)); + }); } -seastar::future<> SocketMessenger::start(Dispatcher *disp) +seastar::future<> SocketMessenger::shutdown() +{ + return container().invoke_on_all([](auto& msgr) { + return msgr.do_shutdown(); + }).finally([this] { + return container().invoke_on_all([](auto& msgr) { + msgr.shutdown_promise.set_value(); + }); + }); +} + +void SocketMessenger::do_bind(const entity_addrvec_t& addrs) +{ + Messenger::set_myaddrs(addrs); + + // TODO: v2: listen on multiple addresses + seastar::socket_address address(addrs.legacy_addr().in4_addr()); + seastar::listen_options lo; + lo.reuse_address = true; + listener = seastar::listen(address, lo); +} + +seastar::future<> SocketMessenger::do_start(Dispatcher *disp) { dispatcher = disp; @@ -96,9 +145,15 @@ seastar::future<> SocketMessenger::start(Dispatcher *disp) // allocate the connection entity_addr_t peer_addr; peer_addr.set_sockaddr(&paddr.as_posix_sockaddr()); - SocketConnectionRef conn = new SocketConnection(*this, *dispatcher); + auto shard = locate_shard(peer_addr); +#warning fixme + // we currently do dangerous i/o from a Connection core, different from the Socket core. + auto sock = seastar::make_foreign(std::make_unique(std::move(socket))); // don't wait before accepting another - conn->start_accept(std::move(socket), peer_addr); + container().invoke_on(shard, [sock = std::move(sock), peer_addr, this](auto& msgr) mutable { + SocketConnectionRef conn = seastar::make_shared(msgr, *msgr.dispatcher); + conn->start_accept(std::move(sock), peer_addr); + }); }); }).handle_exception_type([this] (const std::system_error& e) { // stop gracefully on connection_aborted @@ -111,18 +166,18 @@ seastar::future<> SocketMessenger::start(Dispatcher *disp) return seastar::now(); } -ceph::net::ConnectionRef -SocketMessenger::connect(const entity_addr_t& peer_addr, const entity_type_t& peer_type) +seastar::foreign_ptr +SocketMessenger::do_connect(const entity_addr_t& peer_addr, const entity_type_t& peer_type) { if (auto found = lookup_conn(peer_addr); found) { - return found; + return seastar::make_foreign(found->shared_from_this()); } - SocketConnectionRef conn = new SocketConnection(*this, *dispatcher); + SocketConnectionRef conn = seastar::make_shared(*this, *dispatcher); conn->start_connect(peer_addr, peer_type); - return conn; + return seastar::make_foreign(conn->shared_from_this()); } -seastar::future<> SocketMessenger::shutdown() +seastar::future<> SocketMessenger::do_shutdown() { if (listener) { listener->abort_accept(); @@ -140,11 +195,11 @@ seastar::future<> SocketMessenger::shutdown() }); } -void SocketMessenger::learned_addr(const entity_addr_t &peer_addr_for_me) +seastar::future<> SocketMessenger::learned_addr(const entity_addr_t &peer_addr_for_me) { if (!get_myaddr().is_blank_ip()) { // already learned or binded - return; + return seastar::now(); } // Only learn IP address if blank. @@ -152,7 +207,7 @@ void SocketMessenger::learned_addr(const entity_addr_t &peer_addr_for_me) addr.u = peer_addr_for_me.u; addr.set_type(peer_addr_for_me.get_type()); addr.set_port(get_myaddr().get_port()); - set_myaddrs(entity_addrvec_t{addr}); + return set_myaddrs(entity_addrvec_t{addr}); } void SocketMessenger::set_default_policy(const SocketPolicy& p) @@ -173,6 +228,16 @@ void SocketMessenger::set_policy_throttler(entity_type_t peer_type, policy_set.set_throttlers(peer_type, throttle, nullptr); } +seastar::shard_id SocketMessenger::locate_shard(const entity_addr_t& addr) +{ + ceph_assert(addr.get_family() == AF_INET); + std::size_t seed = 0; + boost::hash_combine(seed, addr.u.sin.sin_addr.s_addr); + //boost::hash_combine(seed, addr.u.sin.sin_port); + //boost::hash_combine(seed, addr.nonce); + return seed % seastar::smp::count; +} + ceph::net::SocketConnectionRef SocketMessenger::lookup_conn(const entity_addr_t& addr) { if (auto found = connections.find(addr); diff --git a/src/crimson/net/SocketMessenger.h b/src/crimson/net/SocketMessenger.h index 0fb5d9fd7dd0..20b0ad56e93f 100644 --- a/src/crimson/net/SocketMessenger.h +++ b/src/crimson/net/SocketMessenger.h @@ -19,6 +19,7 @@ #include #include #include +#include #include "msg/Policy.h" #include "Messenger.h" @@ -29,7 +30,10 @@ namespace ceph::net { using SocketPolicy = ceph::net::Policy; -class SocketMessenger final : public Messenger { +class SocketMessenger final : public Messenger, public seastar::peering_sharded_service { + const seastar::shard_id sid; + seastar::promise<> shutdown_promise; + std::optional listener; Dispatcher *dispatcher = nullptr; std::map connections; @@ -43,25 +47,46 @@ class SocketMessenger final : public Messenger { seastar::future<> accept(seastar::connected_socket socket, seastar::socket_address paddr); + void do_bind(const entity_addrvec_t& addr); + seastar::future<> do_start(Dispatcher *disp); + seastar::foreign_ptr do_connect(const entity_addr_t& peer_addr, + const entity_type_t& peer_type); + seastar::future<> do_shutdown(); + // conn sharding options: + // 1. Simplest: sharded by ip only + // 2. Balanced: sharded by ip + port + nonce, + // but, need to move SocketConnection between cores. + seastar::shard_id locate_shard(const entity_addr_t& addr); + public: SocketMessenger(const entity_name_t& myname, const std::string& logic_name, uint32_t nonce); - void set_myaddrs(const entity_addrvec_t& addr) override; + seastar::future<> set_myaddrs(const entity_addrvec_t& addr) override; - void bind(const entity_addrvec_t& addr) override; + // Messenger interfaces are assumed to be called from its own shard, but its + // behavior should be symmetric when called from any shard. + seastar::future<> bind(const entity_addrvec_t& addr) override; - void try_bind(const entity_addrvec_t& addr, - uint32_t min_port, uint32_t max_port) override; + seastar::future<> try_bind(const entity_addrvec_t& addr, + uint32_t min_port, uint32_t max_port) override; seastar::future<> start(Dispatcher *dispatcher) override; - ConnectionRef connect(const entity_addr_t& peer_addr, - const entity_type_t& peer_type) override; + seastar::future connect(const entity_addr_t& peer_addr, + const entity_type_t& peer_type) override; + // can only wait once + seastar::future<> wait() override { + return shutdown_promise.get_future(); + } seastar::future<> shutdown() override; + Messenger* get_local_shard() override { + return &container().local(); + } + void print(ostream& out) const override { out << get_myname() << "(" << logic_name @@ -69,7 +94,7 @@ class SocketMessenger final : public Messenger { } public: - void learned_addr(const entity_addr_t &peer_addr_for_me); + seastar::future<> learned_addr(const entity_addr_t &peer_addr_for_me); void set_default_policy(const SocketPolicy& p); void set_policy(entity_type_t peer_type, const SocketPolicy& p); void set_policy_throttler(entity_type_t peer_type, Throttle* throttle); @@ -79,6 +104,15 @@ class SocketMessenger final : public Messenger { void unaccept_conn(SocketConnectionRef); void register_conn(SocketConnectionRef); void unregister_conn(SocketConnectionRef); + + // required by sharded<> + seastar::future<> stop() { + return seastar::make_ready_future<>(); + } + + seastar::shard_id shard_id() const { + return sid; + } }; } // namespace ceph::net diff --git a/src/test/crimson/CMakeLists.txt b/src/test/crimson/CMakeLists.txt index 7ce959c044f8..d06fd341bd14 100644 --- a/src/test/crimson/CMakeLists.txt +++ b/src/test/crimson/CMakeLists.txt @@ -9,12 +9,13 @@ add_ceph_unittest(unittest_seastar_denc) target_link_libraries(unittest_seastar_denc crimson GTest::Main) add_executable(unittest_seastar_messenger test_messenger.cc) -#add_ceph_unittest(unittest_seastar_messenger) +add_ceph_unittest(unittest_seastar_messenger) target_link_libraries(unittest_seastar_messenger ceph-common crimson) -add_executable(unittest_seastar_echo - test_alien_echo.cc) -target_link_libraries(unittest_seastar_echo ceph-common global crimson) +# TODO: fix unittest_seastar_echo with the new design +#add_executable(unittest_seastar_echo +# test_alien_echo.cc) +#target_link_libraries(unittest_seastar_echo ceph-common global crimson) add_executable(unittest_seastar_thread_pool test_thread_pool.cc) @@ -25,9 +26,10 @@ add_executable(unittest_seastar_config test_config.cc) target_link_libraries(unittest_seastar_config crimson) -add_executable(unittest_seastar_monc - test_monc.cc) -target_link_libraries(unittest_seastar_monc crimson) +# TODO: fix unittest_seastar_monc with the new design +#add_executable(unittest_seastar_monc +# test_monc.cc) +#target_link_libraries(unittest_seastar_monc crimson) add_executable(unittest_seastar_perfcounters test_perfcounters.cc) diff --git a/src/test/crimson/test_messenger.cc b/src/test/crimson/test_messenger.cc index 9b6cede47ab7..d13dc9f6782f 100644 --- a/src/test/crimson/test_messenger.cc +++ b/src/test/crimson/test_messenger.cc @@ -1,16 +1,26 @@ #include "messages/MPing.h" +#include "crimson/common/log.h" #include "crimson/net/Connection.h" #include "crimson/net/Dispatcher.h" -#include "crimson/net/SocketMessenger.h" +#include "crimson/net/Messenger.h" +#include #include #include #include +#include #include #include +#include namespace bpo = boost::program_options; +namespace { + +seastar::logger& logger() { + return ceph::get_logger(ceph_subsys_ms); +} + static std::random_device rd; static std::default_random_engine rng{rd()}; static bool verbose = false; @@ -19,106 +29,226 @@ static seastar::future<> test_echo(unsigned rounds, double keepalive_ratio) { struct test_state { - entity_addr_t addr; - - struct { - ceph::net::SocketMessenger messenger{entity_name_t::OSD(1), "server1", 1}; - struct ServerDispatcher : ceph::net::Dispatcher { - seastar::future<> ms_dispatch(ceph::net::ConnectionRef c, - MessageRef m) override { - if (verbose) { - std::cout << "server got " << *m << std::endl; - } - // reply with a pong - return c->send(MessageRef{new MPing(), false}); + struct Server final + : public ceph::net::Dispatcher, + public seastar::peering_sharded_service { + ceph::net::Messenger *msgr = nullptr; + + Dispatcher* get_local_shard() override { + return &(container().local()); + } + seastar::future<> stop() { + return seastar::make_ready_future<>(); + } + seastar::future<> ms_dispatch(ceph::net::ConnectionRef c, + MessageRef m) override { + if (verbose) { + logger().info("server got {}", *m); } - } dispatcher; - } server; + // reply with a pong + return c->send(MessageRef{new MPing(), false}); + } - struct { - unsigned rounds; - std::bernoulli_distribution keepalive_dist{}; - ceph::net::SocketMessenger messenger{entity_name_t::OSD(0), "client1", 2}; - struct ClientDispatcher : ceph::net::Dispatcher { - seastar::promise reply; + seastar::future<> init(const entity_name_t& name, + const std::string& lname, + const uint64_t nonce, + const entity_addr_t& addr) { + auto&& fut = ceph::net::Messenger::create(name, lname, nonce); + return fut.then([this, addr](ceph::net::Messenger *messenger) { + return container().invoke_on_all([messenger](auto& server) { + server.msgr = messenger->get_local_shard(); + }).then([messenger, addr] { + return messenger->bind(entity_addrvec_t{addr}); + }).then([this, messenger] { + return messenger->start(this); + }); + }); + } + seastar::future<> shutdown() { + ceph_assert(msgr); + return msgr->shutdown(); + } + }; + + struct Client final + : public ceph::net::Dispatcher, + public seastar::peering_sharded_service { + + struct PingSession : public seastar::enable_shared_from_this { unsigned count = 0u; - seastar::future<> ms_dispatch(ceph::net::ConnectionRef c, - MessageRef m) override { - ++count; - if (verbose) { - std::cout << "client ms_dispatch " << count << std::endl; - } - reply.set_value(std::move(m)); + }; + using PingSessionRef = seastar::shared_ptr; + + unsigned rounds; + std::bernoulli_distribution keepalive_dist; + ceph::net::Messenger *msgr = nullptr; + std::map> pending_conns; + std::map sessions; + + Client(unsigned rounds, double keepalive_ratio) + : rounds(rounds), + keepalive_dist(std::bernoulli_distribution{keepalive_ratio}) {} + Dispatcher* get_local_shard() override { + return &(container().local()); + } + seastar::future<> stop() { + return seastar::now(); + } + seastar::future<> ms_handle_connect(ceph::net::ConnectionRef conn) override { + logger().info("{}: connected to {}", *conn, conn->get_peer_addr()); + auto session = seastar::make_shared(); + auto [i, added] = sessions.emplace(conn, session); + std::ignore = i; + ceph_assert(added); + return container().invoke_on_all([conn = conn.get()](auto& client) { + auto [i, added] = client.pending_conns.emplace(conn, seastar::promise<>()); + std::ignore = i; + ceph_assert(added); + }); + } + seastar::future<> ms_dispatch(ceph::net::ConnectionRef c, + MessageRef m) override { + auto found = sessions.find(c); + if (found == sessions.end()) { + ceph_assert(false); + } + auto session = found->second; + ++(session->count); + if (verbose) { + logger().info("client ms_dispatch {}", session->count); + } + + if (session->count == rounds) { + logger().info("{}: finished receiving {} pongs", *c.get(), session->count); + return container().invoke_on_all([conn = c.get()](auto &client) { + auto found = client.pending_conns.find(conn); + ceph_assert(found != client.pending_conns.end()); + found->second.set_value(); + }); + } else { return seastar::now(); } - } dispatcher; - seastar::future<> pingpong(ceph::net::ConnectionRef c) { - return seastar::repeat([conn=std::move(c), this] { - if (keepalive_dist(rng)) { - return conn->keepalive().then([] { - return seastar::make_ready_future( - seastar::stop_iteration::no); + } + + seastar::future<> init(const entity_name_t& name, + const std::string& lname, + const uint64_t nonce) { + return ceph::net::Messenger::create(name, lname, nonce) + .then([this](ceph::net::Messenger *messenger) { + return container().invoke_on_all([messenger](auto& client) { + client.msgr = messenger->get_local_shard(); + }).then([this, messenger] { + return messenger->start(this); }); - } else { - return conn->send(MessageRef{new MPing(), false}).then([&] { - return dispatcher.reply.get_future(); - }).then([&] (MessageRef msg) { - dispatcher.reply = seastar::promise{}; - if (verbose) { - std::cout << "client got reply " << *msg << std::endl; - } - return seastar::make_ready_future( - seastar::stop_iteration::yes); - }); - }; - }); + }); } - bool done() const { - return dispatcher.count >= rounds; + + seastar::future<> shutdown() { + ceph_assert(msgr); + return msgr->shutdown(); } - } client; - }; - return seastar::do_with(test_state{}, - [rounds, keepalive_ratio] (test_state& t) { - // bind the server - t.addr.set_type(entity_addr_t::TYPE_LEGACY); - t.addr.set_family(AF_INET); - t.addr.set_port(9010); - t.addr.set_nonce(1); - t.server.messenger.bind(entity_addrvec_t{t.addr}); - - t.client.rounds = rounds; - t.client.keepalive_dist = std::bernoulli_distribution{keepalive_ratio}; - - return t.server.messenger.start(&t.server.dispatcher) - .then([&] { - return t.client.messenger.start(&t.client.dispatcher) - .then([&] { - return t.client.messenger.connect(t.addr, - entity_name_t::TYPE_OSD); - }).then([&client=t.client] (ceph::net::ConnectionRef conn) { - if (verbose) { - std::cout << "client connected" << std::endl; - } - return seastar::repeat([&client,conn=std::move(conn)] { - return client.pingpong(conn).then([&client] { - return seastar::make_ready_future( - client.done() ? - seastar::stop_iteration::yes : - seastar::stop_iteration::no); - }); + + seastar::future<> dispatch_pingpong(const entity_addr_t& peer_addr, bool foreign_dispatch=true) { + return msgr->connect(peer_addr, entity_name_t::TYPE_OSD) + .then([this, foreign_dispatch](auto conn) { + if (foreign_dispatch) { + return do_dispatch_pingpong(&**conn) + .finally([this, conn] {}); + } else { + // NOTE: this could be faster if we don't switch cores in do_dispatch_pingpong(). + return container().invoke_on(conn->get()->shard_id(), [conn = &**conn](auto &client) { + return client.do_dispatch_pingpong(conn); + }).finally([this, conn] {}); + } + }); + } + + private: + seastar::future<> do_dispatch_pingpong(ceph::net::Connection* conn) { + return seastar::do_with(0u, 0u, + [this, conn](auto &count_ping, auto &count_keepalive) { + return seastar::do_until( + [this, conn, &count_ping, &count_keepalive] { + bool stop = (count_ping == rounds); + if (stop) { + logger().info("{}: finished sending {} pings with {} keepalives", + *conn, count_ping, count_keepalive); + } + return stop; + }, + [this, conn, &count_ping, &count_keepalive] { + return seastar::repeat([this, conn, &count_ping, &count_keepalive] { + if (keepalive_dist(rng)) { + count_keepalive += 1; + return conn->keepalive() + .then([&count_keepalive] { + return seastar::make_ready_future( + seastar::stop_iteration::no); + }); + } else { + count_ping += 1; + return conn->send(MessageRef{new MPing(), false}) + .then([] { + return seastar::make_ready_future( + seastar::stop_iteration::yes); + }); + } + }); + }).then([this, conn] { + auto found = pending_conns.find(conn); + if (found == pending_conns.end()) + throw std::runtime_error{"Not connected."}; + return found->second.get_future(); }); - }).finally([&] { - if (verbose) { - std::cout << "client shutting down" << std::endl; - } - return t.client.messenger.shutdown(); - }); - }).finally([&] { - if (verbose) { - std::cout << "server shutting down" << std::endl; - } - return t.server.messenger.shutdown(); + }); + } + }; + }; + + logger().info("test_echo():"); + return seastar::when_all_succeed( + ceph::net::create_sharded(), + ceph::net::create_sharded(), + ceph::net::create_sharded(rounds, keepalive_ratio), + ceph::net::create_sharded(rounds, keepalive_ratio)) + .then([rounds, keepalive_ratio](test_state::Server *server1, + test_state::Server *server2, + test_state::Client *client1, + test_state::Client *client2) { + // start servers and clients + entity_addr_t addr1; + addr1.parse("127.0.0.1:9010", nullptr); + addr1.set_type(entity_addr_t::TYPE_LEGACY); + entity_addr_t addr2; + addr2.parse("127.0.0.1:9011", nullptr); + addr2.set_type(entity_addr_t::TYPE_LEGACY); + return seastar::when_all_succeed( + server1->init(entity_name_t::OSD(0), "server1", 1, addr1), + server2->init(entity_name_t::OSD(1), "server2", 2, addr2), + client1->init(entity_name_t::OSD(2), "client1", 3), + client2->init(entity_name_t::OSD(3), "client2", 4)) + // dispatch pingpoing + .then([client1, client2, server1, server2] { + return seastar::when_all_succeed( + // test connecting in parallel, accepting in parallel, + // and operating the connection reference from a foreign/local core + client1->dispatch_pingpong(server1->msgr->get_myaddr(), true), + client1->dispatch_pingpong(server2->msgr->get_myaddr(), false), + client2->dispatch_pingpong(server1->msgr->get_myaddr(), false), + client2->dispatch_pingpong(server2->msgr->get_myaddr(), true)); + // shutdown + }).finally([client1] { + logger().info("client1 shutdown..."); + return client1->shutdown(); + }).finally([client2] { + logger().info("client2 shutdown..."); + return client2->shutdown(); + }).finally([server1] { + logger().info("server1 shutdown..."); + return server1->shutdown(); + }).finally([server2] { + logger().info("server2 shutdown..."); + return server2->shutdown(); }); }); } @@ -126,68 +256,120 @@ static seastar::future<> test_echo(unsigned rounds, static seastar::future<> test_concurrent_dispatch() { struct test_state { - entity_addr_t addr; - - struct { - ceph::net::SocketMessenger messenger{entity_name_t::OSD(1), "server2", 3}; - class ServerDispatcher : public ceph::net::Dispatcher { - int count = 0; - seastar::promise<> on_second; // satisfied on second dispatch - seastar::promise<> on_done; // satisfied when first dispatch unblocks - public: - seastar::future<> ms_dispatch(ceph::net::ConnectionRef c, - MessageRef m) override { - switch (++count) { - case 1: - // block on the first request until we reenter with the second - return on_second.get_future().then([=] { on_done.set_value(); }); - case 2: - on_second.set_value(); - return seastar::now(); - default: - throw std::runtime_error("unexpected count"); - } + struct Server final + : public ceph::net::Dispatcher, + public seastar::peering_sharded_service { + ceph::net::Messenger *msgr = nullptr; + int count = 0; + seastar::promise<> on_second; // satisfied on second dispatch + seastar::promise<> on_done; // satisfied when first dispatch unblocks + + seastar::future<> ms_dispatch(ceph::net::ConnectionRef c, + MessageRef m) override { + switch (++count) { + case 1: + // block on the first request until we reenter with the second + return on_second.get_future() + .then([this] { + return container().invoke_on_all([](Server& server) { + server.on_done.set_value(); + }); + }); + case 2: + on_second.set_value(); + return seastar::now(); + default: + throw std::runtime_error("unexpected count"); } - seastar::future<> wait() { return on_done.get_future(); } - } dispatcher; - } server; - - struct { - ceph::net::SocketMessenger messenger{entity_name_t::OSD(0), "client2", 4}; - ceph::net::Dispatcher dispatcher; - } client; + } + + seastar::future<> wait() { return on_done.get_future(); } + + seastar::future<> init(const entity_name_t& name, + const std::string& lname, + const uint64_t nonce, + const entity_addr_t& addr) { + return ceph::net::Messenger::create(name, lname, nonce) + .then([this, addr](ceph::net::Messenger *messenger) { + return container().invoke_on_all([messenger](auto& server) { + server.msgr = messenger->get_local_shard(); + }).then([messenger, addr] { + return messenger->bind(entity_addrvec_t{addr}); + }).then([this, messenger] { + return messenger->start(this); + }); + }); + } + + Dispatcher* get_local_shard() override { + return &(container().local()); + } + seastar::future<> stop() { + return seastar::make_ready_future<>(); + } + }; + + struct Client final + : public ceph::net::Dispatcher, + public seastar::peering_sharded_service { + ceph::net::Messenger *msgr = nullptr; + + seastar::future<> init(const entity_name_t& name, + const std::string& lname, + const uint64_t nonce) { + return ceph::net::Messenger::create(name, lname, nonce) + .then([this](ceph::net::Messenger *messenger) { + return container().invoke_on_all([messenger](auto& client) { + client.msgr = messenger->get_local_shard(); + }).then([this, messenger] { + return messenger->start(this); + }); + }); + } + + Dispatcher* get_local_shard() override { + return &(container().local()); + } + seastar::future<> stop() { + return seastar::make_ready_future<>(); + } + }; }; - return seastar::do_with(test_state{}, - [] (test_state& t) { - // bind the server - t.addr.set_type(entity_addr_t::TYPE_LEGACY); - t.addr.set_family(AF_INET); - t.addr.set_port(9010); - t.addr.set_nonce(3); - t.server.messenger.bind(entity_addrvec_t{t.addr}); - - return t.server.messenger.start(&t.server.dispatcher) - .then([&] { - return t.client.messenger.start(&t.client.dispatcher) - .then([&] { - return t.client.messenger.connect(t.addr, - entity_name_t::TYPE_OSD); - }).then([] (ceph::net::ConnectionRef conn) { - // send two messages - conn->send(MessageRef{new MPing, false}); - conn->send(MessageRef{new MPing, false}); - }).then([&] { - // wait for the server to get both - return t.server.dispatcher.wait(); - }).finally([&] { - return t.client.messenger.shutdown(); - }); - }).finally([&] { - return t.server.messenger.shutdown(); + + logger().info("test_concurrent_dispatch():"); + return seastar::when_all_succeed( + ceph::net::create_sharded(), + ceph::net::create_sharded()) + .then([](test_state::Server *server, + test_state::Client *client) { + entity_addr_t addr; + addr.parse("127.0.0.1:9010", nullptr); + addr.set_type(entity_addr_t::TYPE_LEGACY); + addr.set_family(AF_INET); + return seastar::when_all_succeed( + server->init(entity_name_t::OSD(4), "server3", 5, addr), + client->init(entity_name_t::OSD(5), "client3", 6)) + .then([server, client] { + return client->msgr->connect(server->msgr->get_myaddr(), + entity_name_t::TYPE_OSD); + }).then([](ceph::net::ConnectionXRef conn) { + // send two messages + (*conn)->send(MessageRef{new MPing, false}); + (*conn)->send(MessageRef{new MPing, false}); + }).then([server] { + server->wait(); + }).finally([client] { + logger().info("client shutdown..."); + return client->msgr->shutdown(); + }).finally([server] { + logger().info("server shutdown..."); + return server->msgr->shutdown(); }); }); } +} + int main(int argc, char** argv) { seastar::app_template app; @@ -198,7 +380,7 @@ int main(int argc, char** argv) "number of pingpong rounds") ("keepalive-ratio", bpo::value()->default_value(0.1), "ratio of keepalive in ping messages"); - return app.run(argc, argv, [&] { + return app.run(argc, argv, [&app] { auto&& config = app.configuration(); verbose = config["verbose"].as(); auto rounds = config["rounds"].as();