From: Yingxin Cheng Date: Wed, 22 Jan 2020 04:31:26 +0000 (+0800) Subject: crimson: implement and adopt shard-local messenger X-Git-Tag: v15.1.1~394^2~3 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=d00ed495446e233cef14c00ec86566f32f262d8a;p=ceph.git crimson: implement and adopt shard-local messenger Remove the constraints to start messenger as a sharded service, and remove foreign pointers from messenger interfaces. This simplifies users to manage shard-local messenger as a normal object. Signed-off-by: Yingxin Cheng --- diff --git a/src/crimson/mgr/client.cc b/src/crimson/mgr/client.cc index 845c02f256ed..5243338c2779 100644 --- a/src/crimson/mgr/client.cc +++ b/src/crimson/mgr/client.cc @@ -73,8 +73,8 @@ seastar::future<> Client::reconnect() } auto peer = mgrmap.get_active_addrs().front(); return msgr.connect(peer, CEPH_ENTITY_TYPE_MGR).then( - [this](auto xconn) { - conn = xconn->release(); + [this](auto _conn) { + conn = _conn; // ask for the mgrconfigure message auto m = ceph::make_message(); m->daemon_name = local_conf()->name.get_id(); diff --git a/src/crimson/mon/MonClient.cc b/src/crimson/mon/MonClient.cc index 117b9b3ebd8e..4c1790ce0c97 100644 --- a/src/crimson/mon/MonClient.cc +++ b/src/crimson/mon/MonClient.cc @@ -943,11 +943,7 @@ seastar::future<> Client::reopen_session(int rank) auto peer = monmap.get_addrs(rank).front(); logger().info("connecting to mon.{}", rank); return msgr.connect(peer, CEPH_ENTITY_TYPE_MON).then( - [this] (auto xconn) -> seastar::future { - // sharded-messenger compatible mode assumes all connections running - // in one shard. - ceph_assert((*xconn)->shard_id() == seastar::engine().cpu_id()); - crimson::net::ConnectionRef conn = xconn->release(); + [this] (auto conn) -> seastar::future { auto& mc = pending_conns.emplace_back( std::make_unique(auth_registry, conn, &keyring)); if (conn->get_peer_addr().is_msgr2()) { diff --git a/src/crimson/net/Connection.h b/src/crimson/net/Connection.h index f2d8ad74161c..d43d61b699e6 100644 --- a/src/crimson/net/Connection.h +++ b/src/crimson/net/Connection.h @@ -110,9 +110,6 @@ class Connection : public seastar::enable_shared_from_this { // will wait for all connections closed virtual seastar::future<> close() = 0; - /// which shard id the connection lives - virtual seastar::shard_id shard_id() const = 0; - virtual void print(ostream& out) const = 0; void set_last_keepalive(clock_t::time_point when) { diff --git a/src/crimson/net/Dispatcher.h b/src/crimson/net/Dispatcher.h index 304719b811d0..ac608fc43153 100644 --- a/src/crimson/net/Dispatcher.h +++ b/src/crimson/net/Dispatcher.h @@ -53,11 +53,6 @@ class Dispatcher { bufferlist&) { return seastar::make_ready_future(0, bufferlist{}); } - - // get the local dispatcher shard if it is accessed by another core - virtual Dispatcher* get_local_shard() { - return this; - } }; } // namespace crimson::net diff --git a/src/crimson/net/Fwd.h b/src/crimson/net/Fwd.h index 078b61e8b440..222153396724 100644 --- a/src/crimson/net/Fwd.h +++ b/src/crimson/net/Fwd.h @@ -33,29 +33,10 @@ using stop_t = seastar::stop_iteration; class Connection; using ConnectionRef = seastar::shared_ptr; -// NOTE: ConnectionXRef should only be used in seastar world, because -// lw_shared_ptr<> is not safe to be accessed by unpinned alien threads. -using ConnectionXRef = seastar::lw_shared_ptr>; class Dispatcher; class Messenger; - -template -seastar::future create_sharded(Args... args) { - // seems we should only construct/stop shards on #0 - return seastar::smp::submit_to(0, [=] { - auto sharded_obj = seastar::make_lw_shared>(); - return sharded_obj->start(args...).then([sharded_obj]() { - seastar::engine().at_exit([sharded_obj]() { - return sharded_obj->stop().finally([sharded_obj] {}); - }); - return sharded_obj.get(); - }); - }).then([] (seastar::sharded *ptr_shard) { - // return the pointer valid for the caller CPU - return &ptr_shard->local(); - }); -} +using MessengerRef = seastar::shared_ptr; } // namespace crimson::net diff --git a/src/crimson/net/Messenger.cc b/src/crimson/net/Messenger.cc index 2b2a6ff2a145..aab476f7a4b0 100644 --- a/src/crimson/net/Messenger.cc +++ b/src/crimson/net/Messenger.cc @@ -6,21 +6,12 @@ namespace crimson::net { -seastar::future +MessengerRef Messenger::create(const entity_name_t& name, const std::string& lname, - const uint64_t nonce, - const int master_sid) + const uint64_t nonce) { - // enforce the messenger to a specific core (master_sid) - // TODO: drop the cross-core feature and cleanup the related interfaces in - // the future. - ceph_assert(master_sid >= 0); - return create_sharded( - name, lname, nonce, static_cast(master_sid) - ).then([](Messenger *msgr) { - return msgr; - }); + return seastar::make_shared(name, lname, nonce); } } // namespace crimson::net diff --git a/src/crimson/net/Messenger.h b/src/crimson/net/Messenger.h index f0004365fd76..9f6fcf3658b3 100644 --- a/src/crimson/net/Messenger.h +++ b/src/crimson/net/Messenger.h @@ -76,7 +76,7 @@ public: /// either return an existing connection to the peer, /// or a new pending connection - virtual seastar::future + virtual seastar::future connect(const entity_addr_t& peer_addr, const entity_type_t& peer_type) = 0; @@ -106,11 +106,6 @@ public: auth_server = as; } - // get the local messenger shard if it is accessed by another core - virtual Messenger* get_local_shard() { - return this; - } - virtual void print(ostream& out) const = 0; virtual SocketPolicy get_policy(entity_type_t peer_type) const = 0; @@ -131,11 +126,10 @@ public: void set_require_authorizer(bool r) { require_authorizer = r; } - static seastar::future + static MessengerRef create(const entity_name_t& name, const std::string& lname, - const uint64_t nonce, - const int master_sid=-1); + const uint64_t nonce); }; inline ostream& operator<<(ostream& out, const Messenger& msgr) { diff --git a/src/crimson/net/SocketConnection.cc b/src/crimson/net/SocketConnection.cc index 89d89557ab0b..4a73034e9226 100644 --- a/src/crimson/net/SocketConnection.cc +++ b/src/crimson/net/SocketConnection.cc @@ -30,7 +30,6 @@ SocketConnection::SocketConnection(SocketMessenger& messenger, bool is_msgr2) : messenger(messenger) { - ceph_assert(&messenger.container().local() == &messenger); if (is_msgr2) { protocol = std::make_unique(dispatcher, *this, messenger); } else { @@ -53,14 +52,14 @@ SocketConnection::get_messenger() const { bool SocketConnection::is_connected() const { - ceph_assert(seastar::engine().cpu_id() == shard_id()); + assert(seastar::engine().cpu_id() == shard_id()); return protocol->is_connected(); } #ifdef UNIT_TESTS_BUILT bool SocketConnection::is_closed() const { - ceph_assert(seastar::engine().cpu_id() == shard_id()); + assert(seastar::engine().cpu_id() == shard_id()); return protocol->is_closed(); } @@ -72,23 +71,19 @@ bool SocketConnection::peer_wins() const seastar::future<> SocketConnection::send(MessageRef msg) { - // Cannot send msg from another core now, its ref counter can be contaminated! - ceph_assert(seastar::engine().cpu_id() == shard_id()); - return seastar::smp::submit_to(shard_id(), [this, msg=std::move(msg)] { - return protocol->send(std::move(msg)); - }); + assert(seastar::engine().cpu_id() == shard_id()); + return protocol->send(std::move(msg)); } seastar::future<> SocketConnection::keepalive() { - return seastar::smp::submit_to(shard_id(), [this] { - return protocol->keepalive(); - }); + assert(seastar::engine().cpu_id() == shard_id()); + return protocol->keepalive(); } seastar::future<> SocketConnection::close() { - ceph_assert(seastar::engine().cpu_id() == shard_id()); + assert(seastar::engine().cpu_id() == shard_id()); return protocol->close(); } diff --git a/src/crimson/net/SocketConnection.h b/src/crimson/net/SocketConnection.h index c6f58c7630e0..503d4e55fb04 100644 --- a/src/crimson/net/SocketConnection.h +++ b/src/crimson/net/SocketConnection.h @@ -64,6 +64,8 @@ class SocketConnection : public Connection { // messages sent, but not yet acked by peer std::deque sent; + seastar::shard_id shard_id() const; + public: SocketConnection(SocketMessenger& messenger, Dispatcher& dispatcher, @@ -88,8 +90,6 @@ class SocketConnection : public Connection { seastar::future<> close() override; - seastar::shard_id shard_id() const override; - void print(ostream& out) const override; /// start a handshake from the client's perspective, diff --git a/src/crimson/net/SocketMessenger.cc b/src/crimson/net/SocketMessenger.cc index d5c0f5a38efa..75ba6bc70673 100644 --- a/src/crimson/net/SocketMessenger.cc +++ b/src/crimson/net/SocketMessenger.cc @@ -32,10 +32,9 @@ namespace crimson::net { SocketMessenger::SocketMessenger(const entity_name_t& myname, const std::string& logic_name, - uint32_t nonce, - seastar::shard_id master_sid) + uint32_t nonce) : Messenger{myname}, - master_sid{master_sid}, + master_sid{seastar::engine().cpu_id()}, logic_name{logic_name}, nonce{nonce} {} @@ -115,7 +114,7 @@ SocketMessenger::try_bind(const entity_addrvec_t& addrs, seastar::future<> SocketMessenger::start(Dispatcher *disp) { assert(seastar::engine().cpu_id() == master_sid); - dispatcher = disp->get_local_shard(); + dispatcher = disp; if (listener) { // make sure we have already bound to a valid address ceph_assert(get_myaddr().is_legacy() || get_myaddr().is_msgr2()); @@ -132,7 +131,7 @@ seastar::future<> SocketMessenger::start(Dispatcher *disp) { return seastar::now(); } -seastar::future +seastar::future SocketMessenger::connect(const entity_addr_t& peer_addr, const entity_type_t& peer_type) { assert(seastar::engine().cpu_id() == master_sid); @@ -141,18 +140,13 @@ SocketMessenger::connect(const entity_addr_t& peer_addr, const entity_type_t& pe ceph_assert(peer_addr.is_legacy() || peer_addr.is_msgr2()); ceph_assert(peer_addr.get_port() > 0); - // TODO: use ConnectionRef if (auto found = lookup_conn(peer_addr); found) { - return seastar::make_ready_future( - seastar::make_lw_shared>( - seastar::make_foreign(found->shared_from_this()))); + return seastar::make_ready_future(found->shared_from_this()); } SocketConnectionRef conn = seastar::make_shared( *this, *dispatcher, peer_addr.is_msgr2()); conn->start_connect(peer_addr, peer_type); - return seastar::make_ready_future( - seastar::make_lw_shared>( - seastar::make_foreign(conn->shared_from_this()))); + return seastar::make_ready_future(conn->shared_from_this()); } seastar::future<> SocketMessenger::shutdown() diff --git a/src/crimson/net/SocketMessenger.h b/src/crimson/net/SocketMessenger.h index 71ef441e9e9a..cf5fa7d1de20 100644 --- a/src/crimson/net/SocketMessenger.h +++ b/src/crimson/net/SocketMessenger.h @@ -29,7 +29,7 @@ namespace crimson::net { class FixedCPUServerSocket; -class SocketMessenger final : public Messenger, public seastar::peering_sharded_service { +class SocketMessenger final : public Messenger { const seastar::shard_id master_sid; seastar::promise<> shutdown_promise; @@ -51,8 +51,7 @@ class SocketMessenger final : public Messenger, public seastar::peering_sharded_ public: SocketMessenger(const entity_name_t& myname, const std::string& logic_name, - uint32_t nonce, - seastar::shard_id master_sid); + uint32_t nonce); ~SocketMessenger() override { ceph_assert(!listener); } seastar::future<> set_myaddrs(const entity_addrvec_t& addr) override; @@ -66,8 +65,8 @@ class SocketMessenger final : public Messenger, public seastar::peering_sharded_ seastar::future<> start(Dispatcher *dispatcher) override; - seastar::future connect(const entity_addr_t& peer_addr, - const entity_type_t& peer_type) override; + seastar::future connect(const entity_addr_t& peer_addr, + const entity_type_t& peer_type) override; // can only wait once seastar::future<> wait() override { assert(seastar::engine().cpu_id() == master_sid); @@ -76,10 +75,6 @@ class SocketMessenger final : public Messenger, public seastar::peering_sharded_ seastar::future<> shutdown() override; - Messenger* get_local_shard() override { - return &container().local(); - } - void print(ostream& out) const override { out << get_myname() << "(" << logic_name diff --git a/src/crimson/osd/heartbeat.cc b/src/crimson/osd/heartbeat.cc index 12f6be2769b8..92079f3c8a56 100644 --- a/src/crimson/osd/heartbeat.cc +++ b/src/crimson/osd/heartbeat.cc @@ -26,8 +26,8 @@ namespace { Heartbeat::Heartbeat(const crimson::osd::ShardServices& service, crimson::mon::Client& monc, - crimson::net::Messenger& front_msgr, - crimson::net::Messenger& back_msgr) + crimson::net::MessengerRef front_msgr, + crimson::net::MessengerRef back_msgr) : service{service}, monc{monc}, front_msgr{front_msgr}, @@ -46,12 +46,12 @@ seastar::future<> Heartbeat::start(entity_addrvec_t front_addrs, } using crimson::net::SocketPolicy; - front_msgr.set_policy(entity_name_t::TYPE_OSD, + front_msgr->set_policy(entity_name_t::TYPE_OSD, + SocketPolicy::stateless_server(0)); + back_msgr->set_policy(entity_name_t::TYPE_OSD, SocketPolicy::stateless_server(0)); - back_msgr.set_policy(entity_name_t::TYPE_OSD, - SocketPolicy::stateless_server(0)); - return seastar::when_all_succeed(start_messenger(front_msgr, front_addrs), - start_messenger(back_msgr, back_addrs)) + return seastar::when_all_succeed(start_messenger(*front_msgr, front_addrs), + start_messenger(*back_msgr, back_addrs)) .then([this] { timer.arm_periodic( std::chrono::seconds(local_conf()->osd_heartbeat_interval)); @@ -71,25 +71,25 @@ Heartbeat::start_messenger(crimson::net::Messenger& msgr, seastar::future<> Heartbeat::stop() { - return seastar::when_all_succeed(front_msgr.shutdown(), - back_msgr.shutdown()); + return seastar::when_all_succeed(front_msgr->shutdown(), + back_msgr->shutdown()); } const entity_addrvec_t& Heartbeat::get_front_addrs() const { - return front_msgr.get_myaddrs(); + return front_msgr->get_myaddrs(); } const entity_addrvec_t& Heartbeat::get_back_addrs() const { - return back_msgr.get_myaddrs(); + return back_msgr->get_myaddrs(); } void Heartbeat::set_require_authorizer(bool require_authorizer) { - if (front_msgr.get_require_authorizer() != require_authorizer) { - front_msgr.set_require_authorizer(require_authorizer); - back_msgr.set_require_authorizer(require_authorizer); + if (front_msgr->get_require_authorizer() != require_authorizer) { + front_msgr->set_require_authorizer(require_authorizer); + back_msgr->set_require_authorizer(require_authorizer); } } @@ -103,14 +103,13 @@ seastar::future<> Heartbeat::add_peer(osd_id_t peer, epoch_t epoch) auto osdmap = service.get_osdmap_service().get_map(); // TODO: use addrs return seastar::when_all_succeed( - front_msgr.connect(osdmap->get_hb_front_addrs(peer).front(), - CEPH_ENTITY_TYPE_OSD), - back_msgr.connect(osdmap->get_hb_back_addrs(peer).front(), - CEPH_ENTITY_TYPE_OSD)) - .then([&info=peer_info->second] (auto xcon_front, auto xcon_back) { - // sharded-messenger compatible mode - info.con_front = xcon_front->release(); - info.con_back = xcon_back->release(); + front_msgr->connect(osdmap->get_hb_front_addrs(peer).front(), + CEPH_ENTITY_TYPE_OSD), + back_msgr->connect(osdmap->get_hb_back_addrs(peer).front(), + CEPH_ENTITY_TYPE_OSD)) + .then([&info=peer_info->second] (auto con_front, auto con_back) { + info.con_front = con_front; + info.con_back = con_back; }); } else { return seastar::now(); diff --git a/src/crimson/osd/heartbeat.h b/src/crimson/osd/heartbeat.h index d3978da89f2d..b8ff3bcccc68 100644 --- a/src/crimson/osd/heartbeat.h +++ b/src/crimson/osd/heartbeat.h @@ -27,8 +27,8 @@ public: Heartbeat(const crimson::osd::ShardServices& service, crimson::mon::Client& monc, - crimson::net::Messenger& front_msgr, - crimson::net::Messenger& back_msgr); + crimson::net::MessengerRef front_msgr, + crimson::net::MessengerRef back_msgr); seastar::future<> start(entity_addrvec_t front, entity_addrvec_t back); @@ -74,8 +74,8 @@ private: private: const crimson::osd::ShardServices& service; crimson::mon::Client& monc; - crimson::net::Messenger& front_msgr; - crimson::net::Messenger& back_msgr; + crimson::net::MessengerRef front_msgr; + crimson::net::MessengerRef back_msgr; seastar::timer timer; // use real_clock so it can be converted to utime_t diff --git a/src/crimson/osd/main.cc b/src/crimson/osd/main.cc index 4ffe64546824..370d1e8dfbd6 100644 --- a/src/crimson/osd/main.cc +++ b/src/crimson/osd/main.cc @@ -15,7 +15,7 @@ #include "common/ceph_argparse.h" #include "crimson/common/buffer_io.h" #include "crimson/common/config_proxy.h" -#include "crimson/net/SocketMessenger.h" +#include "crimson/net/Messenger.h" #include "global/pidfile.h" #include "osd.h" @@ -129,8 +129,6 @@ int main(int argc, char* argv[]) &cluster_name, &conf_file_list); seastar::sharded osd; - seastar::sharded cluster_msgr, client_msgr; - seastar::sharded hb_front_msgr, hb_back_msgr; using crimson::common::sharded_conf; using crimson::common::sharded_perf_coll; try { @@ -156,30 +154,23 @@ int main(int argc, char* argv[]) pidfile_write(local_conf()->pid_file); const int whoami = std::stoi(local_conf()->name.get_id()); const auto nonce = static_cast(getpid()); + crimson::net::MessengerRef cluster_msgr, client_msgr; + crimson::net::MessengerRef hb_front_msgr, hb_back_msgr; for (auto [msgr, name] : {make_pair(std::ref(cluster_msgr), "cluster"s), make_pair(std::ref(client_msgr), "client"s), make_pair(std::ref(hb_front_msgr), "hb_front"s), make_pair(std::ref(hb_back_msgr), "hb_back"s)}) { - const auto shard = seastar::engine().cpu_id(); - msgr.start(entity_name_t::OSD(whoami), name, nonce, shard).get(); + msgr = crimson::net::Messenger::create(entity_name_t::OSD(whoami), name, nonce); if (local_conf()->ms_crc_data) { - msgr.local().set_crc_data(); + msgr->set_crc_data(); } if (local_conf()->ms_crc_header) { - msgr.local().set_crc_header(); + msgr->set_crc_header(); } } osd.start_single(whoami, nonce, - reference_wrapper(cluster_msgr.local()), - reference_wrapper(client_msgr.local()), - reference_wrapper(hb_front_msgr.local()), - reference_wrapper(hb_back_msgr.local())).get(); - seastar::engine().at_exit([&] { - return seastar::when_all_succeed(cluster_msgr.stop(), - client_msgr.stop(), - hb_front_msgr.stop(), - hb_back_msgr.stop()); - }); + cluster_msgr, client_msgr, + hb_front_msgr, hb_back_msgr).get(); if (config.count("mkkey")) { make_keyring().handle_exception([](std::exception_ptr) { seastar::engine().exit(1); diff --git a/src/crimson/osd/osd.cc b/src/crimson/osd/osd.cc index 8e49d1222037..59fa5fbcaf4b 100644 --- a/src/crimson/osd/osd.cc +++ b/src/crimson/osd/osd.cc @@ -54,22 +54,22 @@ using crimson::os::FuturizedStore; namespace crimson::osd { OSD::OSD(int id, uint32_t nonce, - crimson::net::Messenger& cluster_msgr, - crimson::net::Messenger& public_msgr, - crimson::net::Messenger& hb_front_msgr, - crimson::net::Messenger& hb_back_msgr) + crimson::net::MessengerRef cluster_msgr, + crimson::net::MessengerRef public_msgr, + crimson::net::MessengerRef hb_front_msgr, + crimson::net::MessengerRef hb_back_msgr) : whoami{id}, nonce{nonce}, // do this in background beacon_timer{[this] { (void)send_beacon(); }}, cluster_msgr{cluster_msgr}, public_msgr{public_msgr}, - monc{new crimson::mon::Client{public_msgr, *this}}, - mgrc{new crimson::mgr::Client{public_msgr, *this}}, + monc{new crimson::mon::Client{*public_msgr, *this}}, + mgrc{new crimson::mgr::Client{*public_msgr, *this}}, store{crimson::os::FuturizedStore::create( local_conf().get_val("osd_objectstore"), local_conf().get_val("osd_data"))}, - shard_services{*this, cluster_msgr, public_msgr, *monc, *mgrc, *store}, + shard_services{*this, *cluster_msgr, *public_msgr, *monc, *mgrc, *store}, heartbeat{new Heartbeat{shard_services, *monc, hb_front_msgr, hb_back_msgr}}, // do this in background heartbeat_timer{[this] { (void)update_heartbeat_peers(); }}, @@ -78,8 +78,8 @@ OSD::OSD(int id, uint32_t nonce, osdmaps[0] = boost::make_local_shared(); for (auto msgr : {std::ref(cluster_msgr), std::ref(public_msgr), std::ref(hb_front_msgr), std::ref(hb_back_msgr)}) { - msgr.get().set_auth_server(monc.get()); - msgr.get().set_auth_client(monc.get()); + msgr.get()->set_auth_server(monc.get()); + msgr.get()->set_auth_client(monc.get()); } if (local_conf()->osd_open_classes_on_start) { @@ -224,34 +224,34 @@ seastar::future<> OSD::start() CEPH_FEATURE_OSDENC; using crimson::net::SocketPolicy; - public_msgr.set_default_policy(SocketPolicy::stateless_server(0)); - public_msgr.set_policy(entity_name_t::TYPE_MON, - SocketPolicy::lossy_client(osd_required)); - public_msgr.set_policy(entity_name_t::TYPE_MGR, - SocketPolicy::lossy_client(osd_required)); - public_msgr.set_policy(entity_name_t::TYPE_OSD, - SocketPolicy::stateless_server(0)); - - cluster_msgr.set_default_policy(SocketPolicy::stateless_server(0)); - cluster_msgr.set_policy(entity_name_t::TYPE_MON, - SocketPolicy::lossy_client(0)); - cluster_msgr.set_policy(entity_name_t::TYPE_OSD, - SocketPolicy::lossless_peer(osd_required)); - cluster_msgr.set_policy(entity_name_t::TYPE_CLIENT, + public_msgr->set_default_policy(SocketPolicy::stateless_server(0)); + public_msgr->set_policy(entity_name_t::TYPE_MON, + SocketPolicy::lossy_client(osd_required)); + public_msgr->set_policy(entity_name_t::TYPE_MGR, + SocketPolicy::lossy_client(osd_required)); + public_msgr->set_policy(entity_name_t::TYPE_OSD, SocketPolicy::stateless_server(0)); + cluster_msgr->set_default_policy(SocketPolicy::stateless_server(0)); + cluster_msgr->set_policy(entity_name_t::TYPE_MON, + SocketPolicy::lossy_client(0)); + cluster_msgr->set_policy(entity_name_t::TYPE_OSD, + SocketPolicy::lossless_peer(osd_required)); + cluster_msgr->set_policy(entity_name_t::TYPE_CLIENT, + SocketPolicy::stateless_server(0)); + dispatchers.push_front(this); dispatchers.push_front(monc.get()); dispatchers.push_front(mgrc.get()); return seastar::when_all_succeed( - cluster_msgr.try_bind(pick_addresses(CEPH_PICK_ADDRESS_CLUSTER), + cluster_msgr->try_bind(pick_addresses(CEPH_PICK_ADDRESS_CLUSTER), + local_conf()->ms_bind_port_min, + local_conf()->ms_bind_port_max) + .then([this] { return cluster_msgr->start(&dispatchers); }), + public_msgr->try_bind(pick_addresses(CEPH_PICK_ADDRESS_PUBLIC), local_conf()->ms_bind_port_min, local_conf()->ms_bind_port_max) - .then([this] { return cluster_msgr.start(&dispatchers); }), - public_msgr.try_bind(pick_addresses(CEPH_PICK_ADDRESS_PUBLIC), - local_conf()->ms_bind_port_min, - local_conf()->ms_bind_port_max) - .then([this] { return public_msgr.start(&dispatchers); })); + .then([this] { return public_msgr->start(&dispatchers); })); }).then([this] { return seastar::when_all_succeed(monc->start(), mgrc->start()); @@ -264,15 +264,15 @@ seastar::future<> OSD::start() return monc->renew_subs(); }).then([this] { if (auto [addrs, changed] = - replace_unknown_addrs(cluster_msgr.get_myaddrs(), - public_msgr.get_myaddrs()); changed) { - return cluster_msgr.set_myaddrs(addrs); + replace_unknown_addrs(cluster_msgr->get_myaddrs(), + public_msgr->get_myaddrs()); changed) { + return cluster_msgr->set_myaddrs(addrs); } else { return seastar::now(); } }).then([this] { - return heartbeat->start(public_msgr.get_myaddrs(), - cluster_msgr.get_myaddrs()); + return heartbeat->start(public_msgr->get_myaddrs(), + cluster_msgr->get_myaddrs()); }).then([this] { return start_boot(); }); @@ -325,13 +325,13 @@ seastar::future<> OSD::_send_boot() logger().info("hb_back_msgr: {}", heartbeat->get_back_addrs()); logger().info("hb_front_msgr: {}", heartbeat->get_front_addrs()); - logger().info("cluster_msgr: {}", cluster_msgr.get_myaddr()); + logger().info("cluster_msgr: {}", cluster_msgr->get_myaddr()); auto m = make_message(superblock, osdmap->get_epoch(), osdmap->get_epoch(), heartbeat->get_back_addrs(), heartbeat->get_front_addrs(), - cluster_msgr.get_myaddrs(), + cluster_msgr->get_myaddrs(), CEPH_FEATURES_ALL); return monc->send_message(m); } @@ -402,8 +402,8 @@ seastar::future<> OSD::stop() return monc->stop(); }).then([this] { return when_all_succeed( - public_msgr.shutdown(), - cluster_msgr.shutdown()); + public_msgr->shutdown(), + cluster_msgr->shutdown()); }).then([this] { return store->umount(); }).handle_exception([](auto ep) { @@ -833,7 +833,7 @@ seastar::future<> OSD::committed_osd_maps(version_t first, shard_services.update_map(osdmap); if (up_epoch != 0 && osdmap->is_up(whoami) && - osdmap->get_addrs(whoami) == public_msgr.get_myaddrs()) { + osdmap->get_addrs(whoami) == public_msgr->get_myaddrs()) { up_epoch = osdmap->get_epoch(); if (!boot_epoch) { boot_epoch = osdmap->get_epoch(); @@ -842,7 +842,7 @@ seastar::future<> OSD::committed_osd_maps(version_t first, }); }).then([m, this] { if (osdmap->is_up(whoami) && - osdmap->get_addrs(whoami) == public_msgr.get_myaddrs() && + osdmap->get_addrs(whoami) == public_msgr->get_myaddrs() && bind_epoch < osdmap->get_up_from(whoami)) { if (state.is_booting()) { logger().info("osd.{}: activating...", whoami); @@ -924,17 +924,17 @@ bool OSD::should_restart() const logger().info("map e {} marked osd.{} down", osdmap->get_epoch(), whoami); return true; - } else if (osdmap->get_addrs(whoami) != public_msgr.get_myaddrs()) { + } else if (osdmap->get_addrs(whoami) != public_msgr->get_myaddrs()) { logger().error("map e {} had wrong client addr ({} != my {})", osdmap->get_epoch(), osdmap->get_addrs(whoami), - public_msgr.get_myaddrs()); + public_msgr->get_myaddrs()); return true; - } else if (osdmap->get_cluster_addrs(whoami) != cluster_msgr.get_myaddrs()) { + } else if (osdmap->get_cluster_addrs(whoami) != cluster_msgr->get_myaddrs()) { logger().error("map e {} had wrong cluster addr ({} != my {})", osdmap->get_epoch(), osdmap->get_cluster_addrs(whoami), - cluster_msgr.get_myaddrs()); + cluster_msgr->get_myaddrs()); return true; } else { return false; diff --git a/src/crimson/osd/osd.h b/src/crimson/osd/osd.h index 570017a8a9bb..95c502636b25 100644 --- a/src/crimson/osd/osd.h +++ b/src/crimson/osd/osd.h @@ -68,9 +68,9 @@ class OSD final : public crimson::net::Dispatcher, const uint32_t nonce; seastar::timer beacon_timer; // talk with osd - crimson::net::Messenger& cluster_msgr; + crimson::net::MessengerRef cluster_msgr; // talk with client/mon/mgr - crimson::net::Messenger& public_msgr; + crimson::net::MessengerRef public_msgr; ChainedDispatchers dispatchers; std::unique_ptr monc; std::unique_ptr mgrc; @@ -117,10 +117,10 @@ class OSD final : public crimson::net::Dispatcher, public: OSD(int id, uint32_t nonce, - crimson::net::Messenger& cluster_msgr, - crimson::net::Messenger& client_msgr, - crimson::net::Messenger& hb_front_msgr, - crimson::net::Messenger& hb_back_msgr); + crimson::net::MessengerRef cluster_msgr, + crimson::net::MessengerRef client_msgr, + crimson::net::MessengerRef hb_front_msgr, + crimson::net::MessengerRef hb_back_msgr); ~OSD() final; seastar::future<> mkfs(uuid_d osd_uuid, uuid_d cluster_fsid); diff --git a/src/crimson/osd/shard_services.cc b/src/crimson/osd/shard_services.cc index 35d4f6d614a8..784fca6fa5dc 100644 --- a/src/crimson/osd/shard_services.cc +++ b/src/crimson/osd/shard_services.cc @@ -59,8 +59,8 @@ seastar::future<> ShardServices::send_to_osd( } else { return cluster_msgr.connect(osdmap->get_cluster_addrs(peer).front(), CEPH_ENTITY_TYPE_OSD) - .then([m, this] (auto xconn) { - return (*xconn)->send(m); + .then([m, this] (auto conn) { + return conn->send(m); }); } } diff --git a/src/test/crimson/test_alien_echo.cc b/src/test/crimson/test_alien_echo.cc index abccc5034ecc..1a8b2039c56a 100644 --- a/src/test/crimson/test_alien_echo.cc +++ b/src/test/crimson/test_alien_echo.cc @@ -37,7 +37,7 @@ struct DummyAuthAuthorizer : public AuthAuthorizer { struct Server { crimson::thread::Throttle byte_throttler; - crimson::net::Messenger& msgr; + crimson::net::MessengerRef msgr; crimson::auth::DummyAuthClientServer dummy_auth; struct ServerDispatcher : crimson::net::Dispatcher { unsigned count = 0; @@ -59,18 +59,18 @@ struct Server { 0, bufferlist{}); } } dispatcher; - Server(crimson::net::Messenger& msgr) + Server(crimson::net::MessengerRef msgr) : byte_throttler(crimson::net::conf.osd_client_message_size_cap), msgr{msgr} { - msgr.set_crc_header(); - msgr.set_crc_data(); + msgr->set_crc_header(); + msgr->set_crc_data(); } }; struct Client { crimson::thread::Throttle byte_throttler; - crimson::net::Messenger& msgr; + crimson::net::MessengerRef msgr; crimson::auth::DummyAuthClientServer dummy_auth; struct ClientDispatcher : crimson::net::Dispatcher { unsigned count = 0; @@ -83,12 +83,12 @@ struct Client { return seastar::now(); } } dispatcher; - Client(crimson::net::Messenger& msgr) + Client(crimson::net::MessengerRef msgr) : byte_throttler(crimson::net::conf.osd_client_message_size_cap), msgr{msgr} { - msgr.set_crc_header(); - msgr.set_crc_data(); + msgr->set_crc_header(); + msgr->set_crc_data(); } }; } // namespace seastar_pingpong @@ -151,60 +151,58 @@ seastar_echo(const entity_addr_t addr, echo_role role, unsigned count) { std::cout << "seastar/"; if (role == echo_role::as_server) { - return crimson::net::Messenger::create(entity_name_t::OSD(0), "server", - addr.get_nonce(), 0) - .then([addr, count] (auto msgr) { - return seastar::do_with(seastar_pingpong::Server{*msgr}, - [addr, count](auto& server) mutable { - std::cout << "server listening at " << addr << std::endl; - // bind the server - server.msgr.set_default_policy(crimson::net::SocketPolicy::stateless_server(0)); - server.msgr.set_policy_throttler(entity_name_t::TYPE_OSD, - &server.byte_throttler); - server.msgr.set_require_authorizer(false); - server.msgr.set_auth_client(&server.dummy_auth); - server.msgr.set_auth_server(&server.dummy_auth); - return server.msgr.bind(entity_addrvec_t{addr}) - .then([&server] { - return server.msgr.start(&server.dispatcher); - }).then([&dispatcher=server.dispatcher, count] { - return dispatcher.on_reply.wait([&dispatcher, count] { - return dispatcher.count >= count; - }); - }).finally([&server] { - std::cout << "server shutting down" << std::endl; - return server.msgr.shutdown(); - }); - }); + return seastar::do_with( + seastar_pingpong::Server{crimson::net::Messenger::create( + entity_name_t::OSD(0), "server", addr.get_nonce())}, + [addr, count](auto& server) mutable { + std::cout << "server listening at " << addr << std::endl; + // bind the server + server.msgr->set_default_policy(crimson::net::SocketPolicy::stateless_server(0)); + server.msgr->set_policy_throttler(entity_name_t::TYPE_OSD, + &server.byte_throttler); + server.msgr->set_require_authorizer(false); + server.msgr->set_auth_client(&server.dummy_auth); + server.msgr->set_auth_server(&server.dummy_auth); + return server.msgr->bind(entity_addrvec_t{addr} + ).then([&server] { + return server.msgr->start(&server.dispatcher); + }).then([&dispatcher=server.dispatcher, count] { + return dispatcher.on_reply.wait([&dispatcher, count] { + return dispatcher.count >= count; + }); + }).finally([&server] { + std::cout << "server shutting down" << std::endl; + return server.msgr->shutdown(); }); + }); } else { - return crimson::net::Messenger::create(entity_name_t::OSD(1), "client", - addr.get_nonce(), 0) - .then([addr, count] (auto msgr) { - return seastar::do_with(seastar_pingpong::Client{*msgr}, - [addr, count](auto& client) { - std::cout << "client sending to " << addr << std::endl; - client.msgr.set_default_policy(crimson::net::SocketPolicy::lossy_client(0)); - client.msgr.set_policy_throttler(entity_name_t::TYPE_OSD, - &client.byte_throttler); - client.msgr.set_require_authorizer(false); - client.msgr.set_auth_client(&client.dummy_auth); - client.msgr.set_auth_server(&client.dummy_auth); - return client.msgr.start(&client.dispatcher) - .then([addr, &client] { - return client.msgr.connect(addr, entity_name_t::TYPE_OSD); - }).then([&disp=client.dispatcher, count](crimson::net::ConnectionXRef conn) { - return seastar::do_until( - [&disp,count] { return disp.count >= count; }, - [&disp,conn] { return (*conn)->send(make_message()) - .then([&] { return disp.on_reply.wait(); }); - }); - }).finally([&client] { - std::cout << "client shutting down" << std::endl; - return client.msgr.shutdown(); - }); - }); + return seastar::do_with( + seastar_pingpong::Client{crimson::net::Messenger::create( + entity_name_t::OSD(1), "client", addr.get_nonce())}, + [addr, count](auto& client) { + std::cout << "client sending to " << addr << std::endl; + client.msgr->set_default_policy(crimson::net::SocketPolicy::lossy_client(0)); + client.msgr->set_policy_throttler(entity_name_t::TYPE_OSD, + &client.byte_throttler); + client.msgr->set_require_authorizer(false); + client.msgr->set_auth_client(&client.dummy_auth); + client.msgr->set_auth_server(&client.dummy_auth); + return client.msgr->start(&client.dispatcher).then([addr, &client] { + return client.msgr->connect(addr, entity_name_t::TYPE_OSD); + }).then([&disp=client.dispatcher, count](crimson::net::ConnectionRef conn) { + return seastar::do_until( + [&disp,count] { return disp.count >= count; }, + [&disp,conn] { + return conn->send(make_message()).then([&] { + return disp.on_reply.wait(); + }); + } + ); + }).finally([&client] { + std::cout << "client shutting down" << std::endl; + return client.msgr->shutdown(); }); + }); } } diff --git a/src/test/crimson/test_messenger.cc b/src/test/crimson/test_messenger.cc index 322c4f4b5fdd..ebb391078313 100644 --- a/src/test/crimson/test_messenger.cc +++ b/src/test/crimson/test_messenger.cc @@ -43,17 +43,10 @@ static seastar::future<> test_echo(unsigned rounds, { struct test_state { struct Server final - : public crimson::net::Dispatcher, - public seastar::peering_sharded_service { - crimson::net::Messenger *msgr = nullptr; + : public crimson::net::Dispatcher { + crimson::net::MessengerRef msgr; crimson::auth::DummyAuthClientServer dummy_auth; - Dispatcher* get_local_shard() override { - return &(container().local()); - } - seastar::future<> stop() { - return seastar::make_ready_future<>(); - } seastar::future<> ms_dispatch(crimson::net::Connection* c, MessageRef m) override { if (verbose) { @@ -67,20 +60,14 @@ static seastar::future<> test_echo(unsigned rounds, const std::string& lname, const uint64_t nonce, const entity_addr_t& addr) { - auto&& fut = crimson::net::Messenger::create(name, lname, nonce, 0); - return fut.then([this, addr](crimson::net::Messenger *messenger) { - return container().invoke_on_all([messenger](auto& server) { - server.msgr = messenger->get_local_shard(); - server.msgr->set_default_policy(crimson::net::SocketPolicy::stateless_server(0)); - server.msgr->set_require_authorizer(false); - server.msgr->set_auth_client(&server.dummy_auth); - server.msgr->set_auth_server(&server.dummy_auth); - }).then([messenger, addr] { - return messenger->bind(entity_addrvec_t{addr}); - }).then([this, messenger] { - return messenger->start(this); - }); - }); + msgr = crimson::net::Messenger::create(name, lname, nonce); + msgr->set_default_policy(crimson::net::SocketPolicy::stateless_server(0)); + msgr->set_require_authorizer(false); + msgr->set_auth_client(&dummy_auth); + msgr->set_auth_server(&dummy_auth); + return msgr->bind(entity_addrvec_t{addr}).then([this] { + return msgr->start(this); + }); } seastar::future<> shutdown() { ceph_assert(msgr); @@ -89,9 +76,7 @@ static seastar::future<> test_echo(unsigned rounds, }; struct Client final - : public crimson::net::Dispatcher, - public seastar::peering_sharded_service { - + : public crimson::net::Dispatcher { struct PingSession : public seastar::enable_shared_from_this { unsigned count = 0u; mono_time connected_time; @@ -101,7 +86,7 @@ static seastar::future<> test_echo(unsigned rounds, unsigned rounds; std::bernoulli_distribution keepalive_dist; - crimson::net::Messenger *msgr = nullptr; + crimson::net::MessengerRef msgr; std::map> pending_conns; std::map sessions; crimson::auth::DummyAuthClientServer dummy_auth; @@ -118,12 +103,6 @@ static seastar::future<> test_echo(unsigned rounds, return found->second; } - Dispatcher* get_local_shard() override { - return &(container().local()); - } - seastar::future<> stop() { - return seastar::now(); - } seastar::future<> ms_handle_connect(crimson::net::ConnectionRef conn) override { auto session = seastar::make_shared(); auto [i, added] = sessions.emplace(conn.get(), session); @@ -143,30 +122,21 @@ static seastar::future<> test_echo(unsigned rounds, if (session->count == rounds) { logger().info("{}: finished receiving {} pongs", *c, session->count); session->finish_time = mono_clock::now(); - return container().invoke_on_all([c](auto &client) { - auto found = client.pending_conns.find(c); - ceph_assert(found != client.pending_conns.end()); - found->second.set_value(); - }); - } else { - return seastar::now(); + auto found = pending_conns.find(c); + ceph_assert(found != pending_conns.end()); + found->second.set_value(); } + return seastar::now(); } seastar::future<> init(const entity_name_t& name, const std::string& lname, const uint64_t nonce) { - return crimson::net::Messenger::create(name, lname, nonce, 0) - .then([this](crimson::net::Messenger *messenger) { - return container().invoke_on_all([messenger](auto& client) { - client.msgr = messenger->get_local_shard(); - client.msgr->set_default_policy(crimson::net::SocketPolicy::lossy_client(0)); - client.msgr->set_auth_client(&client.dummy_auth); - client.msgr->set_auth_server(&client.dummy_auth); - }).then([this, messenger] { - return messenger->start(this); - }); - }); + msgr = crimson::net::Messenger::create(name, lname, nonce); + msgr->set_default_policy(crimson::net::SocketPolicy::lossy_client(0)); + msgr->set_auth_client(&dummy_auth); + msgr->set_auth_server(&dummy_auth); + return msgr->start(this); } seastar::future<> shutdown() { @@ -174,153 +144,119 @@ static seastar::future<> test_echo(unsigned rounds, return msgr->shutdown(); } - // Note: currently we don't support foreign dispatch a message because: - // 1. it is not effecient because each ref-count modification needs - // a cross-core jump, so it should be discouraged. - // 2. messenger needs to be modified to hold a wrapper for the sending - // message because it can be a nested seastar smart ptr or not. - // 3. in 1:1 mapping OSD, there is no need to do foreign dispatch. - seastar::future<> dispatch_pingpong(const entity_addr_t& peer_addr, - bool foreign_dispatch) { -#ifndef CRIMSON_MSGR_SEND_FOREIGN - ceph_assert(!foreign_dispatch); -#endif + seastar::future<> dispatch_pingpong(const entity_addr_t& peer_addr) { mono_time start_time = mono_clock::now(); - return msgr->connect(peer_addr, entity_name_t::TYPE_OSD) - .then([this, foreign_dispatch, start_time](auto conn) { - return seastar::futurize_apply([this, conn, foreign_dispatch] { - if (foreign_dispatch) { - return do_dispatch_pingpong(&**conn); - } else { - // NOTE: this could be faster if we don't switch cores in do_dispatch_pingpong(). - return container().invoke_on(conn->get()->shard_id(), [conn = &**conn](auto &client) { - return client.do_dispatch_pingpong(conn); - }); - } - }).finally([this, conn, start_time] { - return container().invoke_on(conn->get()->shard_id(), [conn, start_time](auto &client) { - auto session = client.find_session(&**conn); - std::chrono::duration dur_handshake = session->connected_time - start_time; - std::chrono::duration dur_pingpong = session->finish_time - session->connected_time; - logger().info("{}: handshake {}, pingpong {}", - **conn, dur_handshake.count(), dur_pingpong.count()); - }); - }); + return msgr->connect(peer_addr, entity_name_t::TYPE_OSD + ).then([this, start_time](auto conn) { + return seastar::futurize_apply([this, conn] { + return do_dispatch_pingpong(conn.get()); + }).finally([this, conn, start_time] { + auto session = find_session(conn.get()); + std::chrono::duration dur_handshake = session->connected_time - start_time; + std::chrono::duration dur_pingpong = session->finish_time - session->connected_time; + logger().info("{}: handshake {}, pingpong {}", + *conn, dur_handshake.count(), dur_pingpong.count()); }); + }); } private: seastar::future<> do_dispatch_pingpong(crimson::net::Connection* conn) { - return container().invoke_on_all([conn](auto& client) { - auto [i, added] = client.pending_conns.emplace(conn, seastar::promise<>()); - std::ignore = i; - ceph_assert(added); - }).then([this, conn] { - return seastar::do_with(0u, 0u, - [this, conn](auto &count_ping, auto &count_keepalive) { - return seastar::do_until( - [this, conn, &count_ping, &count_keepalive] { - bool stop = (count_ping == rounds); - if (stop) { - logger().info("{}: finished sending {} pings with {} keepalives", - *conn, count_ping, count_keepalive); - } - return stop; - }, - [this, conn, &count_ping, &count_keepalive] { - return seastar::repeat([this, conn, &count_ping, &count_keepalive] { - if (keepalive_dist(rng)) { - return conn->keepalive() - .then([&count_keepalive] { - count_keepalive += 1; - return seastar::make_ready_future( - seastar::stop_iteration::no); - }); - } else { - return conn->send(make_message()) - .then([&count_ping] { - count_ping += 1; - return seastar::make_ready_future( - seastar::stop_iteration::yes); - }); - } + auto [i, added] = pending_conns.emplace(conn, seastar::promise<>()); + std::ignore = i; + ceph_assert(added); + return seastar::do_with(0u, 0u, + [this, conn](auto &count_ping, auto &count_keepalive) { + return seastar::do_until( + [this, conn, &count_ping, &count_keepalive] { + bool stop = (count_ping == rounds); + if (stop) { + logger().info("{}: finished sending {} pings with {} keepalives", + *conn, count_ping, count_keepalive); + } + return stop; + }, + [this, conn, &count_ping, &count_keepalive] { + return seastar::repeat([this, conn, &count_ping, &count_keepalive] { + if (keepalive_dist(rng)) { + return conn->keepalive() + .then([&count_keepalive] { + count_keepalive += 1; + return seastar::make_ready_future( + seastar::stop_iteration::no); }); - }).then([this, conn] { - auto found = pending_conns.find(conn); - return found->second.get_future(); - }); - }); - }); + } else { + return conn->send(make_message()) + .then([&count_ping] { + count_ping += 1; + return seastar::make_ready_future( + seastar::stop_iteration::yes); + }); + } + }); + }).then([this, conn] { + auto found = pending_conns.find(conn); + return found->second.get_future(); + } + ); + }); } }; }; logger().info("test_echo(rounds={}, keepalive_ratio={}, v2={}):", rounds, keepalive_ratio, v2); + auto server1 = seastar::make_shared(); + auto server2 = seastar::make_shared(); + auto client1 = seastar::make_shared(rounds, keepalive_ratio); + auto client2 = seastar::make_shared(rounds, keepalive_ratio); + // start servers and clients + entity_addr_t addr1; + addr1.parse("127.0.0.1:9010", nullptr); + entity_addr_t addr2; + addr2.parse("127.0.0.1:9011", nullptr); + if (v2) { + addr1.set_type(entity_addr_t::TYPE_MSGR2); + addr2.set_type(entity_addr_t::TYPE_MSGR2); + } else { + addr1.set_type(entity_addr_t::TYPE_LEGACY); + addr2.set_type(entity_addr_t::TYPE_LEGACY); + } return seastar::when_all_succeed( - crimson::net::create_sharded(), - crimson::net::create_sharded(), - crimson::net::create_sharded(rounds, keepalive_ratio), - crimson::net::create_sharded(rounds, keepalive_ratio)) - .then([rounds, keepalive_ratio, v2](test_state::Server *server1, - test_state::Server *server2, - test_state::Client *client1, - test_state::Client *client2) { - // start servers and clients - entity_addr_t addr1; - addr1.parse("127.0.0.1:9010", nullptr); - entity_addr_t addr2; - addr2.parse("127.0.0.1:9011", nullptr); - if (v2) { - addr1.set_type(entity_addr_t::TYPE_MSGR2); - addr2.set_type(entity_addr_t::TYPE_MSGR2); - } else { - addr1.set_type(entity_addr_t::TYPE_LEGACY); - addr2.set_type(entity_addr_t::TYPE_LEGACY); - } - return seastar::when_all_succeed( - server1->init(entity_name_t::OSD(0), "server1", 1, addr1), - server2->init(entity_name_t::OSD(1), "server2", 2, addr2), - client1->init(entity_name_t::OSD(2), "client1", 3), - client2->init(entity_name_t::OSD(3), "client2", 4)) - // dispatch pingpoing - .then([client1, client2, server1, server2] { - return seastar::when_all_succeed( - // test connecting in parallel, accepting in parallel -#ifdef CRIMSON_MSGR_SEND_FOREIGN - // operate the connection reference from a foreign core - client1->dispatch_pingpong(server1->msgr->get_myaddr(), true), - client2->dispatch_pingpong(server2->msgr->get_myaddr(), true), -#endif - // operate the connection reference from a local core - client1->dispatch_pingpong(server2->msgr->get_myaddr(), false), - client2->dispatch_pingpong(server1->msgr->get_myaddr(), false)); - // shutdown - }).finally([client1] { - logger().info("client1 shutdown..."); - return client1->shutdown(); - }).finally([client2] { - logger().info("client2 shutdown..."); - return client2->shutdown(); - }).finally([server1] { - logger().info("server1 shutdown..."); - return server1->shutdown(); - }).finally([server2] { - logger().info("server2 shutdown..."); - return server2->shutdown(); - }).finally([] { - logger().info("test_echo() done!\n"); - }); - }); + server1->init(entity_name_t::OSD(0), "server1", 1, addr1), + server2->init(entity_name_t::OSD(1), "server2", 2, addr2), + client1->init(entity_name_t::OSD(2), "client1", 3), + client2->init(entity_name_t::OSD(3), "client2", 4) + // dispatch pingpoing + ).then([client1, client2, server1, server2] { + return seastar::when_all_succeed( + // test connecting in parallel, accepting in parallel + client1->dispatch_pingpong(server2->msgr->get_myaddr()), + client2->dispatch_pingpong(server1->msgr->get_myaddr())); + // shutdown + }).finally([client1] { + logger().info("client1 shutdown..."); + return client1->shutdown(); + }).finally([client2] { + logger().info("client2 shutdown..."); + return client2->shutdown(); + }).finally([server1] { + logger().info("server1 shutdown..."); + return server1->shutdown(); + }).finally([server2] { + logger().info("server2 shutdown..."); + return server2->shutdown(); + }).finally([server1, server2, client1, client2] { + logger().info("test_echo() done!\n"); + }); } static seastar::future<> test_concurrent_dispatch(bool v2) { struct test_state { struct Server final - : public crimson::net::Dispatcher, - public seastar::peering_sharded_service { - crimson::net::Messenger *msgr = nullptr; + : public crimson::net::Dispatcher { + crimson::net::MessengerRef msgr; int count = 0; seastar::promise<> on_second; // satisfied on second dispatch seastar::promise<> on_done; // satisfied when first dispatch unblocks @@ -331,12 +267,9 @@ static seastar::future<> test_concurrent_dispatch(bool v2) switch (++count) { case 1: // block on the first request until we reenter with the second - return on_second.get_future() - .then([this] { - return container().invoke_on_all([](Server& server) { - server.on_done.set_value(); - }); - }); + return on_second.get_future().then([this] { + on_done.set_value(); + }); case 2: on_second.set_value(); return seastar::now(); @@ -351,105 +284,73 @@ static seastar::future<> test_concurrent_dispatch(bool v2) const std::string& lname, const uint64_t nonce, const entity_addr_t& addr) { - return crimson::net::Messenger::create(name, lname, nonce, 0) - .then([this, addr](crimson::net::Messenger *messenger) { - return container().invoke_on_all([messenger](auto& server) { - server.msgr = messenger->get_local_shard(); - server.msgr->set_default_policy(crimson::net::SocketPolicy::stateless_server(0)); - server.msgr->set_auth_client(&server.dummy_auth); - server.msgr->set_auth_server(&server.dummy_auth); - }).then([messenger, addr] { - return messenger->bind(entity_addrvec_t{addr}); - }).then([this, messenger] { - return messenger->start(this); - }); - }); - } - - Dispatcher* get_local_shard() override { - return &(container().local()); - } - seastar::future<> stop() { - return seastar::make_ready_future<>(); + msgr = crimson::net::Messenger::create(name, lname, nonce); + msgr->set_default_policy(crimson::net::SocketPolicy::stateless_server(0)); + msgr->set_auth_client(&dummy_auth); + msgr->set_auth_server(&dummy_auth); + return msgr->bind(entity_addrvec_t{addr}).then([this] { + return msgr->start(this); + }); } }; struct Client final - : public crimson::net::Dispatcher, - public seastar::peering_sharded_service { - crimson::net::Messenger *msgr = nullptr; + : public crimson::net::Dispatcher { + crimson::net::MessengerRef msgr; crimson::auth::DummyAuthClientServer dummy_auth; seastar::future<> init(const entity_name_t& name, const std::string& lname, const uint64_t nonce) { - return crimson::net::Messenger::create(name, lname, nonce, 0) - .then([this](crimson::net::Messenger *messenger) { - return container().invoke_on_all([messenger](auto& client) { - client.msgr = messenger->get_local_shard(); - client.msgr->set_default_policy(crimson::net::SocketPolicy::lossy_client(0)); - client.msgr->set_auth_client(&client.dummy_auth); - client.msgr->set_auth_server(&client.dummy_auth); - }).then([this, messenger] { - return messenger->start(this); - }); - }); - } - - Dispatcher* get_local_shard() override { - return &(container().local()); - } - seastar::future<> stop() { - return seastar::make_ready_future<>(); + msgr = crimson::net::Messenger::create(name, lname, nonce); + msgr->set_default_policy(crimson::net::SocketPolicy::lossy_client(0)); + msgr->set_auth_client(&dummy_auth); + msgr->set_auth_server(&dummy_auth); + return msgr->start(this); } }; }; logger().info("test_concurrent_dispatch(v2={}):", v2); + auto server = seastar::make_shared(); + auto client = seastar::make_shared(); + entity_addr_t addr; + addr.parse("127.0.0.1:9010", nullptr); + if (v2) { + addr.set_type(entity_addr_t::TYPE_MSGR2); + } else { + addr.set_type(entity_addr_t::TYPE_LEGACY); + } + addr.set_family(AF_INET); return seastar::when_all_succeed( - crimson::net::create_sharded(), - crimson::net::create_sharded()) - .then([v2](test_state::Server *server, - test_state::Client *client) { - entity_addr_t addr; - addr.parse("127.0.0.1:9010", nullptr); - if (v2) { - addr.set_type(entity_addr_t::TYPE_MSGR2); - } else { - addr.set_type(entity_addr_t::TYPE_LEGACY); - } - addr.set_family(AF_INET); - return seastar::when_all_succeed( - server->init(entity_name_t::OSD(4), "server3", 5, addr), - client->init(entity_name_t::OSD(5), "client3", 6)) - .then([server, client] { - return client->msgr->connect(server->msgr->get_myaddr(), - entity_name_t::TYPE_OSD); - }).then([](crimson::net::ConnectionXRef conn) { - // send two messages - return (*conn)->send(make_message()).then([conn] { - return (*conn)->send(make_message()); - }); - }).then([server] { - return server->wait(); - }).finally([client] { - logger().info("client shutdown..."); - return client->msgr->shutdown(); - }).finally([server] { - logger().info("server shutdown..."); - return server->msgr->shutdown(); - }).finally([] { - logger().info("test_concurrent_dispatch() done!\n"); - }); - }); + server->init(entity_name_t::OSD(4), "server3", 5, addr), + client->init(entity_name_t::OSD(5), "client3", 6) + ).then([server, client] { + return client->msgr->connect(server->msgr->get_myaddr(), + entity_name_t::TYPE_OSD); + }).then([](crimson::net::ConnectionRef conn) { + // send two messages + return conn->send(make_message()).then([conn] { + return conn->send(make_message()); + }); + }).then([server] { + return server->wait(); + }).finally([client] { + logger().info("client shutdown..."); + return client->msgr->shutdown(); + }).finally([server] { + logger().info("server shutdown..."); + return server->msgr->shutdown(); + }).finally([server, client] { + logger().info("test_concurrent_dispatch() done!\n"); + }); } seastar::future<> test_preemptive_shutdown(bool v2) { struct test_state { class Server final - : public crimson::net::Dispatcher, - public seastar::peering_sharded_service { - crimson::net::Messenger *msgr = nullptr; + : public crimson::net::Dispatcher { + crimson::net::MessengerRef msgr; crimson::auth::DummyAuthClientServer dummy_auth; seastar::future<> ms_dispatch(crimson::net::Connection* c, @@ -462,18 +363,12 @@ seastar::future<> test_preemptive_shutdown(bool v2) { const std::string& lname, const uint64_t nonce, const entity_addr_t& addr) { - return crimson::net::Messenger::create(name, lname, nonce, seastar::engine().cpu_id() - ).then([this, addr](crimson::net::Messenger *messenger) { - return container().invoke_on_all([messenger](auto& server) { - server.msgr = messenger->get_local_shard(); - server.msgr->set_default_policy(crimson::net::SocketPolicy::stateless_server(0)); - server.msgr->set_auth_client(&server.dummy_auth); - server.msgr->set_auth_server(&server.dummy_auth); - }).then([messenger, addr] { - return messenger->bind(entity_addrvec_t{addr}); - }).then([this, messenger] { - return messenger->start(this); - }); + msgr = crimson::net::Messenger::create(name, lname, nonce); + msgr->set_default_policy(crimson::net::SocketPolicy::stateless_server(0)); + msgr->set_auth_client(&dummy_auth); + msgr->set_auth_server(&dummy_auth); + return msgr->bind(entity_addrvec_t{addr}).then([this] { + return msgr->start(this); }); } entity_addr_t get_addr() const { @@ -482,18 +377,11 @@ seastar::future<> test_preemptive_shutdown(bool v2) { seastar::future<> shutdown() { return msgr->shutdown(); } - Dispatcher* get_local_shard() override { - return &(container().local()); - } - seastar::future<> stop() { - return seastar::now(); - } }; class Client final - : public crimson::net::Dispatcher, - public seastar::peering_sharded_service { - crimson::net::Messenger *msgr = nullptr; + : public crimson::net::Dispatcher { + crimson::net::MessengerRef msgr; crimson::auth::DummyAuthClientServer dummy_auth; bool stop_send = false; @@ -508,25 +396,19 @@ seastar::future<> test_preemptive_shutdown(bool v2) { seastar::future<> init(const entity_name_t& name, const std::string& lname, const uint64_t nonce) { - return crimson::net::Messenger::create(name, lname, nonce, seastar::engine().cpu_id() - ).then([this](crimson::net::Messenger *messenger) { - return container().invoke_on_all([messenger](auto& client) { - client.msgr = messenger->get_local_shard(); - client.msgr->set_default_policy(crimson::net::SocketPolicy::lossy_client(0)); - client.msgr->set_auth_client(&client.dummy_auth); - client.msgr->set_auth_server(&client.dummy_auth); - }).then([this, messenger] { - return messenger->start(this); - }); - }); + msgr = crimson::net::Messenger::create(name, lname, nonce); + msgr->set_default_policy(crimson::net::SocketPolicy::lossy_client(0)); + msgr->set_auth_client(&dummy_auth); + msgr->set_auth_server(&dummy_auth); + return msgr->start(this); } seastar::future<> send_pings(const entity_addr_t& addr) { return msgr->connect(addr, entity_name_t::TYPE_OSD - ).then([this](crimson::net::ConnectionXRef conn) { + ).then([this](crimson::net::ConnectionRef conn) { // forwarded to stopped_send_promise (void) seastar::do_until( [this] { return stop_send; }, - [this, conn = &**conn] { + [this, conn] { return conn->send(make_message()).then([] { return seastar::sleep(0ms); }); @@ -542,45 +424,35 @@ seastar::future<> test_preemptive_shutdown(bool v2) { return stopped_send_promise.get_future(); }); } - Dispatcher* get_local_shard() override { - return &(container().local()); - } - seastar::future<> stop() { - return seastar::now(); - } }; }; logger().info("test_preemptive_shutdown(v2={}):", v2); + auto server = seastar::make_shared(); + auto client = seastar::make_shared(); + entity_addr_t addr; + addr.parse("127.0.0.1:9010", nullptr); + if (v2) { + addr.set_type(entity_addr_t::TYPE_MSGR2); + } else { + addr.set_type(entity_addr_t::TYPE_LEGACY); + } + addr.set_family(AF_INET); return seastar::when_all_succeed( - crimson::net::create_sharded(), - crimson::net::create_sharded() - ).then([v2](test_state::Server *server, - test_state::Client *client) { - entity_addr_t addr; - addr.parse("127.0.0.1:9010", nullptr); - if (v2) { - addr.set_type(entity_addr_t::TYPE_MSGR2); - } else { - addr.set_type(entity_addr_t::TYPE_LEGACY); - } - addr.set_family(AF_INET); - return seastar::when_all_succeed( - server->init(entity_name_t::OSD(6), "server4", 7, addr), - client->init(entity_name_t::OSD(7), "client4", 8) - ).then([server, client] { - return client->send_pings(server->get_addr()); - }).then([] { - return seastar::sleep(100ms); - }).then([client] { - logger().info("client shutdown..."); - return client->shutdown(); - }).finally([server] { - logger().info("server shutdown..."); - return server->shutdown(); - }).finally([] { - logger().info("test_preemptive_shutdown() done!\n"); - }); + server->init(entity_name_t::OSD(6), "server4", 7, addr), + client->init(entity_name_t::OSD(7), "client4", 8) + ).then([server, client] { + return client->send_pings(server->get_addr()); + }).then([] { + return seastar::sleep(100ms); + }).then([client] { + logger().info("client shutdown..."); + return client->shutdown(); + }).finally([server] { + logger().info("server shutdown..."); + return server->shutdown(); + }).finally([server, client] { + logger().info("test_preemptive_shutdown() done!\n"); }); } @@ -594,6 +466,7 @@ using crimson::net::custom_bp_t; using crimson::net::Dispatcher; using crimson::net::Interceptor; using crimson::net::Messenger; +using crimson::net::MessengerRef; using crimson::net::SocketPolicy; using crimson::net::tag_bp_t; using ceph::net::test::cmd_t; @@ -901,7 +774,7 @@ SocketPolicy to_socket_policy(policy_t policy) { class FailoverSuite : public Dispatcher { crimson::auth::DummyAuthClientServer dummy_auth; - Messenger& test_msgr; + MessengerRef test_msgr; const entity_addr_t test_peer_addr; TestInterceptor interceptor; @@ -1023,12 +896,12 @@ class FailoverSuite : public Dispatcher { private: seastar::future<> init(entity_addr_t addr, SocketPolicy policy) { - test_msgr.set_default_policy(policy); - test_msgr.set_auth_client(&dummy_auth); - test_msgr.set_auth_server(&dummy_auth); - test_msgr.interceptor = &interceptor; - return test_msgr.bind(entity_addrvec_t{addr}).then([this] { - return test_msgr.start(this); + test_msgr->set_default_policy(policy); + test_msgr->set_auth_client(&dummy_auth); + test_msgr->set_auth_server(&dummy_auth); + test_msgr->interceptor = &interceptor; + return test_msgr->bind(entity_addrvec_t{addr}).then([this] { + return test_msgr->start(this); }); } @@ -1137,7 +1010,7 @@ class FailoverSuite : public Dispatcher { // called by FailoverTest public: - FailoverSuite(Messenger& test_msgr, + FailoverSuite(MessengerRef test_msgr, entity_addr_t test_peer_addr, const TestInterceptor& interceptor) : test_msgr(test_msgr), @@ -1145,11 +1018,11 @@ class FailoverSuite : public Dispatcher { interceptor(interceptor) { } entity_addr_t get_addr() const { - return test_msgr.get_myaddr(); + return test_msgr->get_myaddr(); } seastar::future<> shutdown() { - return test_msgr.shutdown(); + return test_msgr->shutdown(); } void needs_receive() { @@ -1193,17 +1066,12 @@ class FailoverSuite : public Dispatcher { SocketPolicy test_policy, entity_addr_t test_peer_addr, const TestInterceptor& interceptor) { - return Messenger::create(entity_name_t::OSD(2), "Test", 2, 0 - ).then([test_addr, - test_policy, - test_peer_addr, - interceptor] (Messenger* test_msgr) { - auto suite = std::make_unique( - *test_msgr, test_peer_addr, interceptor); - return suite->init(test_addr, test_policy - ).then([suite = std::move(suite)] () mutable { - return std::move(suite); - }); + auto suite = std::make_unique( + Messenger::create(entity_name_t::OSD(2), "Test", 2), + test_peer_addr, interceptor); + return suite->init(test_addr, test_policy + ).then([suite = std::move(suite)] () mutable { + return std::move(suite); }); } @@ -1211,9 +1079,8 @@ class FailoverSuite : public Dispatcher { public: seastar::future<> connect_peer() { logger().info("[Test] connect_peer({})", test_peer_addr); - return test_msgr.connect(test_peer_addr, entity_name_t::TYPE_OSD - ).then([this] (auto xconn) { - auto conn = xconn->release(); + return test_msgr->connect(test_peer_addr, entity_name_t::TYPE_OSD + ).then([this] (auto conn) { auto result = interceptor.find_result(conn); ceph_assert(result != nullptr); @@ -1302,7 +1169,7 @@ class FailoverSuite : public Dispatcher { class FailoverTest : public Dispatcher { crimson::auth::DummyAuthClientServer dummy_auth; - Messenger& cmd_msgr; + MessengerRef cmd_msgr; ConnectionRef cmd_conn; const entity_addr_t test_addr; const entity_addr_t test_peer_addr; @@ -1375,20 +1242,20 @@ class FailoverTest : public Dispatcher { } seastar::future<> init(entity_addr_t cmd_peer_addr) { - cmd_msgr.set_default_policy(SocketPolicy::lossy_client(0)); - cmd_msgr.set_auth_client(&dummy_auth); - cmd_msgr.set_auth_server(&dummy_auth); - return cmd_msgr.start(this).then([this, cmd_peer_addr] { + cmd_msgr->set_default_policy(SocketPolicy::lossy_client(0)); + cmd_msgr->set_auth_client(&dummy_auth); + cmd_msgr->set_auth_server(&dummy_auth); + return cmd_msgr->start(this).then([this, cmd_peer_addr] { logger().info("CmdCli connect to CmdSrv({}) ...", cmd_peer_addr); - return cmd_msgr.connect(cmd_peer_addr, entity_name_t::TYPE_OSD); + return cmd_msgr->connect(cmd_peer_addr, entity_name_t::TYPE_OSD); }).then([this] (auto conn) { - cmd_conn = conn->release(); + cmd_conn = conn; return pingpong(); }); } public: - FailoverTest(Messenger& cmd_msgr, + FailoverTest(MessengerRef cmd_msgr, entity_addr_t test_addr, entity_addr_t test_peer_addr) : cmd_msgr(cmd_msgr), @@ -1403,25 +1270,23 @@ class FailoverTest : public Dispatcher { return cmd_conn->send(m).then([this] { return seastar::sleep(200ms); }).finally([this] { - return cmd_msgr.shutdown(); + return cmd_msgr->shutdown(); }); } static seastar::future> create(entity_addr_t cmd_peer_addr, entity_addr_t test_addr) { - return Messenger::create(entity_name_t::OSD(1), "CmdCli", 1, 0 - ).then([cmd_peer_addr, test_addr] (Messenger* cmd_msgr) mutable { - test_addr.set_nonce(2); - cmd_peer_addr.set_nonce(3); - entity_addr_t test_peer_addr = cmd_peer_addr; - test_peer_addr.set_port(cmd_peer_addr.get_port() + 1); - test_peer_addr.set_nonce(4); - auto test = seastar::make_lw_shared( - *cmd_msgr, test_addr, test_peer_addr); - return test->init(cmd_peer_addr).then([test] { - logger().info("CmdCli ready"); - return test; - }); + test_addr.set_nonce(2); + cmd_peer_addr.set_nonce(3); + entity_addr_t test_peer_addr = cmd_peer_addr; + test_peer_addr.set_port(cmd_peer_addr.get_port() + 1); + test_peer_addr.set_nonce(4); + auto test = seastar::make_lw_shared( + Messenger::create(entity_name_t::OSD(1), "CmdCli", 1), + test_addr, test_peer_addr); + return test->init(cmd_peer_addr).then([test] { + logger().info("CmdCli ready"); + return test; }); } @@ -1506,7 +1371,7 @@ class FailoverTest : public Dispatcher { class FailoverSuitePeer : public Dispatcher { using cb_t = std::function()>; crimson::auth::DummyAuthClientServer dummy_auth; - Messenger& peer_msgr; + MessengerRef peer_msgr; cb_t op_callback; ConnectionRef tracked_conn; @@ -1535,11 +1400,11 @@ class FailoverSuitePeer : public Dispatcher { private: seastar::future<> init(entity_addr_t addr, SocketPolicy policy) { - peer_msgr.set_default_policy(policy); - peer_msgr.set_auth_client(&dummy_auth); - peer_msgr.set_auth_server(&dummy_auth); - return peer_msgr.bind(entity_addrvec_t{addr}).then([this] { - return peer_msgr.start(this); + peer_msgr->set_default_policy(policy); + peer_msgr->set_auth_client(&dummy_auth); + peer_msgr->set_auth_server(&dummy_auth); + return peer_msgr->bind(entity_addrvec_t{addr}).then([this] { + return peer_msgr->start(this); }); } @@ -1567,18 +1432,18 @@ class FailoverSuitePeer : public Dispatcher { } public: - FailoverSuitePeer(Messenger& peer_msgr, cb_t op_callback) + FailoverSuitePeer(MessengerRef peer_msgr, cb_t op_callback) : peer_msgr(peer_msgr), op_callback(op_callback) { } seastar::future<> shutdown() { - return peer_msgr.shutdown(); + return peer_msgr->shutdown(); } seastar::future<> connect_peer(entity_addr_t addr) { logger().info("[TestPeer] connect_peer({})", addr); - return peer_msgr.connect(addr, entity_name_t::TYPE_OSD - ).then([this] (auto xconn) { - auto new_tracked_conn = xconn->release(); + return peer_msgr->connect(addr, entity_name_t::TYPE_OSD + ).then([this] (auto conn) { + auto new_tracked_conn = conn; if (tracked_conn) { if (tracked_conn->is_closed()) { ceph_assert(tracked_conn != new_tracked_conn); @@ -1621,20 +1486,18 @@ class FailoverSuitePeer : public Dispatcher { static seastar::future> create(entity_addr_t addr, const SocketPolicy& policy, cb_t op_callback) { - return Messenger::create(entity_name_t::OSD(4), "TestPeer", 4, 0 - ).then([addr, policy, op_callback] (Messenger* peer_msgr) { - auto suite = std::make_unique(*peer_msgr, op_callback); - return suite->init(addr, policy - ).then([suite = std::move(suite)] () mutable { - return std::move(suite); - }); + auto suite = std::make_unique( + Messenger::create(entity_name_t::OSD(4), "TestPeer", 4), op_callback); + return suite->init(addr, policy + ).then([suite = std::move(suite)] () mutable { + return std::move(suite); }); } }; class FailoverTestPeer : public Dispatcher { crimson::auth::DummyAuthClientServer dummy_auth; - Messenger& cmd_msgr; + MessengerRef cmd_msgr; ConnectionRef cmd_conn; const entity_addr_t test_peer_addr; std::unique_ptr test_suite; @@ -1650,7 +1513,7 @@ class FailoverTestPeer : public Dispatcher { if (cmd == cmd_t::shutdown) { logger().info("CmdSrv shutdown..."); // forwarded to FailoverTestPeer::wait() - (void) cmd_msgr.shutdown(); + (void) cmd_msgr->shutdown(); return seastar::now(); } return handle_cmd(cmd, m_cmd).then([c] { @@ -1715,37 +1578,35 @@ class FailoverTestPeer : public Dispatcher { } seastar::future<> init(entity_addr_t cmd_peer_addr) { - cmd_msgr.set_default_policy(SocketPolicy::stateless_server(0)); - cmd_msgr.set_auth_client(&dummy_auth); - cmd_msgr.set_auth_server(&dummy_auth); - return cmd_msgr.bind(entity_addrvec_t{cmd_peer_addr}).then([this] { - return cmd_msgr.start(this); + cmd_msgr->set_default_policy(SocketPolicy::stateless_server(0)); + cmd_msgr->set_auth_client(&dummy_auth); + cmd_msgr->set_auth_server(&dummy_auth); + return cmd_msgr->bind(entity_addrvec_t{cmd_peer_addr}).then([this] { + return cmd_msgr->start(this); }); } public: - FailoverTestPeer(Messenger& cmd_msgr, + FailoverTestPeer(MessengerRef cmd_msgr, entity_addr_t test_peer_addr) : cmd_msgr(cmd_msgr), test_peer_addr(test_peer_addr) { } seastar::future<> wait() { - return cmd_msgr.wait(); + return cmd_msgr->wait(); } static seastar::future> create(entity_addr_t cmd_peer_addr) { - return Messenger::create(entity_name_t::OSD(3), "CmdSrv", 3, 0 - ).then([cmd_peer_addr] (Messenger* cmd_msgr) { - // suite bind to cmd_peer_addr, with port + 1 - entity_addr_t test_peer_addr = cmd_peer_addr; - test_peer_addr.set_port(cmd_peer_addr.get_port() + 1); - auto test_peer = std::make_unique(*cmd_msgr, test_peer_addr); - return test_peer->init(cmd_peer_addr - ).then([test_peer = std::move(test_peer)] () mutable { - logger().info("CmdSrv ready"); - return std::move(test_peer); - }); + // suite bind to cmd_peer_addr, with port + 1 + entity_addr_t test_peer_addr = cmd_peer_addr; + test_peer_addr.set_port(cmd_peer_addr.get_port() + 1); + auto test_peer = std::make_unique( + Messenger::create(entity_name_t::OSD(3), "CmdSrv", 3), test_peer_addr); + return test_peer->init(cmd_peer_addr + ).then([test_peer = std::move(test_peer)] () mutable { + logger().info("CmdSrv ready"); + return std::move(test_peer); }); } }; diff --git a/src/test/crimson/test_monc.cc b/src/test/crimson/test_monc.cc index a733d7de864c..1275b451ebe7 100644 --- a/src/test/crimson/test_monc.cc +++ b/src/test/crimson/test_monc.cc @@ -39,29 +39,26 @@ static seastar::future<> test_monc() }).then([] { return crimson::common::sharded_perf_coll().start(); }).then([] { - return crimson::net::Messenger::create(entity_name_t::OSD(0), "monc", 0, - seastar::engine().cpu_id()) - .then([] (crimson::net::Messenger *msgr) { - auto& conf = crimson::common::local_conf(); - if (conf->ms_crc_data) { - msgr->set_crc_data(); - } - if (conf->ms_crc_header) { - msgr->set_crc_header(); - } - msgr->set_require_authorizer(false); - return seastar::do_with(MonClient{*msgr, dummy_handler}, - [msgr](auto& monc) { - return msgr->start(&monc).then([&monc] { - return seastar::with_timeout( - seastar::lowres_clock::now() + std::chrono::seconds{10}, - monc.start()); - }).then([&monc] { - return monc.stop(); - }); - }).finally([msgr] { - return msgr->shutdown(); + auto msgr = crimson::net::Messenger::create(entity_name_t::OSD(0), "monc", 0); + auto& conf = crimson::common::local_conf(); + if (conf->ms_crc_data) { + msgr->set_crc_data(); + } + if (conf->ms_crc_header) { + msgr->set_crc_header(); + } + msgr->set_require_authorizer(false); + return seastar::do_with(MonClient{*msgr, dummy_handler}, + [msgr](auto& monc) { + return msgr->start(&monc).then([&monc] { + return seastar::with_timeout( + seastar::lowres_clock::now() + std::chrono::seconds{10}, + monc.start()); + }).then([&monc] { + return monc.stop(); }); + }).finally([msgr] { + return msgr->shutdown(); }); }).finally([] { return crimson::common::sharded_perf_coll().stop().then([] { diff --git a/src/tools/crimson/perf_crimson_msgr.cc b/src/tools/crimson/perf_crimson_msgr.cc index e66f8702fb39..0c24f3b24c8b 100644 --- a/src/tools/crimson/perf_crimson_msgr.cc +++ b/src/tools/crimson/perf_crimson_msgr.cc @@ -11,6 +11,7 @@ #include #include #include +#include #include "common/ceph_time.h" #include "messages/MOSDOp.h" @@ -32,6 +33,23 @@ seastar::logger& logger() { return crimson::get_logger(ceph_subsys_ms); } +template +seastar::future create_sharded(Args... args) { + // seems we should only construct/stop shards on #0 + return seastar::smp::submit_to(0, [=] { + auto sharded_obj = seastar::make_lw_shared>(); + return sharded_obj->start(args...).then([sharded_obj]() { + seastar::engine().at_exit([sharded_obj]() { + return sharded_obj->stop().finally([sharded_obj] {}); + }); + return sharded_obj.get(); + }); + }).then([] (seastar::sharded *ptr_shard) { + // return the pointer valid for the caller CPU + return &ptr_shard->local(); + }); +} + enum class perf_mode_t { both, client, @@ -114,32 +132,26 @@ static seastar::future<> run( const server_config& server_conf) { struct test_state { + struct Server; + using ServerFRef = seastar::foreign_ptr>; + struct Server final - : public crimson::net::Dispatcher, - public seastar::peering_sharded_service { - crimson::net::Messenger *msgr = nullptr; + : public crimson::net::Dispatcher { + crimson::net::MessengerRef msgr; crimson::auth::DummyAuthClientServer dummy_auth; - const seastar::shard_id sid; const seastar::shard_id msgr_sid; std::string lname; unsigned msg_len; bufferlist msg_data; - Server(unsigned msgr_core, unsigned msg_len) - : sid{seastar::engine().cpu_id()}, - msgr_sid{msgr_core}, + Server(unsigned msg_len) + : msgr_sid{seastar::engine().cpu_id()}, msg_len{msg_len} { lname = "server#"; - lname += std::to_string(sid); + lname += std::to_string(msgr_sid); msg_data.append_zero(msg_len); } - Dispatcher* get_local_shard() override { - return &(container().local()); - } - seastar::future<> stop() { - return seastar::make_ready_future<>(); - } seastar::future<> ms_dispatch(crimson::net::Connection* c, MessageRef m) override { ceph_assert(m->get_type() == CEPH_MSG_OSD_OP); @@ -158,34 +170,38 @@ static seastar::future<> run( } seastar::future<> init(bool v1_crc_enabled, const entity_addr_t& addr) { - return container().invoke_on(msgr_sid, [v1_crc_enabled, addr] (auto& server) { + return seastar::smp::submit_to(msgr_sid, [v1_crc_enabled, addr, this] { // server msgr is always with nonce 0 - auto&& fut = crimson::net::Messenger::create(entity_name_t::OSD(server.sid), server.lname, 0, server.sid); - return fut.then( - [&server, addr, v1_crc_enabled](crimson::net::Messenger *messenger) { - return server.container().invoke_on_all( - [messenger, v1_crc_enabled](auto& server) { - server.msgr = messenger->get_local_shard(); - server.msgr->set_default_policy(crimson::net::SocketPolicy::stateless_server(0)); - server.msgr->set_auth_client(&server.dummy_auth); - server.msgr->set_auth_server(&server.dummy_auth); - if (v1_crc_enabled) { - server.msgr->set_crc_header(); - server.msgr->set_crc_data(); - } - }).then([messenger, addr] { - return messenger->bind(entity_addrvec_t{addr}); - }).then([&server, messenger] { - return messenger->start(&server); - }); - }); + msgr = crimson::net::Messenger::create(entity_name_t::OSD(msgr_sid), lname, 0); + msgr->set_default_policy(crimson::net::SocketPolicy::stateless_server(0)); + msgr->set_auth_client(&dummy_auth); + msgr->set_auth_server(&dummy_auth); + if (v1_crc_enabled) { + msgr->set_crc_header(); + msgr->set_crc_data(); + } + return msgr->bind(entity_addrvec_t{addr}).then([this] { + return msgr->start(this); + }); }); } seastar::future<> shutdown() { logger().info("{} shutdown...", lname); - return container().invoke_on(msgr_sid, [] (auto& server) { - ceph_assert(server.msgr); - return server.msgr->shutdown(); + return seastar::smp::submit_to(msgr_sid, [this] { + ceph_assert(msgr); + return msgr->shutdown(); + }); + } + seastar::future<> wait() { + return seastar::smp::submit_to(msgr_sid, [this] { + ceph_assert(msgr); + return msgr->wait(); + }); + } + + static seastar::future create(seastar::shard_id msgr_sid, unsigned msg_len) { + return seastar::smp::submit_to(msgr_sid, [msg_len] { + return seastar::make_foreign(std::make_unique(msg_len)); }); } }; @@ -250,7 +266,7 @@ static seastar::future<> run( std::string lname; const unsigned jobs; - crimson::net::Messenger *msgr = nullptr; + crimson::net::MessengerRef msgr; const unsigned msg_len; bufferlist msg_data; const unsigned nr_depth; @@ -281,12 +297,6 @@ static seastar::future<> run( return nr_depth - depth.current(); } - Dispatcher* get_local_shard() override { - return &(container().local()); - } - seastar::future<> stop() { - return seastar::now(); - } seastar::future<> ms_handle_connect(crimson::net::ConnectionRef conn) override { conn_stats.connected_time = mono_clock::now(); return seastar::now(); @@ -323,19 +333,16 @@ static seastar::future<> run( seastar::future<> init(bool v1_crc_enabled) { return container().invoke_on_all([v1_crc_enabled] (auto& client) { if (client.is_active()) { - return crimson::net::Messenger::create(entity_name_t::OSD(client.sid), client.lname, client.sid, client.sid) - .then([&client, v1_crc_enabled] (crimson::net::Messenger *messenger) { - client.msgr = messenger; - client.msgr->set_default_policy(crimson::net::SocketPolicy::lossy_client(0)); - client.msgr->set_require_authorizer(false); - client.msgr->set_auth_client(&client.dummy_auth); - client.msgr->set_auth_server(&client.dummy_auth); - if (v1_crc_enabled) { - client.msgr->set_crc_header(); - client.msgr->set_crc_data(); - } - return client.msgr->start(&client); - }); + client.msgr = crimson::net::Messenger::create(entity_name_t::OSD(client.sid), client.lname, client.sid); + client.msgr->set_default_policy(crimson::net::SocketPolicy::lossy_client(0)); + client.msgr->set_require_authorizer(false); + client.msgr->set_auth_client(&client.dummy_auth); + client.msgr->set_auth_server(&client.dummy_auth); + if (v1_crc_enabled) { + client.msgr->set_crc_header(); + client.msgr->set_crc_data(); + } + return client.msgr->start(&client); } return seastar::now(); }); @@ -359,9 +366,9 @@ static seastar::future<> run( // start clients in active cores (#1 ~ #jobs) if (client.is_active()) { mono_time start_time = mono_clock::now(); - return client.msgr->connect(peer_addr, entity_name_t::TYPE_OSD) - .then([&client] (auto conn) { - client.active_conn = conn->release(); + return client.msgr->connect(peer_addr, entity_name_t::TYPE_OSD + ).then([&client] (auto conn) { + client.active_conn = conn; // make sure handshake won't hurt the performance return seastar::sleep(1s); }).then([&client, start_time] { @@ -638,57 +645,57 @@ static seastar::future<> run( }; return seastar::when_all_succeed( - crimson::net::create_sharded(server_conf.core, server_conf.block_size), - crimson::net::create_sharded(client_conf.jobs, - client_conf.block_size, client_conf.depth)) - .then([=](test_state::Server *server, - test_state::Client *client) { - if (mode == perf_mode_t::both) { - logger().info("\nperf settings:\n {}\n {}\n", - client_conf.str(), server_conf.str()); - ceph_assert(seastar::smp::count >= 1+client_conf.jobs); - ceph_assert(client_conf.jobs > 0); - ceph_assert(seastar::smp::count >= 1+server_conf.core); - ceph_assert(server_conf.core == 0 || server_conf.core > client_conf.jobs); - return seastar::when_all_succeed( - server->init(server_conf.v1_crc_enabled, server_conf.addr), - client->init(client_conf.v1_crc_enabled)) - .then([client, addr = client_conf.server_addr] { - return client->connect_wait_verify(addr); - }).then([client, ramptime = client_conf.ramptime, - msgtime = client_conf.msgtime] { - return client->dispatch_with_timer(ramptime, msgtime); - }).finally([client] { - return client->shutdown(); - }).finally([server] { - return server->shutdown(); - }); - } else if (mode == perf_mode_t::client) { - logger().info("\nperf settings:\n {}\n", client_conf.str()); - ceph_assert(seastar::smp::count >= 1+client_conf.jobs); - ceph_assert(client_conf.jobs > 0); - return client->init(client_conf.v1_crc_enabled) - .then([client, addr = client_conf.server_addr] { - return client->connect_wait_verify(addr); - }).then([client, ramptime = client_conf.ramptime, - msgtime = client_conf.msgtime] { - return client->dispatch_with_timer(ramptime, msgtime); - }).finally([client] { - return client->shutdown(); - }); - } else { // mode == perf_mode_t::server - ceph_assert(seastar::smp::count >= 1+server_conf.core); - logger().info("\nperf settings:\n {}\n", server_conf.str()); - return server->init(server_conf.v1_crc_enabled, server_conf.addr) - // dispatch ops - .then([server] { - return server->msgr->wait(); - // shutdown - }).finally([server] { - return server->shutdown(); - }); - } - }); + test_state::Server::create(server_conf.core, server_conf.block_size), + create_sharded(client_conf.jobs, client_conf.block_size, client_conf.depth) + ).then([=](test_state::ServerFRef fp_server, + test_state::Client *client) { + test_state::Server* server = fp_server.get(); + if (mode == perf_mode_t::both) { + logger().info("\nperf settings:\n {}\n {}\n", + client_conf.str(), server_conf.str()); + ceph_assert(seastar::smp::count >= 1+client_conf.jobs); + ceph_assert(client_conf.jobs > 0); + ceph_assert(seastar::smp::count >= 1+server_conf.core); + ceph_assert(server_conf.core == 0 || server_conf.core > client_conf.jobs); + return seastar::when_all_succeed( + server->init(server_conf.v1_crc_enabled, server_conf.addr), + client->init(client_conf.v1_crc_enabled) + ).then([client, addr = client_conf.server_addr] { + return client->connect_wait_verify(addr); + }).then([client, ramptime = client_conf.ramptime, + msgtime = client_conf.msgtime] { + return client->dispatch_with_timer(ramptime, msgtime); + }).finally([client] { + return client->shutdown(); + }).finally([server, fp_server = std::move(fp_server)] () mutable { + return server->shutdown().then([cleanup = std::move(fp_server)] {}); + }); + } else if (mode == perf_mode_t::client) { + logger().info("\nperf settings:\n {}\n", client_conf.str()); + ceph_assert(seastar::smp::count >= 1+client_conf.jobs); + ceph_assert(client_conf.jobs > 0); + return client->init(client_conf.v1_crc_enabled + ).then([client, addr = client_conf.server_addr] { + return client->connect_wait_verify(addr); + }).then([client, ramptime = client_conf.ramptime, + msgtime = client_conf.msgtime] { + return client->dispatch_with_timer(ramptime, msgtime); + }).finally([client] { + return client->shutdown(); + }); + } else { // mode == perf_mode_t::server + ceph_assert(seastar::smp::count >= 1+server_conf.core); + logger().info("\nperf settings:\n {}\n", server_conf.str()); + return server->init(server_conf.v1_crc_enabled, server_conf.addr + // dispatch ops + ).then([server] { + return server->wait(); + // shutdown + }).finally([server, fp_server = std::move(fp_server)] () mutable { + return server->shutdown().then([cleanup = std::move(fp_server)] {}); + }); + } + }); } }