git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson: implement and adopt shard-local messenger
author: Yingxin Cheng <yingxin.cheng@intel.com>
Wed, 22 Jan 2020 04:31:26 +0000 (12:31 +0800)
committer: Yingxin Cheng <yingxin.cheng@intel.com>
Wed, 12 Feb 2020 02:47:47 +0000 (10:47 +0800)
Remove the constraint that the messenger must be started as a sharded
service, and remove foreign pointers from the messenger interfaces. This
makes it simpler for users to manage a shard-local messenger as a normal
object.

Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
21 files changed:
src/crimson/mgr/client.cc
src/crimson/mon/MonClient.cc
src/crimson/net/Connection.h
src/crimson/net/Dispatcher.h
src/crimson/net/Fwd.h
src/crimson/net/Messenger.cc
src/crimson/net/Messenger.h
src/crimson/net/SocketConnection.cc
src/crimson/net/SocketConnection.h
src/crimson/net/SocketMessenger.cc
src/crimson/net/SocketMessenger.h
src/crimson/osd/heartbeat.cc
src/crimson/osd/heartbeat.h
src/crimson/osd/main.cc
src/crimson/osd/osd.cc
src/crimson/osd/osd.h
src/crimson/osd/shard_services.cc
src/test/crimson/test_alien_echo.cc
src/test/crimson/test_messenger.cc
src/test/crimson/test_monc.cc
src/tools/crimson/perf_crimson_msgr.cc

index 845c02f256ed6971193e312f541f457c2731ae1b..5243338c27790a8865274d0436129c8222577923 100644 (file)
@@ -73,8 +73,8 @@ seastar::future<> Client::reconnect()
     }
     auto peer = mgrmap.get_active_addrs().front();
     return msgr.connect(peer, CEPH_ENTITY_TYPE_MGR).then(
-      [this](auto xconn) {
-        conn = xconn->release();
+      [this](auto _conn) {
+        conn = _conn;
         // ask for the mgrconfigure message
         auto m = ceph::make_message<MMgrOpen>();
         m->daemon_name = local_conf()->name.get_id();
index 117b9b3ebd8eb4a387273d01c2030023b87d1243..4c1790ce0c97264c1821b6851e70d197d1be65d2 100644 (file)
@@ -943,11 +943,7 @@ seastar::future<> Client::reopen_session(int rank)
     auto peer = monmap.get_addrs(rank).front();
     logger().info("connecting to mon.{}", rank);
     return msgr.connect(peer, CEPH_ENTITY_TYPE_MON).then(
-      [this] (auto xconn) -> seastar::future<Connection::AuthResult> {
-      // sharded-messenger compatible mode assumes all connections running
-      // in one shard.
-      ceph_assert((*xconn)->shard_id() == seastar::engine().cpu_id());
-      crimson::net::ConnectionRef conn = xconn->release();
+      [this] (auto conn) -> seastar::future<Connection::AuthResult> {
       auto& mc = pending_conns.emplace_back(
        std::make_unique<Connection>(auth_registry, conn, &keyring));
       if (conn->get_peer_addr().is_msgr2()) {
index f2d8ad74161c68ddb0413dc6f6b7ebb864e75636..d43d61b699e6488b2999e2415e2d59626d80e8d8 100644 (file)
@@ -110,9 +110,6 @@ class Connection : public seastar::enable_shared_from_this<Connection> {
   // will wait for all connections closed
   virtual seastar::future<> close() = 0;
 
-  /// which shard id the connection lives
-  virtual seastar::shard_id shard_id() const = 0;
-
   virtual void print(ostream& out) const = 0;
 
   void set_last_keepalive(clock_t::time_point when) {
index 304719b811d02f9f25cad802158224995da585ce..ac608fc431536bb6338db33612806baa4252ffb1 100644 (file)
@@ -53,11 +53,6 @@ class Dispatcher {
                       bufferlist&) {
     return seastar::make_ready_future<msgr_tag_t, bufferlist>(0, bufferlist{});
   }
-
-  // get the local dispatcher shard if it is accessed by another core
-  virtual Dispatcher* get_local_shard() {
-    return this;
-  }
 };
 
 } // namespace crimson::net
index 078b61e8b4400349a2e72394e1cdae1aa9de719c..2221533967243c4932f855523109b0cfceb8629d 100644 (file)
@@ -33,29 +33,10 @@ using stop_t = seastar::stop_iteration;
 
 class Connection;
 using ConnectionRef = seastar::shared_ptr<Connection>;
-// NOTE: ConnectionXRef should only be used in seastar world, because
-// lw_shared_ptr<> is not safe to be accessed by unpinned alien threads.
-using ConnectionXRef = seastar::lw_shared_ptr<seastar::foreign_ptr<ConnectionRef>>;
 
 class Dispatcher;
 
 class Messenger;
-
-template <typename T, typename... Args>
-seastar::future<T*> create_sharded(Args... args) {
-  // seems we should only construct/stop shards on #0
-  return seastar::smp::submit_to(0, [=] {
-    auto sharded_obj = seastar::make_lw_shared<seastar::sharded<T>>();
-    return sharded_obj->start(args...).then([sharded_obj]() {
-      seastar::engine().at_exit([sharded_obj]() {
-          return sharded_obj->stop().finally([sharded_obj] {});
-        });
-      return sharded_obj.get();
-    });
-  }).then([] (seastar::sharded<T> *ptr_shard) {
-    // return the pointer valid for the caller CPU
-    return &ptr_shard->local();
-  });
-}
+using MessengerRef = seastar::shared_ptr<Messenger>;
 
 } // namespace crimson::net
index 2b2a6ff2a14519087f20b2020c2b3d429684b7bc..aab476f7a4b020ae8de86c5199cfce21118b89c7 100644 (file)
@@ -6,21 +6,12 @@
 
 namespace crimson::net {
 
-seastar::future<Messenger*>
+MessengerRef
 Messenger::create(const entity_name_t& name,
                   const std::string& lname,
-                  const uint64_t nonce,
-                  const int master_sid)
+                  const uint64_t nonce)
 {
-  // enforce the messenger to a specific core (master_sid)
-  // TODO: drop the cross-core feature and cleanup the related interfaces in
-  // the future.
-  ceph_assert(master_sid >= 0);
-  return create_sharded<SocketMessenger>(
-      name, lname, nonce, static_cast<seastar::shard_id>(master_sid)
-  ).then([](Messenger *msgr) {
-    return msgr;
-  });
+  return seastar::make_shared<SocketMessenger>(name, lname, nonce);
 }
 
 } // namespace crimson::net
index f0004365fd76a2bd442810c5112a2b8eefb54c91..9f6fcf3658b3b5a1e60bd351c892dcdc46a14e64 100644 (file)
@@ -76,7 +76,7 @@ public:
 
   /// either return an existing connection to the peer,
   /// or a new pending connection
-  virtual seastar::future<ConnectionXRef>
+  virtual seastar::future<ConnectionRef>
   connect(const entity_addr_t& peer_addr,
           const entity_type_t& peer_type) = 0;
 
@@ -106,11 +106,6 @@ public:
     auth_server = as;
   }
 
-  // get the local messenger shard if it is accessed by another core
-  virtual Messenger* get_local_shard() {
-    return this;
-  }
-
   virtual void print(ostream& out) const = 0;
 
   virtual SocketPolicy get_policy(entity_type_t peer_type) const = 0;
@@ -131,11 +126,10 @@ public:
   void set_require_authorizer(bool r) {
     require_authorizer = r;
   }
-  static seastar::future<Messenger*>
+  static MessengerRef
   create(const entity_name_t& name,
          const std::string& lname,
-         const uint64_t nonce,
-         const int master_sid=-1);
+         const uint64_t nonce);
 };
 
 inline ostream& operator<<(ostream& out, const Messenger& msgr) {
index 89d89557ab0b472e3c6c254854e6ca2c2a57c046..4a73034e922628f29c92423be9e7a312cf49cd9b 100644 (file)
@@ -30,7 +30,6 @@ SocketConnection::SocketConnection(SocketMessenger& messenger,
                                    bool is_msgr2)
   : messenger(messenger)
 {
-  ceph_assert(&messenger.container().local() == &messenger);
   if (is_msgr2) {
     protocol = std::make_unique<ProtocolV2>(dispatcher, *this, messenger);
   } else {
@@ -53,14 +52,14 @@ SocketConnection::get_messenger() const {
 
 bool SocketConnection::is_connected() const
 {
-  ceph_assert(seastar::engine().cpu_id() == shard_id());
+  assert(seastar::engine().cpu_id() == shard_id());
   return protocol->is_connected();
 }
 
 #ifdef UNIT_TESTS_BUILT
 bool SocketConnection::is_closed() const
 {
-  ceph_assert(seastar::engine().cpu_id() == shard_id());
+  assert(seastar::engine().cpu_id() == shard_id());
   return protocol->is_closed();
 }
 
@@ -72,23 +71,19 @@ bool SocketConnection::peer_wins() const
 
 seastar::future<> SocketConnection::send(MessageRef msg)
 {
-  // Cannot send msg from another core now, its ref counter can be contaminated!
-  ceph_assert(seastar::engine().cpu_id() == shard_id());
-  return seastar::smp::submit_to(shard_id(), [this, msg=std::move(msg)] {
-    return protocol->send(std::move(msg));
-  });
+  assert(seastar::engine().cpu_id() == shard_id());
+  return protocol->send(std::move(msg));
 }
 
 seastar::future<> SocketConnection::keepalive()
 {
-  return seastar::smp::submit_to(shard_id(), [this] {
-    return protocol->keepalive();
-  });
+  assert(seastar::engine().cpu_id() == shard_id());
+  return protocol->keepalive();
 }
 
 seastar::future<> SocketConnection::close()
 {
-  ceph_assert(seastar::engine().cpu_id() == shard_id());
+  assert(seastar::engine().cpu_id() == shard_id());
   return protocol->close();
 }
 
index c6f58c7630e05ef9c32458c332f1104d2b1c01cc..503d4e55fb049819b0902715ba6d50b348f2d930 100644 (file)
@@ -64,6 +64,8 @@ class SocketConnection : public Connection {
   // messages sent, but not yet acked by peer
   std::deque<MessageRef> sent;
 
+  seastar::shard_id shard_id() const;
+
  public:
   SocketConnection(SocketMessenger& messenger,
                    Dispatcher& dispatcher,
@@ -88,8 +90,6 @@ class SocketConnection : public Connection {
 
   seastar::future<> close() override;
 
-  seastar::shard_id shard_id() const override;
-
   void print(ostream& out) const override;
 
   /// start a handshake from the client's perspective,
index d5c0f5a38efacc77ffab22be06ce37b2d0508ac2..75ba6bc70673996037c5f592d4abed4300654799 100644 (file)
@@ -32,10 +32,9 @@ namespace crimson::net {
 
 SocketMessenger::SocketMessenger(const entity_name_t& myname,
                                  const std::string& logic_name,
-                                 uint32_t nonce,
-                                 seastar::shard_id master_sid)
+                                 uint32_t nonce)
   : Messenger{myname},
-    master_sid{master_sid},
+    master_sid{seastar::engine().cpu_id()},
     logic_name{logic_name},
     nonce{nonce}
 {}
@@ -115,7 +114,7 @@ SocketMessenger::try_bind(const entity_addrvec_t& addrs,
 seastar::future<> SocketMessenger::start(Dispatcher *disp) {
   assert(seastar::engine().cpu_id() == master_sid);
 
-  dispatcher = disp->get_local_shard();
+  dispatcher = disp;
   if (listener) {
     // make sure we have already bound to a valid address
     ceph_assert(get_myaddr().is_legacy() || get_myaddr().is_msgr2());
@@ -132,7 +131,7 @@ seastar::future<> SocketMessenger::start(Dispatcher *disp) {
   return seastar::now();
 }
 
-seastar::future<crimson::net::ConnectionXRef>
+seastar::future<crimson::net::ConnectionRef>
 SocketMessenger::connect(const entity_addr_t& peer_addr, const entity_type_t& peer_type)
 {
   assert(seastar::engine().cpu_id() == master_sid);
@@ -141,18 +140,13 @@ SocketMessenger::connect(const entity_addr_t& peer_addr, const entity_type_t& pe
   ceph_assert(peer_addr.is_legacy() || peer_addr.is_msgr2());
   ceph_assert(peer_addr.get_port() > 0);
 
-  // TODO: use ConnectionRef
   if (auto found = lookup_conn(peer_addr); found) {
-    return seastar::make_ready_future<ConnectionXRef>(
-      seastar::make_lw_shared<seastar::foreign_ptr<ConnectionRef>>(
-        seastar::make_foreign(found->shared_from_this())));
+    return seastar::make_ready_future<ConnectionRef>(found->shared_from_this());
   }
   SocketConnectionRef conn = seastar::make_shared<SocketConnection>(
       *this, *dispatcher, peer_addr.is_msgr2());
   conn->start_connect(peer_addr, peer_type);
-  return seastar::make_ready_future<ConnectionXRef>(
-    seastar::make_lw_shared<seastar::foreign_ptr<ConnectionRef>>(
-      seastar::make_foreign(conn->shared_from_this())));
+  return seastar::make_ready_future<ConnectionRef>(conn->shared_from_this());
 }
 
 seastar::future<> SocketMessenger::shutdown()
index 71ef441e9e9a8e9e17f4d2bd7ad0bca85f83d590..cf5fa7d1de20ea86f46261d537ae5dd53d6ee82e 100644 (file)
@@ -29,7 +29,7 @@ namespace crimson::net {
 
 class FixedCPUServerSocket;
 
-class SocketMessenger final : public Messenger, public seastar::peering_sharded_service<SocketMessenger> {
+class SocketMessenger final : public Messenger {
   const seastar::shard_id master_sid;
   seastar::promise<> shutdown_promise;
 
@@ -51,8 +51,7 @@ class SocketMessenger final : public Messenger, public seastar::peering_sharded_
  public:
   SocketMessenger(const entity_name_t& myname,
                   const std::string& logic_name,
-                  uint32_t nonce,
-                  seastar::shard_id master_sid);
+                  uint32_t nonce);
   ~SocketMessenger() override { ceph_assert(!listener); }
 
   seastar::future<> set_myaddrs(const entity_addrvec_t& addr) override;
@@ -66,8 +65,8 @@ class SocketMessenger final : public Messenger, public seastar::peering_sharded_
 
   seastar::future<> start(Dispatcher *dispatcher) override;
 
-  seastar::future<ConnectionXRef> connect(const entity_addr_t& peer_addr,
-                                          const entity_type_t& peer_type) override;
+  seastar::future<ConnectionRef> connect(const entity_addr_t& peer_addr,
+                                         const entity_type_t& peer_type) override;
   // can only wait once
   seastar::future<> wait() override {
     assert(seastar::engine().cpu_id() == master_sid);
@@ -76,10 +75,6 @@ class SocketMessenger final : public Messenger, public seastar::peering_sharded_
 
   seastar::future<> shutdown() override;
 
-  Messenger* get_local_shard() override {
-    return &container().local();
-  }
-
   void print(ostream& out) const override {
     out << get_myname()
         << "(" << logic_name
index 12f6be2769b88e5eacdb83150431a2a95cab5395..92079f3c8a565aff22fb6b19edef8a2c5f4556ed 100644 (file)
@@ -26,8 +26,8 @@ namespace {
 
 Heartbeat::Heartbeat(const crimson::osd::ShardServices& service,
                      crimson::mon::Client& monc,
-                     crimson::net::Messenger& front_msgr,
-                     crimson::net::Messenger& back_msgr)
+                     crimson::net::MessengerRef front_msgr,
+                     crimson::net::MessengerRef back_msgr)
   : service{service},
     monc{monc},
     front_msgr{front_msgr},
@@ -46,12 +46,12 @@ seastar::future<> Heartbeat::start(entity_addrvec_t front_addrs,
   }
 
   using crimson::net::SocketPolicy;
-  front_msgr.set_policy(entity_name_t::TYPE_OSD,
+  front_msgr->set_policy(entity_name_t::TYPE_OSD,
+                         SocketPolicy::stateless_server(0));
+  back_msgr->set_policy(entity_name_t::TYPE_OSD,
                         SocketPolicy::stateless_server(0));
-  back_msgr.set_policy(entity_name_t::TYPE_OSD,
-                       SocketPolicy::stateless_server(0));
-  return seastar::when_all_succeed(start_messenger(front_msgr, front_addrs),
-                                   start_messenger(back_msgr, back_addrs))
+  return seastar::when_all_succeed(start_messenger(*front_msgr, front_addrs),
+                                   start_messenger(*back_msgr, back_addrs))
     .then([this] {
       timer.arm_periodic(
         std::chrono::seconds(local_conf()->osd_heartbeat_interval));
@@ -71,25 +71,25 @@ Heartbeat::start_messenger(crimson::net::Messenger& msgr,
 
 seastar::future<> Heartbeat::stop()
 {
-  return seastar::when_all_succeed(front_msgr.shutdown(),
-                                   back_msgr.shutdown());
+  return seastar::when_all_succeed(front_msgr->shutdown(),
+                                   back_msgr->shutdown());
 }
 
 const entity_addrvec_t& Heartbeat::get_front_addrs() const
 {
-  return front_msgr.get_myaddrs();
+  return front_msgr->get_myaddrs();
 }
 
 const entity_addrvec_t& Heartbeat::get_back_addrs() const
 {
-  return back_msgr.get_myaddrs();
+  return back_msgr->get_myaddrs();
 }
 
 void Heartbeat::set_require_authorizer(bool require_authorizer)
 {
-  if (front_msgr.get_require_authorizer() != require_authorizer) {
-    front_msgr.set_require_authorizer(require_authorizer);
-    back_msgr.set_require_authorizer(require_authorizer);
+  if (front_msgr->get_require_authorizer() != require_authorizer) {
+    front_msgr->set_require_authorizer(require_authorizer);
+    back_msgr->set_require_authorizer(require_authorizer);
   }
 }
 
@@ -103,14 +103,13 @@ seastar::future<> Heartbeat::add_peer(osd_id_t peer, epoch_t epoch)
     auto osdmap = service.get_osdmap_service().get_map();
     // TODO: use addrs
     return seastar::when_all_succeed(
-        front_msgr.connect(osdmap->get_hb_front_addrs(peer).front(),
-                           CEPH_ENTITY_TYPE_OSD),
-        back_msgr.connect(osdmap->get_hb_back_addrs(peer).front(),
-                          CEPH_ENTITY_TYPE_OSD))
-      .then([&info=peer_info->second] (auto xcon_front, auto xcon_back) {
-        // sharded-messenger compatible mode
-        info.con_front = xcon_front->release();
-        info.con_back = xcon_back->release();
+        front_msgr->connect(osdmap->get_hb_front_addrs(peer).front(),
+                            CEPH_ENTITY_TYPE_OSD),
+        back_msgr->connect(osdmap->get_hb_back_addrs(peer).front(),
+                           CEPH_ENTITY_TYPE_OSD))
+      .then([&info=peer_info->second] (auto con_front, auto con_back) {
+        info.con_front = con_front;
+        info.con_back = con_back;
       });
   } else {
     return seastar::now();
index d3978da89f2d9c70cf32480a99cbfef1c36dbb42..b8ff3bcccc681a794590cb0ba4c4a26f487a7b39 100644 (file)
@@ -27,8 +27,8 @@ public:
 
   Heartbeat(const crimson::osd::ShardServices& service,
            crimson::mon::Client& monc,
-           crimson::net::Messenger& front_msgr,
-           crimson::net::Messenger& back_msgr);
+           crimson::net::MessengerRef front_msgr,
+           crimson::net::MessengerRef back_msgr);
 
   seastar::future<> start(entity_addrvec_t front,
                          entity_addrvec_t back);
@@ -74,8 +74,8 @@ private:
 private:
   const crimson::osd::ShardServices& service;
   crimson::mon::Client& monc;
-  crimson::net::Messenger& front_msgr;
-  crimson::net::Messenger& back_msgr;
+  crimson::net::MessengerRef front_msgr;
+  crimson::net::MessengerRef back_msgr;
 
   seastar::timer<seastar::lowres_clock> timer;
   // use real_clock so it can be converted to utime_t
index 4ffe64546824c3b66ad003460e3452ee11d7d2fc..370d1e8dfbd60d89dac35fbc268a425ae3ea1c9d 100644 (file)
@@ -15,7 +15,7 @@
 #include "common/ceph_argparse.h"
 #include "crimson/common/buffer_io.h"
 #include "crimson/common/config_proxy.h"
-#include "crimson/net/SocketMessenger.h"
+#include "crimson/net/Messenger.h"
 #include "global/pidfile.h"
 
 #include "osd.h"
@@ -129,8 +129,6 @@ int main(int argc, char* argv[])
                                               &cluster_name,
                                               &conf_file_list);
   seastar::sharded<crimson::osd::OSD> osd;
-  seastar::sharded<crimson::net::SocketMessenger> cluster_msgr, client_msgr;
-  seastar::sharded<crimson::net::SocketMessenger> hb_front_msgr, hb_back_msgr;
   using crimson::common::sharded_conf;
   using crimson::common::sharded_perf_coll;
   try {
@@ -156,30 +154,23 @@ int main(int argc, char* argv[])
         pidfile_write(local_conf()->pid_file);
         const int whoami = std::stoi(local_conf()->name.get_id());
         const auto nonce = static_cast<uint32_t>(getpid());
+        crimson::net::MessengerRef cluster_msgr, client_msgr;
+        crimson::net::MessengerRef hb_front_msgr, hb_back_msgr;
         for (auto [msgr, name] : {make_pair(std::ref(cluster_msgr), "cluster"s),
                                   make_pair(std::ref(client_msgr), "client"s),
                                   make_pair(std::ref(hb_front_msgr), "hb_front"s),
                                   make_pair(std::ref(hb_back_msgr), "hb_back"s)}) {
-          const auto shard = seastar::engine().cpu_id();
-          msgr.start(entity_name_t::OSD(whoami), name, nonce, shard).get();
+          msgr = crimson::net::Messenger::create(entity_name_t::OSD(whoami), name, nonce);
           if (local_conf()->ms_crc_data) {
-            msgr.local().set_crc_data();
+            msgr->set_crc_data();
           }
           if (local_conf()->ms_crc_header) {
-            msgr.local().set_crc_header();
+            msgr->set_crc_header();
           }
         }
         osd.start_single(whoami, nonce,
-          reference_wrapper<crimson::net::Messenger>(cluster_msgr.local()),
-          reference_wrapper<crimson::net::Messenger>(client_msgr.local()),
-          reference_wrapper<crimson::net::Messenger>(hb_front_msgr.local()),
-          reference_wrapper<crimson::net::Messenger>(hb_back_msgr.local())).get();
-        seastar::engine().at_exit([&] {
-          return seastar::when_all_succeed(cluster_msgr.stop(),
-                                           client_msgr.stop(),
-                                           hb_front_msgr.stop(),
-                                           hb_back_msgr.stop());
-        });
+                         cluster_msgr, client_msgr,
+                         hb_front_msgr, hb_back_msgr).get();
         if (config.count("mkkey")) {
           make_keyring().handle_exception([](std::exception_ptr) {
             seastar::engine().exit(1);
index 8e49d122203766c81117cd40610b74a9af12d16c..59fa5fbcaf4baf3f2709a92d3e1cb44f53bc3fd6 100644 (file)
@@ -54,22 +54,22 @@ using crimson::os::FuturizedStore;
 namespace crimson::osd {
 
 OSD::OSD(int id, uint32_t nonce,
-         crimson::net::Messenger& cluster_msgr,
-         crimson::net::Messenger& public_msgr,
-         crimson::net::Messenger& hb_front_msgr,
-         crimson::net::Messenger& hb_back_msgr)
+         crimson::net::MessengerRef cluster_msgr,
+         crimson::net::MessengerRef public_msgr,
+         crimson::net::MessengerRef hb_front_msgr,
+         crimson::net::MessengerRef hb_back_msgr)
   : whoami{id},
     nonce{nonce},
     // do this in background
     beacon_timer{[this] { (void)send_beacon(); }},
     cluster_msgr{cluster_msgr},
     public_msgr{public_msgr},
-    monc{new crimson::mon::Client{public_msgr, *this}},
-    mgrc{new crimson::mgr::Client{public_msgr, *this}},
+    monc{new crimson::mon::Client{*public_msgr, *this}},
+    mgrc{new crimson::mgr::Client{*public_msgr, *this}},
     store{crimson::os::FuturizedStore::create(
       local_conf().get_val<std::string>("osd_objectstore"),
       local_conf().get_val<std::string>("osd_data"))},
-    shard_services{*this, cluster_msgr, public_msgr, *monc, *mgrc, *store},
+    shard_services{*this, *cluster_msgr, *public_msgr, *monc, *mgrc, *store},
     heartbeat{new Heartbeat{shard_services, *monc, hb_front_msgr, hb_back_msgr}},
     // do this in background
     heartbeat_timer{[this] { (void)update_heartbeat_peers(); }},
@@ -78,8 +78,8 @@ OSD::OSD(int id, uint32_t nonce,
   osdmaps[0] = boost::make_local_shared<OSDMap>();
   for (auto msgr : {std::ref(cluster_msgr), std::ref(public_msgr),
                     std::ref(hb_front_msgr), std::ref(hb_back_msgr)}) {
-    msgr.get().set_auth_server(monc.get());
-    msgr.get().set_auth_client(monc.get());
+    msgr.get()->set_auth_server(monc.get());
+    msgr.get()->set_auth_client(monc.get());
   }
 
   if (local_conf()->osd_open_classes_on_start) {
@@ -224,34 +224,34 @@ seastar::future<> OSD::start()
       CEPH_FEATURE_OSDENC;
     using crimson::net::SocketPolicy;
 
-    public_msgr.set_default_policy(SocketPolicy::stateless_server(0));
-    public_msgr.set_policy(entity_name_t::TYPE_MON,
-                           SocketPolicy::lossy_client(osd_required));
-    public_msgr.set_policy(entity_name_t::TYPE_MGR,
-                           SocketPolicy::lossy_client(osd_required));
-    public_msgr.set_policy(entity_name_t::TYPE_OSD,
-                           SocketPolicy::stateless_server(0));
-
-    cluster_msgr.set_default_policy(SocketPolicy::stateless_server(0));
-    cluster_msgr.set_policy(entity_name_t::TYPE_MON,
-                            SocketPolicy::lossy_client(0));
-    cluster_msgr.set_policy(entity_name_t::TYPE_OSD,
-                            SocketPolicy::lossless_peer(osd_required));
-    cluster_msgr.set_policy(entity_name_t::TYPE_CLIENT,
+    public_msgr->set_default_policy(SocketPolicy::stateless_server(0));
+    public_msgr->set_policy(entity_name_t::TYPE_MON,
+                            SocketPolicy::lossy_client(osd_required));
+    public_msgr->set_policy(entity_name_t::TYPE_MGR,
+                            SocketPolicy::lossy_client(osd_required));
+    public_msgr->set_policy(entity_name_t::TYPE_OSD,
                             SocketPolicy::stateless_server(0));
 
+    cluster_msgr->set_default_policy(SocketPolicy::stateless_server(0));
+    cluster_msgr->set_policy(entity_name_t::TYPE_MON,
+                             SocketPolicy::lossy_client(0));
+    cluster_msgr->set_policy(entity_name_t::TYPE_OSD,
+                             SocketPolicy::lossless_peer(osd_required));
+    cluster_msgr->set_policy(entity_name_t::TYPE_CLIENT,
+                             SocketPolicy::stateless_server(0));
+
     dispatchers.push_front(this);
     dispatchers.push_front(monc.get());
     dispatchers.push_front(mgrc.get());
     return seastar::when_all_succeed(
-      cluster_msgr.try_bind(pick_addresses(CEPH_PICK_ADDRESS_CLUSTER),
+      cluster_msgr->try_bind(pick_addresses(CEPH_PICK_ADDRESS_CLUSTER),
+                             local_conf()->ms_bind_port_min,
+                             local_conf()->ms_bind_port_max)
+        .then([this] { return cluster_msgr->start(&dispatchers); }),
+      public_msgr->try_bind(pick_addresses(CEPH_PICK_ADDRESS_PUBLIC),
                             local_conf()->ms_bind_port_min,
                             local_conf()->ms_bind_port_max)
-        .then([this] { return cluster_msgr.start(&dispatchers); }),
-      public_msgr.try_bind(pick_addresses(CEPH_PICK_ADDRESS_PUBLIC),
-                           local_conf()->ms_bind_port_min,
-                           local_conf()->ms_bind_port_max)
-        .then([this] { return public_msgr.start(&dispatchers); }));
+        .then([this] { return public_msgr->start(&dispatchers); }));
   }).then([this] {
     return seastar::when_all_succeed(monc->start(),
                                      mgrc->start());
@@ -264,15 +264,15 @@ seastar::future<> OSD::start()
     return monc->renew_subs();
   }).then([this] {
     if (auto [addrs, changed] =
-        replace_unknown_addrs(cluster_msgr.get_myaddrs(),
-                              public_msgr.get_myaddrs()); changed) {
-      return cluster_msgr.set_myaddrs(addrs);
+        replace_unknown_addrs(cluster_msgr->get_myaddrs(),
+                              public_msgr->get_myaddrs()); changed) {
+      return cluster_msgr->set_myaddrs(addrs);
     } else {
       return seastar::now();
     }
   }).then([this] {
-    return heartbeat->start(public_msgr.get_myaddrs(),
-                            cluster_msgr.get_myaddrs());
+    return heartbeat->start(public_msgr->get_myaddrs(),
+                            cluster_msgr->get_myaddrs());
   }).then([this] {
     return start_boot();
   });
@@ -325,13 +325,13 @@ seastar::future<> OSD::_send_boot()
 
   logger().info("hb_back_msgr: {}", heartbeat->get_back_addrs());
   logger().info("hb_front_msgr: {}", heartbeat->get_front_addrs());
-  logger().info("cluster_msgr: {}", cluster_msgr.get_myaddr());
+  logger().info("cluster_msgr: {}", cluster_msgr->get_myaddr());
   auto m = make_message<MOSDBoot>(superblock,
                                   osdmap->get_epoch(),
                                   osdmap->get_epoch(),
                                   heartbeat->get_back_addrs(),
                                   heartbeat->get_front_addrs(),
-                                  cluster_msgr.get_myaddrs(),
+                                  cluster_msgr->get_myaddrs(),
                                   CEPH_FEATURES_ALL);
   return monc->send_message(m);
 }
@@ -402,8 +402,8 @@ seastar::future<> OSD::stop()
     return monc->stop();
   }).then([this] {
     return when_all_succeed(
-      public_msgr.shutdown(),
-      cluster_msgr.shutdown());
+      public_msgr->shutdown(),
+      cluster_msgr->shutdown());
   }).then([this] {
     return store->umount();
   }).handle_exception([](auto ep) {
@@ -833,7 +833,7 @@ seastar::future<> OSD::committed_osd_maps(version_t first,
       shard_services.update_map(osdmap);
       if (up_epoch != 0 &&
           osdmap->is_up(whoami) &&
-          osdmap->get_addrs(whoami) == public_msgr.get_myaddrs()) {
+          osdmap->get_addrs(whoami) == public_msgr->get_myaddrs()) {
         up_epoch = osdmap->get_epoch();
         if (!boot_epoch) {
           boot_epoch = osdmap->get_epoch();
@@ -842,7 +842,7 @@ seastar::future<> OSD::committed_osd_maps(version_t first,
     });
   }).then([m, this] {
     if (osdmap->is_up(whoami) &&
-        osdmap->get_addrs(whoami) == public_msgr.get_myaddrs() &&
+        osdmap->get_addrs(whoami) == public_msgr->get_myaddrs() &&
         bind_epoch < osdmap->get_up_from(whoami)) {
       if (state.is_booting()) {
         logger().info("osd.{}: activating...", whoami);
@@ -924,17 +924,17 @@ bool OSD::should_restart() const
     logger().info("map e {} marked osd.{} down",
                   osdmap->get_epoch(), whoami);
     return true;
-  } else if (osdmap->get_addrs(whoami) != public_msgr.get_myaddrs()) {
+  } else if (osdmap->get_addrs(whoami) != public_msgr->get_myaddrs()) {
     logger().error("map e {} had wrong client addr ({} != my {})",
                    osdmap->get_epoch(),
                    osdmap->get_addrs(whoami),
-                   public_msgr.get_myaddrs());
+                   public_msgr->get_myaddrs());
     return true;
-  } else if (osdmap->get_cluster_addrs(whoami) != cluster_msgr.get_myaddrs()) {
+  } else if (osdmap->get_cluster_addrs(whoami) != cluster_msgr->get_myaddrs()) {
     logger().error("map e {} had wrong cluster addr ({} != my {})",
                    osdmap->get_epoch(),
                    osdmap->get_cluster_addrs(whoami),
-                   cluster_msgr.get_myaddrs());
+                   cluster_msgr->get_myaddrs());
     return true;
   } else {
     return false;
index 570017a8a9bb1c190e128ebea397e57792b0bba0..95c502636b25cad83189ee5bc71e2757d4189358 100644 (file)
@@ -68,9 +68,9 @@ class OSD final : public crimson::net::Dispatcher,
   const uint32_t nonce;
   seastar::timer<seastar::lowres_clock> beacon_timer;
   // talk with osd
-  crimson::net::Messenger& cluster_msgr;
+  crimson::net::MessengerRef cluster_msgr;
   // talk with client/mon/mgr
-  crimson::net::Messenger& public_msgr;
+  crimson::net::MessengerRef public_msgr;
   ChainedDispatchers dispatchers;
   std::unique_ptr<crimson::mon::Client> monc;
   std::unique_ptr<crimson::mgr::Client> mgrc;
@@ -117,10 +117,10 @@ class OSD final : public crimson::net::Dispatcher,
 
 public:
   OSD(int id, uint32_t nonce,
-      crimson::net::Messenger& cluster_msgr,
-      crimson::net::Messenger& client_msgr,
-      crimson::net::Messenger& hb_front_msgr,
-      crimson::net::Messenger& hb_back_msgr);
+      crimson::net::MessengerRef cluster_msgr,
+      crimson::net::MessengerRef client_msgr,
+      crimson::net::MessengerRef hb_front_msgr,
+      crimson::net::MessengerRef hb_back_msgr);
   ~OSD() final;
 
   seastar::future<> mkfs(uuid_d osd_uuid, uuid_d cluster_fsid);
index 35d4f6d614a88114e10e4f5987da2bf7959d2c6e..784fca6fa5dc6e0393a0e1531eb0249f5dce93ac 100644 (file)
@@ -59,8 +59,8 @@ seastar::future<> ShardServices::send_to_osd(
   } else {
     return cluster_msgr.connect(osdmap->get_cluster_addrs(peer).front(),
       CEPH_ENTITY_TYPE_OSD)
-      .then([m, this] (auto xconn) {
-             return (*xconn)->send(m);
+      .then([m, this] (auto conn) {
+             return conn->send(m);
            });
   }
 }
index abccc5034ecc9629f5f9d3c94a15867998d2a35e..1a8b2039c56ab806b5d67006f8ea8d580b8e4f28 100644 (file)
@@ -37,7 +37,7 @@ struct DummyAuthAuthorizer : public AuthAuthorizer {
 
 struct Server {
   crimson::thread::Throttle byte_throttler;
-  crimson::net::Messenger& msgr;
+  crimson::net::MessengerRef msgr;
   crimson::auth::DummyAuthClientServer dummy_auth;
   struct ServerDispatcher : crimson::net::Dispatcher {
     unsigned count = 0;
@@ -59,18 +59,18 @@ struct Server {
           0, bufferlist{});
     }
   } dispatcher;
-  Server(crimson::net::Messenger& msgr)
+  Server(crimson::net::MessengerRef msgr)
     : byte_throttler(crimson::net::conf.osd_client_message_size_cap),
       msgr{msgr}
   {
-    msgr.set_crc_header();
-    msgr.set_crc_data();
+    msgr->set_crc_header();
+    msgr->set_crc_data();
   }
 };
 
 struct Client {
   crimson::thread::Throttle byte_throttler;
-  crimson::net::Messenger& msgr;
+  crimson::net::MessengerRef msgr;
   crimson::auth::DummyAuthClientServer dummy_auth;
   struct ClientDispatcher : crimson::net::Dispatcher {
     unsigned count = 0;
@@ -83,12 +83,12 @@ struct Client {
       return seastar::now();
     }
   } dispatcher;
-  Client(crimson::net::Messenger& msgr)
+  Client(crimson::net::MessengerRef msgr)
     : byte_throttler(crimson::net::conf.osd_client_message_size_cap),
       msgr{msgr}
   {
-    msgr.set_crc_header();
-    msgr.set_crc_data();
+    msgr->set_crc_header();
+    msgr->set_crc_data();
   }
 };
 } // namespace seastar_pingpong
@@ -151,60 +151,58 @@ seastar_echo(const entity_addr_t addr, echo_role role, unsigned count)
 {
   std::cout << "seastar/";
   if (role == echo_role::as_server) {
-    return crimson::net::Messenger::create(entity_name_t::OSD(0), "server",
-                                        addr.get_nonce(), 0)
-      .then([addr, count] (auto msgr) {
-        return seastar::do_with(seastar_pingpong::Server{*msgr},
-          [addr, count](auto& server) mutable {
-            std::cout << "server listening at " << addr << std::endl;
-            // bind the server
-            server.msgr.set_default_policy(crimson::net::SocketPolicy::stateless_server(0));
-            server.msgr.set_policy_throttler(entity_name_t::TYPE_OSD,
-                                             &server.byte_throttler);
-            server.msgr.set_require_authorizer(false);
-            server.msgr.set_auth_client(&server.dummy_auth);
-            server.msgr.set_auth_server(&server.dummy_auth);
-            return server.msgr.bind(entity_addrvec_t{addr})
-              .then([&server] {
-                return server.msgr.start(&server.dispatcher);
-              }).then([&dispatcher=server.dispatcher, count] {
-                return dispatcher.on_reply.wait([&dispatcher, count] {
-                  return dispatcher.count >= count;
-                });
-              }).finally([&server] {
-                std::cout << "server shutting down" << std::endl;
-                return server.msgr.shutdown();
-              });
-          });
+    return seastar::do_with(
+        seastar_pingpong::Server{crimson::net::Messenger::create(
+            entity_name_t::OSD(0), "server", addr.get_nonce())},
+        [addr, count](auto& server) mutable {
+      std::cout << "server listening at " << addr << std::endl;
+      // bind the server
+      server.msgr->set_default_policy(crimson::net::SocketPolicy::stateless_server(0));
+      server.msgr->set_policy_throttler(entity_name_t::TYPE_OSD,
+                                        &server.byte_throttler);
+      server.msgr->set_require_authorizer(false);
+      server.msgr->set_auth_client(&server.dummy_auth);
+      server.msgr->set_auth_server(&server.dummy_auth);
+      return server.msgr->bind(entity_addrvec_t{addr}
+      ).then([&server] {
+        return server.msgr->start(&server.dispatcher);
+      }).then([&dispatcher=server.dispatcher, count] {
+        return dispatcher.on_reply.wait([&dispatcher, count] {
+          return dispatcher.count >= count;
+        });
+      }).finally([&server] {
+        std::cout << "server shutting down" << std::endl;
+        return server.msgr->shutdown();
       });
+    });
   } else {
-    return crimson::net::Messenger::create(entity_name_t::OSD(1), "client",
-                                        addr.get_nonce(), 0)
-      .then([addr, count] (auto msgr) {
-        return seastar::do_with(seastar_pingpong::Client{*msgr},
-          [addr, count](auto& client) {
-            std::cout << "client sending to " << addr << std::endl;
-            client.msgr.set_default_policy(crimson::net::SocketPolicy::lossy_client(0));
-            client.msgr.set_policy_throttler(entity_name_t::TYPE_OSD,
-                                             &client.byte_throttler);
-            client.msgr.set_require_authorizer(false);
-            client.msgr.set_auth_client(&client.dummy_auth);
-            client.msgr.set_auth_server(&client.dummy_auth);
-            return client.msgr.start(&client.dispatcher)
-              .then([addr, &client] {
-                return client.msgr.connect(addr, entity_name_t::TYPE_OSD);
-              }).then([&disp=client.dispatcher, count](crimson::net::ConnectionXRef conn) {
-                return seastar::do_until(
-                  [&disp,count] { return disp.count >= count; },
-                  [&disp,conn] { return (*conn)->send(make_message<MPing>())
-                                   .then([&] { return disp.on_reply.wait(); });
-                });
-              }).finally([&client] {
-                std::cout << "client shutting down" << std::endl;
-                return client.msgr.shutdown();
-              });
-          });
+    return seastar::do_with(
+        seastar_pingpong::Client{crimson::net::Messenger::create(
+            entity_name_t::OSD(1), "client", addr.get_nonce())},
+        [addr, count](auto& client) {
+      std::cout << "client sending to " << addr << std::endl;
+      client.msgr->set_default_policy(crimson::net::SocketPolicy::lossy_client(0));
+      client.msgr->set_policy_throttler(entity_name_t::TYPE_OSD,
+                                        &client.byte_throttler);
+      client.msgr->set_require_authorizer(false);
+      client.msgr->set_auth_client(&client.dummy_auth);
+      client.msgr->set_auth_server(&client.dummy_auth);
+      return client.msgr->start(&client.dispatcher).then([addr, &client] {
+        return client.msgr->connect(addr, entity_name_t::TYPE_OSD);
+      }).then([&disp=client.dispatcher, count](crimson::net::ConnectionRef conn) {
+        return seastar::do_until(
+          [&disp,count] { return disp.count >= count; },
+          [&disp,conn] {
+            return conn->send(make_message<MPing>()).then([&] {
+              return disp.on_reply.wait();
+            });
+          }
+        );
+      }).finally([&client] {
+        std::cout << "client shutting down" << std::endl;
+        return client.msgr->shutdown();
       });
+    });
   }
 }
 
index 322c4f4b5fddb5270df88faf3e60fb03ecdece20..ebb3910783138af4df4c188590ee93012ce51f08 100644 (file)
@@ -43,17 +43,10 @@ static seastar::future<> test_echo(unsigned rounds,
 {
   struct test_state {
     struct Server final
-        : public crimson::net::Dispatcher,
-          public seastar::peering_sharded_service<Server> {
-      crimson::net::Messenger *msgr = nullptr;
+        : public crimson::net::Dispatcher {
+      crimson::net::MessengerRef msgr;
       crimson::auth::DummyAuthClientServer dummy_auth;
 
-      Dispatcher* get_local_shard() override {
-        return &(container().local());
-      }
-      seastar::future<> stop() {
-        return seastar::make_ready_future<>();
-      }
       seastar::future<> ms_dispatch(crimson::net::Connection* c,
                                     MessageRef m) override {
         if (verbose) {
@@ -67,20 +60,14 @@ static seastar::future<> test_echo(unsigned rounds,
                              const std::string& lname,
                              const uint64_t nonce,
                              const entity_addr_t& addr) {
-        auto&& fut = crimson::net::Messenger::create(name, lname, nonce, 0);
-        return fut.then([this, addr](crimson::net::Messenger *messenger) {
-            return container().invoke_on_all([messenger](auto& server) {
-                server.msgr = messenger->get_local_shard();
-                server.msgr->set_default_policy(crimson::net::SocketPolicy::stateless_server(0));
-                server.msgr->set_require_authorizer(false);
-                server.msgr->set_auth_client(&server.dummy_auth);
-                server.msgr->set_auth_server(&server.dummy_auth);
-              }).then([messenger, addr] {
-                return messenger->bind(entity_addrvec_t{addr});
-              }).then([this, messenger] {
-                return messenger->start(this);
-              });
-          });
+        msgr = crimson::net::Messenger::create(name, lname, nonce);
+        msgr->set_default_policy(crimson::net::SocketPolicy::stateless_server(0));
+        msgr->set_require_authorizer(false);
+        msgr->set_auth_client(&dummy_auth);
+        msgr->set_auth_server(&dummy_auth);
+        return msgr->bind(entity_addrvec_t{addr}).then([this] {
+          return msgr->start(this);
+        });
       }
       seastar::future<> shutdown() {
         ceph_assert(msgr);
@@ -89,9 +76,7 @@ static seastar::future<> test_echo(unsigned rounds,
     };
 
     struct Client final
-        : public crimson::net::Dispatcher,
-          public seastar::peering_sharded_service<Client> {
-
+        : public crimson::net::Dispatcher {
       struct PingSession : public seastar::enable_shared_from_this<PingSession> {
         unsigned count = 0u;
         mono_time connected_time;
@@ -101,7 +86,7 @@ static seastar::future<> test_echo(unsigned rounds,
 
       unsigned rounds;
       std::bernoulli_distribution keepalive_dist;
-      crimson::net::Messenger *msgr = nullptr;
+      crimson::net::MessengerRef msgr;
       std::map<crimson::net::Connection*, seastar::promise<>> pending_conns;
       std::map<crimson::net::Connection*, PingSessionRef> sessions;
       crimson::auth::DummyAuthClientServer dummy_auth;
@@ -118,12 +103,6 @@ static seastar::future<> test_echo(unsigned rounds,
         return found->second;
       }
 
-      Dispatcher* get_local_shard() override {
-        return &(container().local());
-      }
-      seastar::future<> stop() {
-        return seastar::now();
-      }
       seastar::future<> ms_handle_connect(crimson::net::ConnectionRef conn) override {
         auto session = seastar::make_shared<PingSession>();
         auto [i, added] = sessions.emplace(conn.get(), session);
@@ -143,30 +122,21 @@ static seastar::future<> test_echo(unsigned rounds,
         if (session->count == rounds) {
           logger().info("{}: finished receiving {} pongs", *c, session->count);
           session->finish_time = mono_clock::now();
-          return container().invoke_on_all([c](auto &client) {
-              auto found = client.pending_conns.find(c);
-              ceph_assert(found != client.pending_conns.end());
-              found->second.set_value();
-            });
-        } else {
-          return seastar::now();
+          auto found = pending_conns.find(c);
+          ceph_assert(found != pending_conns.end());
+          found->second.set_value();
         }
+        return seastar::now();
       }
 
       seastar::future<> init(const entity_name_t& name,
                              const std::string& lname,
                              const uint64_t nonce) {
-        return crimson::net::Messenger::create(name, lname, nonce, 0)
-          .then([this](crimson::net::Messenger *messenger) {
-            return container().invoke_on_all([messenger](auto& client) {
-                client.msgr = messenger->get_local_shard();
-                client.msgr->set_default_policy(crimson::net::SocketPolicy::lossy_client(0));
-                client.msgr->set_auth_client(&client.dummy_auth);
-                client.msgr->set_auth_server(&client.dummy_auth);
-              }).then([this, messenger] {
-                return messenger->start(this);
-              });
-          });
+        msgr = crimson::net::Messenger::create(name, lname, nonce);
+        msgr->set_default_policy(crimson::net::SocketPolicy::lossy_client(0));
+        msgr->set_auth_client(&dummy_auth);
+        msgr->set_auth_server(&dummy_auth);
+        return msgr->start(this);
       }
 
       seastar::future<> shutdown() {
@@ -174,153 +144,119 @@ static seastar::future<> test_echo(unsigned rounds,
         return msgr->shutdown();
       }
 
-      // Note: currently we don't support foreign dispatch a message because:
-      // 1. it is not effecient because each ref-count modification needs
-      //    a cross-core jump, so it should be discouraged.
-      // 2. messenger needs to be modified to hold a wrapper for the sending
-      //    message because it can be a nested seastar smart ptr or not.
-      // 3. in 1:1 mapping OSD, there is no need to do foreign dispatch.
-      seastar::future<> dispatch_pingpong(const entity_addr_t& peer_addr,
-                                         bool foreign_dispatch) {
-#ifndef CRIMSON_MSGR_SEND_FOREIGN
-       ceph_assert(!foreign_dispatch);
-#endif
+      seastar::future<> dispatch_pingpong(const entity_addr_t& peer_addr) {
         mono_time start_time = mono_clock::now();
-        return msgr->connect(peer_addr, entity_name_t::TYPE_OSD)
-          .then([this, foreign_dispatch, start_time](auto conn) {
-            return seastar::futurize_apply([this, conn, foreign_dispatch] {
-                if (foreign_dispatch) {
-                  return do_dispatch_pingpong(&**conn);
-                } else {
-                  // NOTE: this could be faster if we don't switch cores in do_dispatch_pingpong().
-                  return container().invoke_on(conn->get()->shard_id(), [conn = &**conn](auto &client) {
-                      return client.do_dispatch_pingpong(conn);
-                    });
-                }
-              }).finally([this, conn, start_time] {
-                return container().invoke_on(conn->get()->shard_id(), [conn, start_time](auto &client) {
-                    auto session = client.find_session(&**conn);
-                    std::chrono::duration<double> dur_handshake = session->connected_time - start_time;
-                    std::chrono::duration<double> dur_pingpong = session->finish_time - session->connected_time;
-                    logger().info("{}: handshake {}, pingpong {}",
-                                  **conn, dur_handshake.count(), dur_pingpong.count());
-                  });
-              });
+        return msgr->connect(peer_addr, entity_name_t::TYPE_OSD
+        ).then([this, start_time](auto conn) {
+          return seastar::futurize_apply([this, conn] {
+            return do_dispatch_pingpong(conn.get());
+          }).finally([this, conn, start_time] {
+            auto session = find_session(conn.get());
+            std::chrono::duration<double> dur_handshake = session->connected_time - start_time;
+            std::chrono::duration<double> dur_pingpong = session->finish_time - session->connected_time;
+            logger().info("{}: handshake {}, pingpong {}",
+                          *conn, dur_handshake.count(), dur_pingpong.count());
           });
+        });
       }
 
      private:
       seastar::future<> do_dispatch_pingpong(crimson::net::Connection* conn) {
-        return container().invoke_on_all([conn](auto& client) {
-            auto [i, added] = client.pending_conns.emplace(conn, seastar::promise<>());
-            std::ignore = i;
-            ceph_assert(added);
-          }).then([this, conn] {
-            return seastar::do_with(0u, 0u,
-                                    [this, conn](auto &count_ping, auto &count_keepalive) {
-                return seastar::do_until(
-                  [this, conn, &count_ping, &count_keepalive] {
-                    bool stop = (count_ping == rounds);
-                    if (stop) {
-                      logger().info("{}: finished sending {} pings with {} keepalives",
-                                    *conn, count_ping, count_keepalive);
-                    }
-                    return stop;
-                  },
-                  [this, conn, &count_ping, &count_keepalive] {
-                    return seastar::repeat([this, conn, &count_ping, &count_keepalive] {
-                        if (keepalive_dist(rng)) {
-                          return conn->keepalive()
-                            .then([&count_keepalive] {
-                              count_keepalive += 1;
-                              return seastar::make_ready_future<seastar::stop_iteration>(
-                                seastar::stop_iteration::no);
-                            });
-                        } else {
-                          return conn->send(make_message<MPing>())
-                            .then([&count_ping] {
-                              count_ping += 1;
-                              return seastar::make_ready_future<seastar::stop_iteration>(
-                                seastar::stop_iteration::yes);
-                            });
-                        }
+        auto [i, added] = pending_conns.emplace(conn, seastar::promise<>());
+        std::ignore = i;
+        ceph_assert(added);
+        return seastar::do_with(0u, 0u,
+            [this, conn](auto &count_ping, auto &count_keepalive) {
+          return seastar::do_until(
+            [this, conn, &count_ping, &count_keepalive] {
+              bool stop = (count_ping == rounds);
+              if (stop) {
+                logger().info("{}: finished sending {} pings with {} keepalives",
+                              *conn, count_ping, count_keepalive);
+              }
+              return stop;
+            },
+            [this, conn, &count_ping, &count_keepalive] {
+              return seastar::repeat([this, conn, &count_ping, &count_keepalive] {
+                  if (keepalive_dist(rng)) {
+                    return conn->keepalive()
+                      .then([&count_keepalive] {
+                        count_keepalive += 1;
+                        return seastar::make_ready_future<seastar::stop_iteration>(
+                          seastar::stop_iteration::no);
                       });
-                  }).then([this, conn] {
-                    auto found = pending_conns.find(conn);
-                    return found->second.get_future();
-                  });
-              });
-          });
+                  } else {
+                    return conn->send(make_message<MPing>())
+                      .then([&count_ping] {
+                        count_ping += 1;
+                        return seastar::make_ready_future<seastar::stop_iteration>(
+                          seastar::stop_iteration::yes);
+                      });
+                  }
+                });
+            }).then([this, conn] {
+              auto found = pending_conns.find(conn);
+              return found->second.get_future();
+            }
+          );
+        });
       }
     };
   };
 
   logger().info("test_echo(rounds={}, keepalive_ratio={}, v2={}):",
                 rounds, keepalive_ratio, v2);
+  auto server1 = seastar::make_shared<test_state::Server>();
+  auto server2 = seastar::make_shared<test_state::Server>();
+  auto client1 = seastar::make_shared<test_state::Client>(rounds, keepalive_ratio);
+  auto client2 = seastar::make_shared<test_state::Client>(rounds, keepalive_ratio);
+  // start servers and clients
+  entity_addr_t addr1;
+  addr1.parse("127.0.0.1:9010", nullptr);
+  entity_addr_t addr2;
+  addr2.parse("127.0.0.1:9011", nullptr);
+  if (v2) {
+    addr1.set_type(entity_addr_t::TYPE_MSGR2);
+    addr2.set_type(entity_addr_t::TYPE_MSGR2);
+  } else {
+    addr1.set_type(entity_addr_t::TYPE_LEGACY);
+    addr2.set_type(entity_addr_t::TYPE_LEGACY);
+  }
   return seastar::when_all_succeed(
-      crimson::net::create_sharded<test_state::Server>(),
-      crimson::net::create_sharded<test_state::Server>(),
-      crimson::net::create_sharded<test_state::Client>(rounds, keepalive_ratio),
-      crimson::net::create_sharded<test_state::Client>(rounds, keepalive_ratio))
-    .then([rounds, keepalive_ratio, v2](test_state::Server *server1,
-                                        test_state::Server *server2,
-                                        test_state::Client *client1,
-                                        test_state::Client *client2) {
-      // start servers and clients
-      entity_addr_t addr1;
-      addr1.parse("127.0.0.1:9010", nullptr);
-      entity_addr_t addr2;
-      addr2.parse("127.0.0.1:9011", nullptr);
-      if (v2) {
-        addr1.set_type(entity_addr_t::TYPE_MSGR2);
-        addr2.set_type(entity_addr_t::TYPE_MSGR2);
-      } else {
-        addr1.set_type(entity_addr_t::TYPE_LEGACY);
-        addr2.set_type(entity_addr_t::TYPE_LEGACY);
-      }
-      return seastar::when_all_succeed(
-          server1->init(entity_name_t::OSD(0), "server1", 1, addr1),
-          server2->init(entity_name_t::OSD(1), "server2", 2, addr2),
-          client1->init(entity_name_t::OSD(2), "client1", 3),
-          client2->init(entity_name_t::OSD(3), "client2", 4))
-      // dispatch pingpoing
-        .then([client1, client2, server1, server2] {
-          return seastar::when_all_succeed(
-              // test connecting in parallel, accepting in parallel
-#ifdef CRIMSON_MSGR_SEND_FOREIGN
-             // operate the connection reference from a foreign core
-             client1->dispatch_pingpong(server1->msgr->get_myaddr(), true),
-             client2->dispatch_pingpong(server2->msgr->get_myaddr(), true),
-#endif
-             // operate the connection reference from a local core
-              client1->dispatch_pingpong(server2->msgr->get_myaddr(), false),
-              client2->dispatch_pingpong(server1->msgr->get_myaddr(), false));
-      // shutdown
-        }).finally([client1] {
-          logger().info("client1 shutdown...");
-          return client1->shutdown();
-        }).finally([client2] {
-          logger().info("client2 shutdown...");
-          return client2->shutdown();
-        }).finally([server1] {
-          logger().info("server1 shutdown...");
-          return server1->shutdown();
-        }).finally([server2] {
-          logger().info("server2 shutdown...");
-          return server2->shutdown();
-        }).finally([] {
-          logger().info("test_echo() done!\n");
-        });
-    });
+      server1->init(entity_name_t::OSD(0), "server1", 1, addr1),
+      server2->init(entity_name_t::OSD(1), "server2", 2, addr2),
+      client1->init(entity_name_t::OSD(2), "client1", 3),
+      client2->init(entity_name_t::OSD(3), "client2", 4)
+  // dispatch pingpong
+  ).then([client1, client2, server1, server2] {
+    return seastar::when_all_succeed(
+        // test connecting in parallel, accepting in parallel
+        client1->dispatch_pingpong(server2->msgr->get_myaddr()),
+        client2->dispatch_pingpong(server1->msgr->get_myaddr()));
+  // shutdown
+  }).finally([client1] {
+    logger().info("client1 shutdown...");
+    return client1->shutdown();
+  }).finally([client2] {
+    logger().info("client2 shutdown...");
+    return client2->shutdown();
+  }).finally([server1] {
+    logger().info("server1 shutdown...");
+    return server1->shutdown();
+  }).finally([server2] {
+    logger().info("server2 shutdown...");
+    return server2->shutdown();
+  }).finally([server1, server2, client1, client2] {
+    logger().info("test_echo() done!\n");
+  });
 }
 
 static seastar::future<> test_concurrent_dispatch(bool v2)
 {
   struct test_state {
     struct Server final
-      : public crimson::net::Dispatcher,
-        public seastar::peering_sharded_service<Server> {
-      crimson::net::Messenger *msgr = nullptr;
+      : public crimson::net::Dispatcher {
+      crimson::net::MessengerRef msgr;
       int count = 0;
       seastar::promise<> on_second; // satisfied on second dispatch
       seastar::promise<> on_done; // satisfied when first dispatch unblocks
@@ -331,12 +267,9 @@ static seastar::future<> test_concurrent_dispatch(bool v2)
         switch (++count) {
         case 1:
           // block on the first request until we reenter with the second
-          return on_second.get_future()
-            .then([this] {
-              return container().invoke_on_all([](Server& server) {
-                  server.on_done.set_value();
-                });
-            });
+          return on_second.get_future().then([this] {
+            on_done.set_value();
+          });
         case 2:
           on_second.set_value();
           return seastar::now();
@@ -351,105 +284,73 @@ static seastar::future<> test_concurrent_dispatch(bool v2)
                              const std::string& lname,
                              const uint64_t nonce,
                              const entity_addr_t& addr) {
-        return crimson::net::Messenger::create(name, lname, nonce, 0)
-          .then([this, addr](crimson::net::Messenger *messenger) {
-            return container().invoke_on_all([messenger](auto& server) {
-                server.msgr = messenger->get_local_shard();
-                server.msgr->set_default_policy(crimson::net::SocketPolicy::stateless_server(0));
-                server.msgr->set_auth_client(&server.dummy_auth);
-                server.msgr->set_auth_server(&server.dummy_auth);
-              }).then([messenger, addr] {
-                return messenger->bind(entity_addrvec_t{addr});
-              }).then([this, messenger] {
-                return messenger->start(this);
-              });
-          });
-      }
-
-      Dispatcher* get_local_shard() override {
-        return &(container().local());
-      }
-      seastar::future<> stop() {
-        return seastar::make_ready_future<>();
+        msgr = crimson::net::Messenger::create(name, lname, nonce);
+        msgr->set_default_policy(crimson::net::SocketPolicy::stateless_server(0));
+        msgr->set_auth_client(&dummy_auth);
+        msgr->set_auth_server(&dummy_auth);
+        return msgr->bind(entity_addrvec_t{addr}).then([this] {
+          return msgr->start(this);
+        });
       }
     };
 
     struct Client final
-      : public crimson::net::Dispatcher,
-        public seastar::peering_sharded_service<Client> {
-      crimson::net::Messenger *msgr = nullptr;
+      : public crimson::net::Dispatcher {
+      crimson::net::MessengerRef msgr;
       crimson::auth::DummyAuthClientServer dummy_auth;
 
       seastar::future<> init(const entity_name_t& name,
                              const std::string& lname,
                              const uint64_t nonce) {
-        return crimson::net::Messenger::create(name, lname, nonce, 0)
-          .then([this](crimson::net::Messenger *messenger) {
-            return container().invoke_on_all([messenger](auto& client) {
-                client.msgr = messenger->get_local_shard();
-                client.msgr->set_default_policy(crimson::net::SocketPolicy::lossy_client(0));
-                client.msgr->set_auth_client(&client.dummy_auth);
-                client.msgr->set_auth_server(&client.dummy_auth);
-              }).then([this, messenger] {
-                return messenger->start(this);
-              });
-          });
-      }
-
-      Dispatcher* get_local_shard() override {
-        return &(container().local());
-      }
-      seastar::future<> stop() {
-        return seastar::make_ready_future<>();
+        msgr = crimson::net::Messenger::create(name, lname, nonce);
+        msgr->set_default_policy(crimson::net::SocketPolicy::lossy_client(0));
+        msgr->set_auth_client(&dummy_auth);
+        msgr->set_auth_server(&dummy_auth);
+        return msgr->start(this);
       }
     };
   };
 
   logger().info("test_concurrent_dispatch(v2={}):", v2);
+  auto server = seastar::make_shared<test_state::Server>();
+  auto client = seastar::make_shared<test_state::Client>();
+  entity_addr_t addr;
+  addr.parse("127.0.0.1:9010", nullptr);
+  if (v2) {
+    addr.set_type(entity_addr_t::TYPE_MSGR2);
+  } else {
+    addr.set_type(entity_addr_t::TYPE_LEGACY);
+  }
+  addr.set_family(AF_INET);
   return seastar::when_all_succeed(
-      crimson::net::create_sharded<test_state::Server>(),
-      crimson::net::create_sharded<test_state::Client>())
-    .then([v2](test_state::Server *server,
-             test_state::Client *client) {
-      entity_addr_t addr;
-      addr.parse("127.0.0.1:9010", nullptr);
-      if (v2) {
-        addr.set_type(entity_addr_t::TYPE_MSGR2);
-      } else {
-        addr.set_type(entity_addr_t::TYPE_LEGACY);
-      }
-      addr.set_family(AF_INET);
-      return seastar::when_all_succeed(
-          server->init(entity_name_t::OSD(4), "server3", 5, addr),
-          client->init(entity_name_t::OSD(5), "client3", 6))
-        .then([server, client] {
-          return client->msgr->connect(server->msgr->get_myaddr(),
-                                      entity_name_t::TYPE_OSD);
-        }).then([](crimson::net::ConnectionXRef conn) {
-          // send two messages
-          return (*conn)->send(make_message<MPing>()).then([conn] {
-            return (*conn)->send(make_message<MPing>());
-          });
-        }).then([server] {
-          return server->wait();
-        }).finally([client] {
-          logger().info("client shutdown...");
-          return client->msgr->shutdown();
-        }).finally([server] {
-          logger().info("server shutdown...");
-          return server->msgr->shutdown();
-        }).finally([] {
-          logger().info("test_concurrent_dispatch() done!\n");
-        });
-    });
+      server->init(entity_name_t::OSD(4), "server3", 5, addr),
+      client->init(entity_name_t::OSD(5), "client3", 6)
+  ).then([server, client] {
+    return client->msgr->connect(server->msgr->get_myaddr(),
+                                 entity_name_t::TYPE_OSD);
+  }).then([](crimson::net::ConnectionRef conn) {
+    // send two messages
+    return conn->send(make_message<MPing>()).then([conn] {
+      return conn->send(make_message<MPing>());
+    });
+  }).then([server] {
+    return server->wait();
+  }).finally([client] {
+    logger().info("client shutdown...");
+    return client->msgr->shutdown();
+  }).finally([server] {
+    logger().info("server shutdown...");
+    return server->msgr->shutdown();
+  }).finally([server, client] {
+    logger().info("test_concurrent_dispatch() done!\n");
+  });
 }
 
 seastar::future<> test_preemptive_shutdown(bool v2) {
   struct test_state {
     class Server final
-      : public crimson::net::Dispatcher,
-        public seastar::peering_sharded_service<Server> {
-      crimson::net::Messenger *msgr = nullptr;
+      : public crimson::net::Dispatcher {
+      crimson::net::MessengerRef msgr;
       crimson::auth::DummyAuthClientServer dummy_auth;
 
       seastar::future<> ms_dispatch(crimson::net::Connection* c,
@@ -462,18 +363,12 @@ seastar::future<> test_preemptive_shutdown(bool v2) {
                              const std::string& lname,
                              const uint64_t nonce,
                              const entity_addr_t& addr) {
-        return crimson::net::Messenger::create(name, lname, nonce, seastar::engine().cpu_id()
-        ).then([this, addr](crimson::net::Messenger *messenger) {
-          return container().invoke_on_all([messenger](auto& server) {
-            server.msgr = messenger->get_local_shard();
-            server.msgr->set_default_policy(crimson::net::SocketPolicy::stateless_server(0));
-            server.msgr->set_auth_client(&server.dummy_auth);
-            server.msgr->set_auth_server(&server.dummy_auth);
-          }).then([messenger, addr] {
-            return messenger->bind(entity_addrvec_t{addr});
-          }).then([this, messenger] {
-            return messenger->start(this);
-          });
+        msgr = crimson::net::Messenger::create(name, lname, nonce);
+        msgr->set_default_policy(crimson::net::SocketPolicy::stateless_server(0));
+        msgr->set_auth_client(&dummy_auth);
+        msgr->set_auth_server(&dummy_auth);
+        return msgr->bind(entity_addrvec_t{addr}).then([this] {
+          return msgr->start(this);
         });
       }
       entity_addr_t get_addr() const {
@@ -482,18 +377,11 @@ seastar::future<> test_preemptive_shutdown(bool v2) {
       seastar::future<> shutdown() {
         return msgr->shutdown();
       }
-      Dispatcher* get_local_shard() override {
-        return &(container().local());
-      }
-      seastar::future<> stop() {
-        return seastar::now();
-      }
     };
 
     class Client final
-      : public crimson::net::Dispatcher,
-        public seastar::peering_sharded_service<Client> {
-      crimson::net::Messenger *msgr = nullptr;
+      : public crimson::net::Dispatcher {
+      crimson::net::MessengerRef msgr;
       crimson::auth::DummyAuthClientServer dummy_auth;
 
       bool stop_send = false;
@@ -508,25 +396,19 @@ seastar::future<> test_preemptive_shutdown(bool v2) {
       seastar::future<> init(const entity_name_t& name,
                              const std::string& lname,
                              const uint64_t nonce) {
-        return crimson::net::Messenger::create(name, lname, nonce, seastar::engine().cpu_id()
-        ).then([this](crimson::net::Messenger *messenger) {
-          return container().invoke_on_all([messenger](auto& client) {
-            client.msgr = messenger->get_local_shard();
-            client.msgr->set_default_policy(crimson::net::SocketPolicy::lossy_client(0));
-            client.msgr->set_auth_client(&client.dummy_auth);
-            client.msgr->set_auth_server(&client.dummy_auth);
-          }).then([this, messenger] {
-            return messenger->start(this);
-          });
-        });
+        msgr = crimson::net::Messenger::create(name, lname, nonce);
+        msgr->set_default_policy(crimson::net::SocketPolicy::lossy_client(0));
+        msgr->set_auth_client(&dummy_auth);
+        msgr->set_auth_server(&dummy_auth);
+        return msgr->start(this);
       }
       seastar::future<> send_pings(const entity_addr_t& addr) {
         return msgr->connect(addr, entity_name_t::TYPE_OSD
-        ).then([this](crimson::net::ConnectionXRef conn) {
+        ).then([this](crimson::net::ConnectionRef conn) {
           // forwarded to stopped_send_promise
           (void) seastar::do_until(
             [this] { return stop_send; },
-            [this, conn = &**conn] {
+            [this, conn] {
               return conn->send(make_message<MPing>()).then([] {
                 return seastar::sleep(0ms);
               });
@@ -542,45 +424,35 @@ seastar::future<> test_preemptive_shutdown(bool v2) {
           return stopped_send_promise.get_future();
         });
       }
-      Dispatcher* get_local_shard() override {
-        return &(container().local());
-      }
-      seastar::future<> stop() {
-        return seastar::now();
-      }
     };
   };
 
   logger().info("test_preemptive_shutdown(v2={}):", v2);
+  auto server = seastar::make_shared<test_state::Server>();
+  auto client = seastar::make_shared<test_state::Client>();
+  entity_addr_t addr;
+  addr.parse("127.0.0.1:9010", nullptr);
+  if (v2) {
+    addr.set_type(entity_addr_t::TYPE_MSGR2);
+  } else {
+    addr.set_type(entity_addr_t::TYPE_LEGACY);
+  }
+  addr.set_family(AF_INET);
   return seastar::when_all_succeed(
-    crimson::net::create_sharded<test_state::Server>(),
-    crimson::net::create_sharded<test_state::Client>()
-  ).then([v2](test_state::Server *server,
-             test_state::Client *client) {
-    entity_addr_t addr;
-    addr.parse("127.0.0.1:9010", nullptr);
-    if (v2) {
-      addr.set_type(entity_addr_t::TYPE_MSGR2);
-    } else {
-      addr.set_type(entity_addr_t::TYPE_LEGACY);
-    }
-    addr.set_family(AF_INET);
-    return seastar::when_all_succeed(
-      server->init(entity_name_t::OSD(6), "server4", 7, addr),
-      client->init(entity_name_t::OSD(7), "client4", 8)
-    ).then([server, client] {
-      return client->send_pings(server->get_addr());
-    }).then([] {
-      return seastar::sleep(100ms);
-    }).then([client] {
-      logger().info("client shutdown...");
-      return client->shutdown();
-    }).finally([server] {
-      logger().info("server shutdown...");
-      return server->shutdown();
-    }).finally([] {
-      logger().info("test_preemptive_shutdown() done!\n");
-    });
+    server->init(entity_name_t::OSD(6), "server4", 7, addr),
+    client->init(entity_name_t::OSD(7), "client4", 8)
+  ).then([server, client] {
+    return client->send_pings(server->get_addr());
+  }).then([] {
+    return seastar::sleep(100ms);
+  }).then([client] {
+    logger().info("client shutdown...");
+    return client->shutdown();
+  }).finally([server] {
+    logger().info("server shutdown...");
+    return server->shutdown();
+  }).finally([server, client] {
+    logger().info("test_preemptive_shutdown() done!\n");
   });
 }
 
@@ -594,6 +466,7 @@ using crimson::net::custom_bp_t;
 using crimson::net::Dispatcher;
 using crimson::net::Interceptor;
 using crimson::net::Messenger;
+using crimson::net::MessengerRef;
 using crimson::net::SocketPolicy;
 using crimson::net::tag_bp_t;
 using ceph::net::test::cmd_t;
@@ -901,7 +774,7 @@ SocketPolicy to_socket_policy(policy_t policy) {
 
 class FailoverSuite : public Dispatcher {
   crimson::auth::DummyAuthClientServer dummy_auth;
-  Messenger& test_msgr;
+  MessengerRef test_msgr;
   const entity_addr_t test_peer_addr;
   TestInterceptor interceptor;
 
@@ -1023,12 +896,12 @@ class FailoverSuite : public Dispatcher {
 
  private:
   seastar::future<> init(entity_addr_t addr, SocketPolicy policy) {
-    test_msgr.set_default_policy(policy);
-    test_msgr.set_auth_client(&dummy_auth);
-    test_msgr.set_auth_server(&dummy_auth);
-    test_msgr.interceptor = &interceptor;
-    return test_msgr.bind(entity_addrvec_t{addr}).then([this] {
-      return test_msgr.start(this);
+    test_msgr->set_default_policy(policy);
+    test_msgr->set_auth_client(&dummy_auth);
+    test_msgr->set_auth_server(&dummy_auth);
+    test_msgr->interceptor = &interceptor;
+    return test_msgr->bind(entity_addrvec_t{addr}).then([this] {
+      return test_msgr->start(this);
     });
   }
 
@@ -1137,7 +1010,7 @@ class FailoverSuite : public Dispatcher {
 
  // called by FailoverTest
  public:
-  FailoverSuite(Messenger& test_msgr,
+  FailoverSuite(MessengerRef test_msgr,
                 entity_addr_t test_peer_addr,
                 const TestInterceptor& interceptor)
     : test_msgr(test_msgr),
@@ -1145,11 +1018,11 @@ class FailoverSuite : public Dispatcher {
       interceptor(interceptor) { }
 
   entity_addr_t get_addr() const {
-    return test_msgr.get_myaddr();
+    return test_msgr->get_myaddr();
   }
 
   seastar::future<> shutdown() {
-    return test_msgr.shutdown();
+    return test_msgr->shutdown();
   }
 
   void needs_receive() {
@@ -1193,17 +1066,12 @@ class FailoverSuite : public Dispatcher {
          SocketPolicy test_policy,
          entity_addr_t test_peer_addr,
          const TestInterceptor& interceptor) {
-    return Messenger::create(entity_name_t::OSD(2), "Test", 2, 0
-    ).then([test_addr,
-            test_policy,
-            test_peer_addr,
-            interceptor] (Messenger* test_msgr) {
-      auto suite = std::make_unique<FailoverSuite>(
-          *test_msgr, test_peer_addr, interceptor);
-      return suite->init(test_addr, test_policy
-      ).then([suite = std::move(suite)] () mutable {
-        return std::move(suite);
-      });
+    auto suite = std::make_unique<FailoverSuite>(
+        Messenger::create(entity_name_t::OSD(2), "Test", 2),
+        test_peer_addr, interceptor);
+    return suite->init(test_addr, test_policy
+    ).then([suite = std::move(suite)] () mutable {
+      return std::move(suite);
     });
   }
 
@@ -1211,9 +1079,8 @@ class FailoverSuite : public Dispatcher {
  public:
   seastar::future<> connect_peer() {
     logger().info("[Test] connect_peer({})", test_peer_addr);
-    return test_msgr.connect(test_peer_addr, entity_name_t::TYPE_OSD
-    ).then([this] (auto xconn) {
-      auto conn = xconn->release();
+    return test_msgr->connect(test_peer_addr, entity_name_t::TYPE_OSD
+    ).then([this] (auto conn) {
       auto result = interceptor.find_result(conn);
       ceph_assert(result != nullptr);
 
@@ -1302,7 +1169,7 @@ class FailoverSuite : public Dispatcher {
 
 class FailoverTest : public Dispatcher {
   crimson::auth::DummyAuthClientServer dummy_auth;
-  Messenger& cmd_msgr;
+  MessengerRef cmd_msgr;
   ConnectionRef cmd_conn;
   const entity_addr_t test_addr;
   const entity_addr_t test_peer_addr;
@@ -1375,20 +1242,20 @@ class FailoverTest : public Dispatcher {
   }
 
   seastar::future<> init(entity_addr_t cmd_peer_addr) {
-    cmd_msgr.set_default_policy(SocketPolicy::lossy_client(0));
-    cmd_msgr.set_auth_client(&dummy_auth);
-    cmd_msgr.set_auth_server(&dummy_auth);
-    return cmd_msgr.start(this).then([this, cmd_peer_addr] {
+    cmd_msgr->set_default_policy(SocketPolicy::lossy_client(0));
+    cmd_msgr->set_auth_client(&dummy_auth);
+    cmd_msgr->set_auth_server(&dummy_auth);
+    return cmd_msgr->start(this).then([this, cmd_peer_addr] {
       logger().info("CmdCli connect to CmdSrv({}) ...", cmd_peer_addr);
-      return cmd_msgr.connect(cmd_peer_addr, entity_name_t::TYPE_OSD);
+      return cmd_msgr->connect(cmd_peer_addr, entity_name_t::TYPE_OSD);
     }).then([this] (auto conn) {
-      cmd_conn = conn->release();
+      cmd_conn = conn;
       return pingpong();
     });
   }
 
  public:
-  FailoverTest(Messenger& cmd_msgr,
+  FailoverTest(MessengerRef cmd_msgr,
                entity_addr_t test_addr,
                entity_addr_t test_peer_addr)
     : cmd_msgr(cmd_msgr),
@@ -1403,25 +1270,23 @@ class FailoverTest : public Dispatcher {
     return cmd_conn->send(m).then([this] {
       return seastar::sleep(200ms);
     }).finally([this] {
-      return cmd_msgr.shutdown();
+      return cmd_msgr->shutdown();
     });
   }
 
   static seastar::future<seastar::lw_shared_ptr<FailoverTest>>
   create(entity_addr_t cmd_peer_addr, entity_addr_t test_addr) {
-    return Messenger::create(entity_name_t::OSD(1), "CmdCli", 1, 0
-    ).then([cmd_peer_addr, test_addr] (Messenger* cmd_msgr) mutable {
-      test_addr.set_nonce(2);
-      cmd_peer_addr.set_nonce(3);
-      entity_addr_t test_peer_addr = cmd_peer_addr;
-      test_peer_addr.set_port(cmd_peer_addr.get_port() + 1);
-      test_peer_addr.set_nonce(4);
-      auto test = seastar::make_lw_shared<FailoverTest>(
-          *cmd_msgr, test_addr, test_peer_addr);
-      return test->init(cmd_peer_addr).then([test] {
-        logger().info("CmdCli ready");
-        return test;
-      });
+    test_addr.set_nonce(2);
+    cmd_peer_addr.set_nonce(3);
+    entity_addr_t test_peer_addr = cmd_peer_addr;
+    test_peer_addr.set_port(cmd_peer_addr.get_port() + 1);
+    test_peer_addr.set_nonce(4);
+    auto test = seastar::make_lw_shared<FailoverTest>(
+        Messenger::create(entity_name_t::OSD(1), "CmdCli", 1),
+        test_addr, test_peer_addr);
+    return test->init(cmd_peer_addr).then([test] {
+      logger().info("CmdCli ready");
+      return test;
     });
   }
 
@@ -1506,7 +1371,7 @@ class FailoverTest : public Dispatcher {
 class FailoverSuitePeer : public Dispatcher {
   using cb_t = std::function<seastar::future<>()>;
   crimson::auth::DummyAuthClientServer dummy_auth;
-  Messenger& peer_msgr;
+  MessengerRef peer_msgr;
   cb_t op_callback;
 
   ConnectionRef tracked_conn;
@@ -1535,11 +1400,11 @@ class FailoverSuitePeer : public Dispatcher {
 
  private:
   seastar::future<> init(entity_addr_t addr, SocketPolicy policy) {
-    peer_msgr.set_default_policy(policy);
-    peer_msgr.set_auth_client(&dummy_auth);
-    peer_msgr.set_auth_server(&dummy_auth);
-    return peer_msgr.bind(entity_addrvec_t{addr}).then([this] {
-      return peer_msgr.start(this);
+    peer_msgr->set_default_policy(policy);
+    peer_msgr->set_auth_client(&dummy_auth);
+    peer_msgr->set_auth_server(&dummy_auth);
+    return peer_msgr->bind(entity_addrvec_t{addr}).then([this] {
+      return peer_msgr->start(this);
     });
   }
 
@@ -1567,18 +1432,18 @@ class FailoverSuitePeer : public Dispatcher {
   }
 
  public:
-  FailoverSuitePeer(Messenger& peer_msgr, cb_t op_callback)
+  FailoverSuitePeer(MessengerRef peer_msgr, cb_t op_callback)
     : peer_msgr(peer_msgr), op_callback(op_callback) { }
 
   seastar::future<> shutdown() {
-    return peer_msgr.shutdown();
+    return peer_msgr->shutdown();
   }
 
   seastar::future<> connect_peer(entity_addr_t addr) {
     logger().info("[TestPeer] connect_peer({})", addr);
-    return peer_msgr.connect(addr, entity_name_t::TYPE_OSD
-    ).then([this] (auto xconn) {
-      auto new_tracked_conn = xconn->release();
+    return peer_msgr->connect(addr, entity_name_t::TYPE_OSD
+    ).then([this] (auto conn) {
+      auto new_tracked_conn = conn;
       if (tracked_conn) {
         if (tracked_conn->is_closed()) {
           ceph_assert(tracked_conn != new_tracked_conn);
@@ -1621,20 +1486,18 @@ class FailoverSuitePeer : public Dispatcher {
 
   static seastar::future<std::unique_ptr<FailoverSuitePeer>>
   create(entity_addr_t addr, const SocketPolicy& policy, cb_t op_callback) {
-    return Messenger::create(entity_name_t::OSD(4), "TestPeer", 4, 0
-    ).then([addr, policy, op_callback] (Messenger* peer_msgr) {
-      auto suite = std::make_unique<FailoverSuitePeer>(*peer_msgr, op_callback);
-      return suite->init(addr, policy
-      ).then([suite = std::move(suite)] () mutable {
-        return std::move(suite);
-      });
+    auto suite = std::make_unique<FailoverSuitePeer>(
+        Messenger::create(entity_name_t::OSD(4), "TestPeer", 4), op_callback);
+    return suite->init(addr, policy
+    ).then([suite = std::move(suite)] () mutable {
+      return std::move(suite);
     });
   }
 };
 
 class FailoverTestPeer : public Dispatcher {
   crimson::auth::DummyAuthClientServer dummy_auth;
-  Messenger& cmd_msgr;
+  MessengerRef cmd_msgr;
   ConnectionRef cmd_conn;
   const entity_addr_t test_peer_addr;
   std::unique_ptr<FailoverSuitePeer> test_suite;
@@ -1650,7 +1513,7 @@ class FailoverTestPeer : public Dispatcher {
       if (cmd == cmd_t::shutdown) {
         logger().info("CmdSrv shutdown...");
         // forwarded to FailoverTestPeer::wait()
-        (void) cmd_msgr.shutdown();
+        (void) cmd_msgr->shutdown();
         return seastar::now();
       }
       return handle_cmd(cmd, m_cmd).then([c] {
@@ -1715,37 +1578,35 @@ class FailoverTestPeer : public Dispatcher {
   }
 
   seastar::future<> init(entity_addr_t cmd_peer_addr) {
-    cmd_msgr.set_default_policy(SocketPolicy::stateless_server(0));
-    cmd_msgr.set_auth_client(&dummy_auth);
-    cmd_msgr.set_auth_server(&dummy_auth);
-    return cmd_msgr.bind(entity_addrvec_t{cmd_peer_addr}).then([this] {
-      return cmd_msgr.start(this);
+    cmd_msgr->set_default_policy(SocketPolicy::stateless_server(0));
+    cmd_msgr->set_auth_client(&dummy_auth);
+    cmd_msgr->set_auth_server(&dummy_auth);
+    return cmd_msgr->bind(entity_addrvec_t{cmd_peer_addr}).then([this] {
+      return cmd_msgr->start(this);
     });
   }
 
  public:
-  FailoverTestPeer(Messenger& cmd_msgr,
+  FailoverTestPeer(MessengerRef cmd_msgr,
                    entity_addr_t test_peer_addr)
     : cmd_msgr(cmd_msgr),
       test_peer_addr(test_peer_addr) { }
 
   seastar::future<> wait() {
-    return cmd_msgr.wait();
+    return cmd_msgr->wait();
   }
 
   static seastar::future<std::unique_ptr<FailoverTestPeer>>
   create(entity_addr_t cmd_peer_addr) {
-    return Messenger::create(entity_name_t::OSD(3), "CmdSrv", 3, 0
-    ).then([cmd_peer_addr] (Messenger* cmd_msgr) {
-      // suite bind to cmd_peer_addr, with port + 1
-      entity_addr_t test_peer_addr = cmd_peer_addr;
-      test_peer_addr.set_port(cmd_peer_addr.get_port() + 1);
-      auto test_peer = std::make_unique<FailoverTestPeer>(*cmd_msgr, test_peer_addr);
-      return test_peer->init(cmd_peer_addr
-      ).then([test_peer = std::move(test_peer)] () mutable {
-        logger().info("CmdSrv ready");
-        return std::move(test_peer);
-      });
+    // suite bind to cmd_peer_addr, with port + 1
+    entity_addr_t test_peer_addr = cmd_peer_addr;
+    test_peer_addr.set_port(cmd_peer_addr.get_port() + 1);
+    auto test_peer = std::make_unique<FailoverTestPeer>(
+        Messenger::create(entity_name_t::OSD(3), "CmdSrv", 3), test_peer_addr);
+    return test_peer->init(cmd_peer_addr
+    ).then([test_peer = std::move(test_peer)] () mutable {
+      logger().info("CmdSrv ready");
+      return std::move(test_peer);
     });
   }
 };
index a733d7de864c7ccb7a4830c5ccaec5580241edc3..1275b451ebe76623a59602b9d11370de3b8fc138 100644 (file)
@@ -39,29 +39,26 @@ static seastar::future<> test_monc()
   }).then([] {
     return crimson::common::sharded_perf_coll().start();
   }).then([] {
-    return crimson::net::Messenger::create(entity_name_t::OSD(0), "monc", 0,
-                                        seastar::engine().cpu_id())
-        .then([] (crimson::net::Messenger *msgr) {
-      auto& conf = crimson::common::local_conf();
-      if (conf->ms_crc_data) {
-        msgr->set_crc_data();
-      }
-      if (conf->ms_crc_header) {
-        msgr->set_crc_header();
-      }
-      msgr->set_require_authorizer(false);
-      return seastar::do_with(MonClient{*msgr, dummy_handler},
-                              [msgr](auto& monc) {
-        return msgr->start(&monc).then([&monc] {
-          return seastar::with_timeout(
-            seastar::lowres_clock::now() + std::chrono::seconds{10},
-            monc.start());
-        }).then([&monc] {
-          return monc.stop();
-        });
-      }).finally([msgr] {
-        return msgr->shutdown();
+    auto msgr = crimson::net::Messenger::create(entity_name_t::OSD(0), "monc", 0);
+    auto& conf = crimson::common::local_conf();
+    if (conf->ms_crc_data) {
+      msgr->set_crc_data();
+    }
+    if (conf->ms_crc_header) {
+      msgr->set_crc_header();
+    }
+    msgr->set_require_authorizer(false);
+    return seastar::do_with(MonClient{*msgr, dummy_handler},
+                            [msgr](auto& monc) {
+      return msgr->start(&monc).then([&monc] {
+        return seastar::with_timeout(
+          seastar::lowres_clock::now() + std::chrono::seconds{10},
+          monc.start());
+      }).then([&monc] {
+        return monc.stop();
       });
+    }).finally([msgr] {
+      return msgr->shutdown();
     });
   }).finally([] {
     return crimson::common::sharded_perf_coll().stop().then([] {
index e66f8702fb39bea9b0ea3bcc0538fb2424ac028c..0c24f3b24c8bb6f2c2ad66f92f4ee60f7e62de7d 100644 (file)
@@ -11,6 +11,7 @@
 #include <seastar/core/reactor.hh>
 #include <seastar/core/sleep.hh>
 #include <seastar/core/semaphore.hh>
+#include <seastar/core/smp.hh>
 
 #include "common/ceph_time.h"
 #include "messages/MOSDOp.h"
@@ -32,6 +33,23 @@ seastar::logger& logger() {
   return crimson::get_logger(ceph_subsys_ms);
 }
 
+template <typename T, typename... Args>
+seastar::future<T*> create_sharded(Args... args) {
+  // seems we should only construct/stop shards on #0
+  return seastar::smp::submit_to(0, [=] {
+    auto sharded_obj = seastar::make_lw_shared<seastar::sharded<T>>();
+    return sharded_obj->start(args...).then([sharded_obj]() {
+      seastar::engine().at_exit([sharded_obj]() {
+          return sharded_obj->stop().finally([sharded_obj] {});
+        });
+      return sharded_obj.get();
+    });
+  }).then([] (seastar::sharded<T> *ptr_shard) {
+    // return the pointer valid for the caller CPU
+    return &ptr_shard->local();
+  });
+}
+
 enum class perf_mode_t {
   both,
   client,
@@ -114,32 +132,26 @@ static seastar::future<> run(
     const server_config& server_conf)
 {
   struct test_state {
+    struct Server;
+    using ServerFRef = seastar::foreign_ptr<std::unique_ptr<Server>>;
+
     struct Server final
-        : public crimson::net::Dispatcher,
-          public seastar::peering_sharded_service<Server> {
-      crimson::net::Messenger *msgr = nullptr;
+        : public crimson::net::Dispatcher {
+      crimson::net::MessengerRef msgr;
       crimson::auth::DummyAuthClientServer dummy_auth;
-      const seastar::shard_id sid;
       const seastar::shard_id msgr_sid;
       std::string lname;
       unsigned msg_len;
       bufferlist msg_data;
 
-      Server(unsigned msgr_core, unsigned msg_len)
-        : sid{seastar::engine().cpu_id()},
-          msgr_sid{msgr_core},
+      Server(unsigned msg_len)
+        : msgr_sid{seastar::engine().cpu_id()},
           msg_len{msg_len} {
         lname = "server#";
-        lname += std::to_string(sid);
+        lname += std::to_string(msgr_sid);
         msg_data.append_zero(msg_len);
       }
 
-      Dispatcher* get_local_shard() override {
-        return &(container().local());
-      }
-      seastar::future<> stop() {
-        return seastar::make_ready_future<>();
-      }
       seastar::future<> ms_dispatch(crimson::net::Connection* c,
                                     MessageRef m) override {
         ceph_assert(m->get_type() == CEPH_MSG_OSD_OP);
@@ -158,34 +170,38 @@ static seastar::future<> run(
       }
 
       seastar::future<> init(bool v1_crc_enabled, const entity_addr_t& addr) {
-        return container().invoke_on(msgr_sid, [v1_crc_enabled, addr] (auto& server) {
+        return seastar::smp::submit_to(msgr_sid, [v1_crc_enabled, addr, this] {
           // server msgr is always with nonce 0
-          auto&& fut = crimson::net::Messenger::create(entity_name_t::OSD(server.sid), server.lname, 0, server.sid);
-          return fut.then(
-            [&server, addr, v1_crc_enabled](crimson::net::Messenger *messenger) {
-              return server.container().invoke_on_all(
-                [messenger, v1_crc_enabled](auto& server) {
-                  server.msgr = messenger->get_local_shard();
-                  server.msgr->set_default_policy(crimson::net::SocketPolicy::stateless_server(0));
-                  server.msgr->set_auth_client(&server.dummy_auth);
-                  server.msgr->set_auth_server(&server.dummy_auth);
-                  if (v1_crc_enabled) {
-                    server.msgr->set_crc_header();
-                    server.msgr->set_crc_data();
-                  }
-                }).then([messenger, addr] {
-                  return messenger->bind(entity_addrvec_t{addr});
-                }).then([&server, messenger] {
-                  return messenger->start(&server);
-                });
-            });
+          msgr = crimson::net::Messenger::create(entity_name_t::OSD(msgr_sid), lname, 0);
+          msgr->set_default_policy(crimson::net::SocketPolicy::stateless_server(0));
+          msgr->set_auth_client(&dummy_auth);
+          msgr->set_auth_server(&dummy_auth);
+          if (v1_crc_enabled) {
+            msgr->set_crc_header();
+            msgr->set_crc_data();
+          }
+          return msgr->bind(entity_addrvec_t{addr}).then([this] {
+            return msgr->start(this);
+          });
         });
       }
       seastar::future<> shutdown() {
         logger().info("{} shutdown...", lname);
-        return container().invoke_on(msgr_sid, [] (auto& server) {
-          ceph_assert(server.msgr);
-          return server.msgr->shutdown();
+        return seastar::smp::submit_to(msgr_sid, [this] {
+          ceph_assert(msgr);
+          return msgr->shutdown();
+        });
+      }
+      seastar::future<> wait() {
+        return seastar::smp::submit_to(msgr_sid, [this] {
+          ceph_assert(msgr);
+          return msgr->wait();
+        });
+      }
+
+      static seastar::future<ServerFRef> create(seastar::shard_id msgr_sid, unsigned msg_len) {
+        return seastar::smp::submit_to(msgr_sid, [msg_len] {
+          return seastar::make_foreign(std::make_unique<Server>(msg_len));
         });
       }
     };
@@ -250,7 +266,7 @@ static seastar::future<> run(
       std::string lname;
 
       const unsigned jobs;
-      crimson::net::Messenger *msgr = nullptr;
+      crimson::net::MessengerRef msgr;
       const unsigned msg_len;
       bufferlist msg_data;
       const unsigned nr_depth;
@@ -281,12 +297,6 @@ static seastar::future<> run(
         return nr_depth - depth.current();
       }
 
-      Dispatcher* get_local_shard() override {
-        return &(container().local());
-      }
-      seastar::future<> stop() {
-        return seastar::now();
-      }
       seastar::future<> ms_handle_connect(crimson::net::ConnectionRef conn) override {
         conn_stats.connected_time = mono_clock::now();
         return seastar::now();
@@ -323,19 +333,16 @@ static seastar::future<> run(
       seastar::future<> init(bool v1_crc_enabled) {
         return container().invoke_on_all([v1_crc_enabled] (auto& client) {
           if (client.is_active()) {
-            return crimson::net::Messenger::create(entity_name_t::OSD(client.sid), client.lname, client.sid, client.sid)
-            .then([&client, v1_crc_enabled] (crimson::net::Messenger *messenger) {
-              client.msgr = messenger;
-              client.msgr->set_default_policy(crimson::net::SocketPolicy::lossy_client(0));
-              client.msgr->set_require_authorizer(false);
-              client.msgr->set_auth_client(&client.dummy_auth);
-              client.msgr->set_auth_server(&client.dummy_auth);
-              if (v1_crc_enabled) {
-                client.msgr->set_crc_header();
-                client.msgr->set_crc_data();
-              }
-              return client.msgr->start(&client);
-            });
+            client.msgr = crimson::net::Messenger::create(entity_name_t::OSD(client.sid), client.lname, client.sid);
+            client.msgr->set_default_policy(crimson::net::SocketPolicy::lossy_client(0));
+            client.msgr->set_require_authorizer(false);
+            client.msgr->set_auth_client(&client.dummy_auth);
+            client.msgr->set_auth_server(&client.dummy_auth);
+            if (v1_crc_enabled) {
+              client.msgr->set_crc_header();
+              client.msgr->set_crc_data();
+            }
+            return client.msgr->start(&client);
           }
           return seastar::now();
         });
@@ -359,9 +366,9 @@ static seastar::future<> run(
           // start clients in active cores (#1 ~ #jobs)
           if (client.is_active()) {
             mono_time start_time = mono_clock::now();
-            return client.msgr->connect(peer_addr, entity_name_t::TYPE_OSD)
-            .then([&client] (auto conn) {
-              client.active_conn = conn->release();
+            return client.msgr->connect(peer_addr, entity_name_t::TYPE_OSD
+            ).then([&client] (auto conn) {
+              client.active_conn = conn;
               // make sure handshake won't hurt the performance
               return seastar::sleep(1s);
             }).then([&client, start_time] {
@@ -638,57 +645,57 @@ static seastar::future<> run(
   };
 
   return seastar::when_all_succeed(
-      crimson::net::create_sharded<test_state::Server>(server_conf.core, server_conf.block_size),
-      crimson::net::create_sharded<test_state::Client>(client_conf.jobs,
-                                                      client_conf.block_size, client_conf.depth))
-    .then([=](test_state::Server *server,
-              test_state::Client *client) {
-      if (mode == perf_mode_t::both) {
-          logger().info("\nperf settings:\n  {}\n  {}\n",
-                        client_conf.str(), server_conf.str());
-          ceph_assert(seastar::smp::count >= 1+client_conf.jobs);
-          ceph_assert(client_conf.jobs > 0);
-          ceph_assert(seastar::smp::count >= 1+server_conf.core);
-          ceph_assert(server_conf.core == 0 || server_conf.core > client_conf.jobs);
-          return seastar::when_all_succeed(
-              server->init(server_conf.v1_crc_enabled, server_conf.addr),
-              client->init(client_conf.v1_crc_enabled))
-            .then([client, addr = client_conf.server_addr] {
-              return client->connect_wait_verify(addr);
-            }).then([client, ramptime = client_conf.ramptime,
-                     msgtime = client_conf.msgtime] {
-              return client->dispatch_with_timer(ramptime, msgtime);
-            }).finally([client] {
-              return client->shutdown();
-            }).finally([server] {
-              return server->shutdown();
-            });
-      } else if (mode == perf_mode_t::client) {
-          logger().info("\nperf settings:\n  {}\n", client_conf.str());
-          ceph_assert(seastar::smp::count >= 1+client_conf.jobs);
-          ceph_assert(client_conf.jobs > 0);
-          return client->init(client_conf.v1_crc_enabled)
-            .then([client, addr = client_conf.server_addr] {
-              return client->connect_wait_verify(addr);
-            }).then([client, ramptime = client_conf.ramptime,
-                     msgtime = client_conf.msgtime] {
-              return client->dispatch_with_timer(ramptime, msgtime);
-            }).finally([client] {
-              return client->shutdown();
-            });
-      } else { // mode == perf_mode_t::server
-          ceph_assert(seastar::smp::count >= 1+server_conf.core);
-          logger().info("\nperf settings:\n  {}\n", server_conf.str());
-          return server->init(server_conf.v1_crc_enabled, server_conf.addr)
-          // dispatch ops
-            .then([server] {
-              return server->msgr->wait();
-          // shutdown
-            }).finally([server] {
-              return server->shutdown();
-            });
-      }
-    });
+      test_state::Server::create(server_conf.core, server_conf.block_size),
+      create_sharded<test_state::Client>(client_conf.jobs, client_conf.block_size, client_conf.depth)
+  ).then([=](test_state::ServerFRef fp_server,
+             test_state::Client *client) {
+    test_state::Server* server = fp_server.get();
+    if (mode == perf_mode_t::both) {
+      logger().info("\nperf settings:\n  {}\n  {}\n",
+                    client_conf.str(), server_conf.str());
+      ceph_assert(seastar::smp::count >= 1+client_conf.jobs);
+      ceph_assert(client_conf.jobs > 0);
+      ceph_assert(seastar::smp::count >= 1+server_conf.core);
+      ceph_assert(server_conf.core == 0 || server_conf.core > client_conf.jobs);
+      return seastar::when_all_succeed(
+        server->init(server_conf.v1_crc_enabled, server_conf.addr),
+        client->init(client_conf.v1_crc_enabled)
+      ).then([client, addr = client_conf.server_addr] {
+        return client->connect_wait_verify(addr);
+      }).then([client, ramptime = client_conf.ramptime,
+               msgtime = client_conf.msgtime] {
+        return client->dispatch_with_timer(ramptime, msgtime);
+      }).finally([client] {
+        return client->shutdown();
+      }).finally([server, fp_server = std::move(fp_server)] () mutable {
+        return server->shutdown().then([cleanup = std::move(fp_server)] {});
+      });
+    } else if (mode == perf_mode_t::client) {
+      logger().info("\nperf settings:\n  {}\n", client_conf.str());
+      ceph_assert(seastar::smp::count >= 1+client_conf.jobs);
+      ceph_assert(client_conf.jobs > 0);
+      return client->init(client_conf.v1_crc_enabled
+      ).then([client, addr = client_conf.server_addr] {
+        return client->connect_wait_verify(addr);
+      }).then([client, ramptime = client_conf.ramptime,
+               msgtime = client_conf.msgtime] {
+        return client->dispatch_with_timer(ramptime, msgtime);
+      }).finally([client] {
+        return client->shutdown();
+      });
+    } else { // mode == perf_mode_t::server
+      ceph_assert(seastar::smp::count >= 1+server_conf.core);
+      logger().info("\nperf settings:\n  {}\n", server_conf.str());
+      return server->init(server_conf.v1_crc_enabled, server_conf.addr
+      // dispatch ops
+      ).then([server] {
+        return server->wait();
+      // shutdown
+      }).finally([server, fp_server = std::move(fp_server)] () mutable {
+        return server->shutdown().then([cleanup = std::move(fp_server)] {});
+      });
+    }
+  });
 }
 
 }