From: Yingxin Cheng Date: Sun, 19 Jan 2020 07:23:25 +0000 (+0800) Subject: crimson/net: implement shard-local messenger internally X-Git-Tag: v15.1.1~394^2~7 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=b2da8b97d0595fdd6aaece8091bc044326730b8e;p=ceph.git crimson/net: implement shard-local messenger internally Adopt FixedCPUServerSocket and don't move sockets across cores after connected/accepted. Implement the messenger to be managed in one CPU only, since we have encapsulated the seastar listen-on-all requirement inside FixedCPUServerSocket, and the requirements of a cross-core messenger has not been defined yet. The messenger interfaces can also be simplified, but will be in another patch. Signed-off-by: Yingxin Cheng --- diff --git a/src/crimson/net/Messenger.cc b/src/crimson/net/Messenger.cc index 6992c45e95b0..2b2a6ff2a145 100644 --- a/src/crimson/net/Messenger.cc +++ b/src/crimson/net/Messenger.cc @@ -12,10 +12,15 @@ Messenger::create(const entity_name_t& name, const uint64_t nonce, const int master_sid) { - return create_sharded(name, lname, nonce, master_sid) - .then([](Messenger *msgr) { - return msgr; - }); + // enforce the messenger to a specific core (master_sid) + // TODO: drop the cross-core feature and cleanup the related interfaces in + // the future. + ceph_assert(master_sid >= 0); + return create_sharded( + name, lname, nonce, static_cast(master_sid) + ).then([](Messenger *msgr) { + return msgr; + }); } } // namespace crimson::net diff --git a/src/crimson/net/SocketMessenger.cc b/src/crimson/net/SocketMessenger.cc index e2cbaeece026..b5c58f43999f 100644 --- a/src/crimson/net/SocketMessenger.cc +++ b/src/crimson/net/SocketMessenger.cc @@ -33,42 +33,50 @@ namespace crimson::net { SocketMessenger::SocketMessenger(const entity_name_t& myname, const std::string& logic_name, uint32_t nonce, - int master_sid) + seastar::shard_id master_sid) : Messenger{myname}, master_sid{master_sid}, - sid{seastar::engine().cpu_id()}, logic_name{logic_name}, nonce{nonce} {} seastar::future<> SocketMessenger::set_myaddrs(const entity_addrvec_t& addrs) { + assert(seastar::engine().cpu_id() == master_sid); auto my_addrs = addrs; for (auto& addr : my_addrs.v) { addr.nonce = nonce; } - return container().invoke_on_all([my_addrs](auto& msgr) { - return msgr.Messenger::set_myaddrs(my_addrs); - }); + return Messenger::set_myaddrs(my_addrs); } -seastar::future<> SocketMessenger::bind(const entity_addrvec_t& addrs) +seastar::future<> SocketMessenger::do_bind(const entity_addrvec_t& addrs) { + assert(seastar::engine().cpu_id() == master_sid); ceph_assert(addrs.front().get_family() == AF_INET); - auto my_addrs = addrs; - for (auto& addr : my_addrs.v) { - addr.nonce = nonce; - } - logger().info("{} listening on {}", *this, my_addrs.front().in4_addr()); - return container().invoke_on_all([my_addrs](auto& msgr) { - msgr.do_bind(my_addrs); - }).handle_exception_type([this] (const std::system_error& e) { - if (e.code() == error::address_in_use) { - throw e; + return set_myaddrs(addrs).then([this] { + if (!listener) { + return FixedCPUServerSocket::create().then([this] (auto _listener) { + listener = _listener; + }); } else { - logger().error("{} bind: unexpected error {}", *this, e); - ceph_abort(); + return seastar::now(); } + }).then([this] { + auto listen_addr = get_myaddr(); + logger().debug("{} do_bind: try listen {}...", *this, listen_addr.in4_addr()); + if (!listener) { + logger().warn("{} do_bind: listener doesn't exist", *this); + return seastar::now(); + } + return listener->listen(listen_addr); + }); +} + +seastar::future<> SocketMessenger::bind(const entity_addrvec_t& addrs) +{ + return do_bind(addrs).then([this] { + logger().info("{} bind: done", *this); }); } @@ -78,221 +86,163 @@ SocketMessenger::try_bind(const entity_addrvec_t& addrs, { auto addr = addrs.front(); if (addr.get_port() != 0) { - return bind(addrs); + return do_bind(addrs).then([this] { + logger().info("{} try_bind: done", *this); + }); } ceph_assert(min_port <= max_port); return seastar::do_with(uint32_t(min_port), - [this, max_port, addr] (auto& port) { - return seastar::repeat([this, max_port, addr, &port] { - auto to_bind = addr; - to_bind.set_port(port); - return bind(entity_addrvec_t{to_bind}) - .then([this] { - logger().info("{}: try_bind: done", *this); - return stop_t::yes; - }).handle_exception_type([this, max_port, &port] (const std::system_error& e) { - ceph_assert(e.code() == error::address_in_use); - logger().trace("{}: try_bind: {} already used", *this, port); - if (port == max_port) { - throw e; - } - ++port; - return stop_t::no; - }); - }); + [this, max_port, addr] (auto& port) { + return seastar::repeat([this, max_port, addr, &port] { + auto to_bind = addr; + to_bind.set_port(port); + return do_bind(entity_addrvec_t{to_bind}).then([this] { + logger().info("{} try_bind: done", *this); + return stop_t::yes; + }).handle_exception_type([this, max_port, &port] (const std::system_error& e) { + assert(e.code() == error::address_in_use); + logger().trace("{} try_bind: {} already used", *this, port); + if (port == max_port) { + throw; + } + ++port; + return stop_t::no; + }); }); + }); } seastar::future<> SocketMessenger::start(Dispatcher *disp) { - return container().invoke_on_all([disp](auto& msgr) { - return msgr.do_start(disp->get_local_shard()); - }); -} - -seastar::future -SocketMessenger::connect(const entity_addr_t& peer_addr, const entity_type_t& peer_type) -{ - // make sure we connect to a valid peer_addr - ceph_assert(peer_addr.is_legacy() || peer_addr.is_msgr2()); - ceph_assert(peer_addr.get_port() > 0); - - auto shard = locate_shard(peer_addr); - return container().invoke_on(shard, [peer_addr, peer_type](auto& msgr) { - return msgr.do_connect(peer_addr, peer_type); - }).then([](seastar::foreign_ptr&& conn) { - return seastar::make_lw_shared>(std::move(conn)); - }); -} - -seastar::future<> SocketMessenger::stop() -{ - return do_shutdown(); -} - -seastar::future<> SocketMessenger::shutdown() -{ - return container().invoke_on_all([](auto& msgr) { - return msgr.do_shutdown(); - }).finally([this] { - return container().invoke_on_all([](auto& msgr) { - msgr.shutdown_promise.set_value(); - }); - }); -} - -void SocketMessenger::do_bind(const entity_addrvec_t& addrs) -{ - // safe to discard an immediate ready future - (void) Messenger::set_myaddrs(addrs); - - // TODO: v2: listen on multiple addresses - seastar::socket_address address(addrs.front().in4_addr()); - seastar::listen_options lo; - lo.reuse_address = true; - listener = seastar::listen(address, lo); -} + assert(seastar::engine().cpu_id() == master_sid); -seastar::future<> SocketMessenger::do_start(Dispatcher *disp) -{ - dispatcher = disp; - started = true; - - // start listening if bind() was called + dispatcher = disp->get_local_shard(); if (listener) { // make sure we have already bound to a valid address ceph_assert(get_myaddr().is_legacy() || get_myaddr().is_msgr2()); ceph_assert(get_myaddr().get_port() > 0); - // forwarded to accepting_complete - (void) seastar::keep_doing([this] { - return Socket::accept(*listener) - .then([this] (SocketFRef socket, - entity_addr_t peer_addr) { - auto shard = locate_shard(peer_addr); -#warning fixme - // we currently do dangerous i/o from a Connection core, different from the Socket core. - return container().invoke_on(shard, - [sock = std::move(socket), peer_addr, this](auto& msgr) mutable { - SocketConnectionRef conn = seastar::make_shared( - msgr, *msgr.dispatcher, get_myaddr().is_msgr2()); - conn->start_accept(std::move(sock), peer_addr); - }); - }); - }).handle_exception_type([this] (const std::system_error& e) { - // stop gracefully on connection_aborted and invalid_argument - if (e.code() != error::connection_aborted && - e.code() != error::invalid_argument) { - logger().error("{} unexpected error during accept: {}", *this, e); - ceph_abort(); - } - }).handle_exception([this] (auto eptr) { - logger().error("{} unexpected exception during accept: {}", *this, eptr); - ceph_abort(); - }).then([this] () { return accepting_complete.set_value(); }); - } else { - accepting_complete.set_value(); + return listener->accept([this] (SocketRef socket, entity_addr_t peer_addr) { + assert(seastar::engine().cpu_id() == master_sid); + SocketConnectionRef conn = seastar::make_shared( + *this, *dispatcher, get_myaddr().is_msgr2()); + // TODO: use SocketRef + conn->start_accept(seastar::make_foreign(std::move(socket)), peer_addr); + return seastar::now(); + }); } return seastar::now(); } -seastar::foreign_ptr -SocketMessenger::do_connect(const entity_addr_t& peer_addr, const entity_type_t& peer_type) +seastar::future +SocketMessenger::connect(const entity_addr_t& peer_addr, const entity_type_t& peer_type) { + assert(seastar::engine().cpu_id() == master_sid); + + // make sure we connect to a valid peer_addr + ceph_assert(peer_addr.is_legacy() || peer_addr.is_msgr2()); + ceph_assert(peer_addr.get_port() > 0); + + // TODO: use ConnectionRef if (auto found = lookup_conn(peer_addr); found) { - return seastar::make_foreign(found->shared_from_this()); + return seastar::make_ready_future( + seastar::make_lw_shared>( + seastar::make_foreign(found->shared_from_this()))); } SocketConnectionRef conn = seastar::make_shared( *this, *dispatcher, peer_addr.is_msgr2()); conn->start_connect(peer_addr, peer_type); - return seastar::make_foreign(conn->shared_from_this()); + return seastar::make_ready_future( + seastar::make_lw_shared>( + seastar::make_foreign(conn->shared_from_this()))); } -seastar::future<> SocketMessenger::do_shutdown() +seastar::future<> SocketMessenger::shutdown() { - if (!started) { - return seastar::now(); - } - - if (listener) { - listener->abort_accept(); - } + assert(seastar::engine().cpu_id() == master_sid); + return seastar::futurize_apply([this] { + if (listener) { + auto d_listener = listener; + listener = nullptr; + return d_listener->destroy(); + } else { + return seastar::now(); + } // close all connections - return seastar::parallel_for_each(accepting_conns, [] (auto conn) { + }).then([this] { + return seastar::parallel_for_each(accepting_conns, [] (auto conn) { return conn->close(); - }).then([this] { - ceph_assert(accepting_conns.empty()); - return seastar::parallel_for_each(connections, [] (auto conn) { - return conn.second->close(); - }); - }).then([this] { - return accepting_complete.get_shared_future(); - }).finally([this] { - ceph_assert(connections.empty()); }); + }).then([this] { + ceph_assert(accepting_conns.empty()); + return seastar::parallel_for_each(connections, [] (auto conn) { + return conn.second->close(); + }); + }).then([this] { + ceph_assert(connections.empty()); + shutdown_promise.set_value(); + }); } seastar::future<> SocketMessenger::learned_addr(const entity_addr_t &peer_addr_for_me, const SocketConnection& conn) { - // make sure we there's no racing to learn address from peer - return container().invoke_on(0, [peer_addr_for_me, &conn] (auto& msgr) { - if (!msgr.need_addr) { - if ((!msgr.get_myaddr().is_any() && - msgr.get_myaddr().get_type() != peer_addr_for_me.get_type()) || - msgr.get_myaddr().get_family() != peer_addr_for_me.get_family() || - !msgr.get_myaddr().is_same_host(peer_addr_for_me)) { - logger().warn("{} peer_addr_for_me {} type/family/IP doesn't match myaddr {}", - conn, peer_addr_for_me, msgr.get_myaddr()); - throw std::system_error( - make_error_code(crimson::net::error::bad_peer_address)); - } - return seastar::now(); + assert(seastar::engine().cpu_id() == master_sid); + if (!need_addr) { + if ((!get_myaddr().is_any() && + get_myaddr().get_type() != peer_addr_for_me.get_type()) || + get_myaddr().get_family() != peer_addr_for_me.get_family() || + !get_myaddr().is_same_host(peer_addr_for_me)) { + logger().warn("{} peer_addr_for_me {} type/family/IP doesn't match myaddr {}", + conn, peer_addr_for_me, get_myaddr()); + throw std::system_error( + make_error_code(crimson::net::error::bad_peer_address)); } - msgr.need_addr = false; - - if (msgr.get_myaddr().get_type() == entity_addr_t::TYPE_NONE) { - // Not bound + return seastar::now(); + } + need_addr = false; + + if (get_myaddr().get_type() == entity_addr_t::TYPE_NONE) { + // Not bound + entity_addr_t addr = peer_addr_for_me; + addr.set_type(entity_addr_t::TYPE_ANY); + addr.set_port(0); + return set_myaddrs(entity_addrvec_t{addr} + ).then([this, &conn, peer_addr_for_me] { + logger().info("{} learned myaddr={} (unbound) from {}", + conn, get_myaddr(), peer_addr_for_me); + }); + } else { + // Already bound + if (!get_myaddr().is_any() && + get_myaddr().get_type() != peer_addr_for_me.get_type()) { + logger().warn("{} peer_addr_for_me {} type doesn't match myaddr {}", + conn, peer_addr_for_me, get_myaddr()); + throw std::system_error( + make_error_code(crimson::net::error::bad_peer_address)); + } + if (get_myaddr().get_family() != peer_addr_for_me.get_family()) { + logger().warn("{} peer_addr_for_me {} family doesn't match myaddr {}", + conn, peer_addr_for_me, get_myaddr()); + throw std::system_error( + make_error_code(crimson::net::error::bad_peer_address)); + } + if (get_myaddr().is_blank_ip()) { entity_addr_t addr = peer_addr_for_me; - addr.set_type(entity_addr_t::TYPE_ANY); - addr.set_port(0); - return msgr.set_myaddrs(entity_addrvec_t{addr} - ).then([&msgr, &conn, peer_addr_for_me] { - logger().info("{} learned myaddr={} (unbound) from {}", - conn, msgr.get_myaddr(), peer_addr_for_me); + addr.set_type(get_myaddr().get_type()); + addr.set_port(get_myaddr().get_port()); + return set_myaddrs(entity_addrvec_t{addr} + ).then([this, &conn, peer_addr_for_me] { + logger().info("{} learned myaddr={} (blank IP) from {}", + conn, get_myaddr(), peer_addr_for_me); }); + } else if (!get_myaddr().is_same_host(peer_addr_for_me)) { + logger().warn("{} peer_addr_for_me {} IP doesn't match myaddr {}", + conn, peer_addr_for_me, get_myaddr()); + throw std::system_error( + make_error_code(crimson::net::error::bad_peer_address)); } else { - // Already bound - if (!msgr.get_myaddr().is_any() && - msgr.get_myaddr().get_type() != peer_addr_for_me.get_type()) { - logger().warn("{} peer_addr_for_me {} type doesn't match myaddr {}", - conn, peer_addr_for_me, msgr.get_myaddr()); - throw std::system_error( - make_error_code(crimson::net::error::bad_peer_address)); - } - if (msgr.get_myaddr().get_family() != peer_addr_for_me.get_family()) { - logger().warn("{} peer_addr_for_me {} family doesn't match myaddr {}", - conn, peer_addr_for_me, msgr.get_myaddr()); - throw std::system_error( - make_error_code(crimson::net::error::bad_peer_address)); - } - if (msgr.get_myaddr().is_blank_ip()) { - entity_addr_t addr = peer_addr_for_me; - addr.set_type(msgr.get_myaddr().get_type()); - addr.set_port(msgr.get_myaddr().get_port()); - return msgr.set_myaddrs(entity_addrvec_t{addr} - ).then([&msgr, &conn, peer_addr_for_me] { - logger().info("{} learned myaddr={} (blank IP) from {}", - conn, msgr.get_myaddr(), peer_addr_for_me); - }); - } else if (!msgr.get_myaddr().is_same_host(peer_addr_for_me)) { - logger().warn("{} peer_addr_for_me {} IP doesn't match myaddr {}", - conn, peer_addr_for_me, msgr.get_myaddr()); - throw std::system_error( - make_error_code(crimson::net::error::bad_peer_address)); - } else { - return seastar::now(); - } + return seastar::now(); } - }); + } } SocketPolicy SocketMessenger::get_policy(entity_type_t peer_type) const @@ -323,19 +273,6 @@ void SocketMessenger::set_policy_throttler(entity_type_t peer_type, policy_set.set_throttlers(peer_type, throttle, nullptr); } -seastar::shard_id SocketMessenger::locate_shard(const entity_addr_t& addr) -{ - ceph_assert(addr.get_family() == AF_INET); - if (master_sid >= 0) { - return master_sid; - } - std::size_t seed = 0; - boost::hash_combine(seed, addr.u.sin.sin_addr.s_addr); - //boost::hash_combine(seed, addr.u.sin.sin_port); - //boost::hash_combine(seed, addr.nonce); - return seed % seastar::smp::count; -} - crimson::net::SocketConnectionRef SocketMessenger::lookup_conn(const entity_addr_t& addr) { if (auto found = connections.find(addr); @@ -358,9 +295,6 @@ void SocketMessenger::unaccept_conn(SocketConnectionRef conn) void SocketMessenger::register_conn(SocketConnectionRef conn) { - if (master_sid >= 0) { - ceph_assert(static_cast(sid) == master_sid); - } auto [i, added] = connections.emplace(conn->get_peer_addr(), conn); std::ignore = i; ceph_assert(added); @@ -378,12 +312,10 @@ void SocketMessenger::unregister_conn(SocketConnectionRef conn) seastar::future SocketMessenger::get_global_seq(uint32_t old) { - return container().invoke_on(0, [old] (auto& msgr) { - if (old > msgr.global_seq) { - msgr.global_seq = old; - } - return ++msgr.global_seq; - }); + if (old > global_seq) { + global_seq = old; + } + return seastar::make_ready_future(++global_seq); } } // namespace crimson::net diff --git a/src/crimson/net/SocketMessenger.h b/src/crimson/net/SocketMessenger.h index 2eecc3068590..71ef441e9e9a 100644 --- a/src/crimson/net/SocketMessenger.h +++ b/src/crimson/net/SocketMessenger.h @@ -27,12 +27,13 @@ namespace crimson::net { +class FixedCPUServerSocket; + class SocketMessenger final : public Messenger, public seastar::peering_sharded_service { - const int master_sid; - const seastar::shard_id sid; + const seastar::shard_id master_sid; seastar::promise<> shutdown_promise; - std::optional listener; + FixedCPUServerSocket* listener = nullptr; Dispatcher *dispatcher = nullptr; std::map connections; std::set accepting_conns; @@ -43,30 +44,16 @@ class SocketMessenger final : public Messenger, public seastar::peering_sharded_ // specifying we haven't learned our addr; set false when we find it. bool need_addr = true; uint32_t global_seq = 0; - - seastar::future<> accept(seastar::connected_socket socket, - seastar::socket_address paddr); - - void do_bind(const entity_addrvec_t& addr); - bool started = false; - seastar::shared_promise<> accepting_complete; - seastar::future<> do_start(Dispatcher *disp); - seastar::foreign_ptr do_connect(const entity_addr_t& peer_addr, - const entity_type_t& peer_type); - seastar::future<> do_shutdown(); - // conn sharding options: - // 0. Compatible (master_sid >= 0): place all connections to one master shard - // 1. Simplest (master_sid < 0): sharded by ip only - // 2. Balanced (not implemented): sharded by ip + port + nonce, - // but, need to move SocketConnection between cores. - seastar::shard_id locate_shard(const entity_addr_t& addr); + + seastar::future<> do_bind(const entity_addrvec_t& addr); public: SocketMessenger(const entity_name_t& myname, const std::string& logic_name, uint32_t nonce, - int master_sid); + seastar::shard_id master_sid); + ~SocketMessenger() override { ceph_assert(!listener); } seastar::future<> set_myaddrs(const entity_addrvec_t& addr) override; @@ -83,6 +70,7 @@ class SocketMessenger final : public Messenger, public seastar::peering_sharded_ const entity_type_t& peer_type) override; // can only wait once seastar::future<> wait() override { + assert(seastar::engine().cpu_id() == master_sid); return shutdown_promise.get_future(); } @@ -118,12 +106,9 @@ class SocketMessenger final : public Messenger, public seastar::peering_sharded_ void unaccept_conn(SocketConnectionRef); void register_conn(SocketConnectionRef); void unregister_conn(SocketConnectionRef); - - // required by sharded<> - seastar::future<> stop(); - seastar::shard_id shard_id() const { - return sid; + assert(seastar::engine().cpu_id() == master_sid); + return master_sid; } };