crimson/net: port sharded-msgr to existing code 24945/head
authorYingxin Cheng <yingxincheng@gmail.com>
Sun, 3 Feb 2019 14:24:23 +0000 (22:24 +0800)
committerYingxin Cheng <yingxincheng@gmail.com>
Tue, 12 Feb 2019 08:48:03 +0000 (16:48 +0800)
Port sharded-msgr to crimson osd, monc, heartbeat and tests with
compatible mode.

Signed-off-by: Yingxin Cheng <yingxincheng@gmail.com>
src/crimson/CMakeLists.txt
src/crimson/mon/MonClient.cc
src/crimson/osd/heartbeat.cc
src/crimson/osd/heartbeat.h
src/crimson/osd/osd.cc
src/crimson/osd/osd.h
src/test/crimson/CMakeLists.txt
src/test/crimson/test_alien_echo.cc
src/test/crimson/test_monc.cc

index 3294d634f6e46b6bfd032b96e2853ac471c25e31..5d42f60f24bb0e45e212a67d90fcd5e82af026c9 100644 (file)
@@ -125,8 +125,7 @@ set(crimson_thread_srcs
   thread/Throttle.cc)
 add_library(crimson STATIC
   ${crimson_auth_srcs}
-  # TODO: fix crimson_mon_client with the new design
-  # ${crimson_mon_srcs}
+  ${crimson_mon_srcs}
   ${crimson_net_srcs}
   ${crimson_thread_srcs}
   ${CMAKE_SOURCE_DIR}/src/common/buffer_seastar.cc)
index 0ce65a5cfc79a58db5844fd7464d5fe8869e8b75..4b61270c271f22853d099627ed9937142b027b0c 100644 (file)
@@ -512,13 +512,19 @@ seastar::future<> Client::reopen_session(int rank)
 #warning fixme
     auto peer = monmap.get_addrs(rank).legacy_addr();
     logger().info("connecting to mon.{}", rank);
-    auto conn = msgr.connect(peer, CEPH_ENTITY_TYPE_MON);
-    auto& mc = pending_conns.emplace_back(conn, &keyring);
-    return mc.authenticate(
-      monmap.get_epoch(), entity_name,
-      *auth_methods, want_keys).handle_exception([conn](auto ep) {
-      return conn->close().then([ep = std::move(ep)] {
-        std::rethrow_exception(ep);
+    return msgr.connect(peer, CEPH_ENTITY_TYPE_MON)
+    .then([this] (auto xconn) {
+      // sharded-messenger compatible mode assumes all connections running
+      // in one shard.
+      ceph_assert((*xconn)->shard_id() == seastar::engine().cpu_id());
+      ceph::net::ConnectionRef conn = xconn->release();
+      auto& mc = pending_conns.emplace_back(conn, &keyring);
+      return mc.authenticate(
+        monmap.get_epoch(), entity_name,
+        *auth_methods, want_keys).handle_exception([conn](auto ep) {
+        return conn->close().then([ep = std::move(ep)] {
+          std::rethrow_exception(ep);
+        });
       });
     }).then([peer, this] {
       if (!is_hunting()) {
index c075ad6867dabffeaff6eaf386126e47459463d8..0d818b24f54d9bc589f9d015dc317985a31f7060 100644 (file)
@@ -7,7 +7,7 @@
 
 #include "crimson/common/config_proxy.h"
 #include "crimson/net/Connection.h"
-#include "crimson/net/SocketMessenger.h"
+#include "crimson/net/Messenger.h"
 #include "crimson/osd/osdmap_service.h"
 #include "crimson/mon/MonClient.h"
 
@@ -31,10 +31,8 @@ Heartbeat::Heartbeat(int whoami,
                      uint32_t nonce,
                      const OSDMapService& service,
                      ceph::mon::Client& monc)
-  : front_msgr{new ceph::net::SocketMessenger{entity_name_t::OSD(whoami),
-                                              "hb_front", nonce}},
-    back_msgr{new ceph::net::SocketMessenger{entity_name_t::OSD(whoami),
-                                             "hb_back", nonce}},
+  : whoami{whoami},
+    nonce{nonce},
     service{service},
     monc{monc},
     timer{[this] {send_heartbeats();}}
@@ -48,17 +46,31 @@ seastar::future<> Heartbeat::start(entity_addrvec_t front_addrs,
   for (auto& addr : boost::join(front_addrs.v, back_addrs.v)) {
     addr.set_port(0);
   }
-  front_msgr->try_bind(front_addrs,
-                       local_conf()->ms_bind_port_min,
-                       local_conf()->ms_bind_port_max);
-  back_msgr->try_bind(front_addrs,
-                      local_conf()->ms_bind_port_min,
-                      local_conf()->ms_bind_port_max);
-  return seastar::when_all_succeed(front_msgr->start(this),
-                                   back_msgr->start(this)).then([this] {
-    timer.arm_periodic(
-      std::chrono::seconds(local_conf()->osd_heartbeat_interval));
-  });
+  return seastar::when_all_succeed(
+      ceph::net::Messenger::create(entity_name_t::OSD(whoami),
+                                   "hb_front",
+                                   nonce,
+                                   seastar::engine().cpu_id())
+        .then([this, front_addrs] (auto msgr) {
+          front_msgr = msgr;
+          return front_msgr->try_bind(front_addrs,
+                                      local_conf()->ms_bind_port_min,
+                                      local_conf()->ms_bind_port_max);
+        }).then([this] { return front_msgr->start(this); }),
+      ceph::net::Messenger::create(entity_name_t::OSD(whoami),
+                                   "hb_back",
+                                   nonce,
+                                   seastar::engine().cpu_id())
+        .then([this, back_addrs] (auto msgr) {
+          back_msgr = msgr;
+          return back_msgr->try_bind(back_addrs,
+                                     local_conf()->ms_bind_port_min,
+                                     local_conf()->ms_bind_port_max);
+        }).then([this] { return back_msgr->start(this); }))
+    .then([this] {
+      timer.arm_periodic(
+        std::chrono::seconds(local_conf()->osd_heartbeat_interval));
+    });
 }
 
 seastar::future<> Heartbeat::stop()
@@ -77,24 +89,29 @@ const entity_addrvec_t& Heartbeat::get_back_addrs() const
   return back_msgr->get_myaddrs();
 }
 
-void Heartbeat::add_peer(osd_id_t peer, epoch_t epoch)
+seastar::future<> Heartbeat::add_peer(osd_id_t peer, epoch_t epoch)
 {
   auto found = peers.find(peer);
   if (found == peers.end()) {
     logger().info("add_peer({})", peer);
-    PeerInfo info;
     auto osdmap = service.get_map();
     // TODO: msgr v2
-    info.con_front =
-      front_msgr->connect(osdmap->get_hb_front_addrs(peer).legacy_addr(),
-                          CEPH_ENTITY_TYPE_OSD);
-    info.con_back =
-      back_msgr->connect(osdmap->get_hb_back_addrs(peer).legacy_addr(),
-                         CEPH_ENTITY_TYPE_OSD);
-    info.epoch = epoch;
-    peers.emplace(peer, std::move(info));
+    return seastar::when_all_succeed(
+        front_msgr->connect(osdmap->get_hb_front_addrs(peer).legacy_addr(),
+                            CEPH_ENTITY_TYPE_OSD),
+        back_msgr->connect(osdmap->get_hb_back_addrs(peer).legacy_addr(),
+                           CEPH_ENTITY_TYPE_OSD))
+      .then([this, peer, epoch] (auto xcon_front, auto xcon_back) {
+        PeerInfo info;
+        // sharded-messenger compatible mode
+        info.con_front = xcon_front->release();
+        info.con_back = xcon_back->release();
+        info.epoch = epoch;
+        peers.emplace(peer, std::move(info));
+      });
   } else {
     found->second.epoch = epoch;
+    return seastar::now();
   }
 }
 
index 916acba2c9a41afa14f8e3705699d7077763ed3a..209914e78d6433aaa91f2554a0e9bd9a735e3afb 100644 (file)
@@ -31,7 +31,7 @@ public:
                          entity_addrvec_t back);
   seastar::future<> stop();
 
-  void add_peer(osd_id_t peer, epoch_t epoch);
+  seastar::future<> add_peer(osd_id_t peer, epoch_t epoch);
   seastar::future<> update_peers(int whoami);
   seastar::future<> remove_peer(osd_id_t peer);
 
@@ -64,8 +64,10 @@ private:
   void add_reporter_peers(int whoami);
 
 private:
-  std::unique_ptr<ceph::net::Messenger> front_msgr;
-  std::unique_ptr<ceph::net::Messenger> back_msgr;
+  const int whoami;
+  const uint32_t nonce;
+  ceph::net::Messenger* front_msgr = nullptr;
+  ceph::net::Messenger* back_msgr = nullptr;
   const OSDMapService& service;
   ceph::mon::Client& monc;
 
index 24f7de8bbd5d3e355c1909125819f5330f09af0a..2b8d8a8d99550a35956366637436e96d252bfc51 100644 (file)
@@ -7,7 +7,7 @@
 #include "messages/MOSDBoot.h"
 #include "messages/MOSDMap.h"
 #include "crimson/net/Connection.h"
-#include "crimson/net/SocketMessenger.h"
+#include "crimson/net/Messenger.h"
 #include "crimson/os/cyan_collection.h"
 #include "crimson/os/cyan_object.h"
 #include "crimson/os/cyan_store.h"
@@ -35,29 +35,8 @@ using ceph::os::CyanStore;
 
 OSD::OSD(int id, uint32_t nonce)
   : whoami{id},
-    cluster_msgr{new ceph::net::SocketMessenger{entity_name_t::OSD(whoami),
-                                                "cluster", nonce}},
-    public_msgr{new ceph::net::SocketMessenger{entity_name_t::OSD(whoami),
-                                               "client", nonce}},
-    monc{*public_msgr},
-    heartbeat{new Heartbeat{whoami, nonce, *this, monc}},
-    heartbeat_timer{[this] { update_heartbeat_peers(); }}
-{
-  for (auto msgr : {cluster_msgr.get(), public_msgr.get()}) {
-    if (local_conf()->ms_crc_data) {
-      msgr->set_crc_data();
-    }
-    if (local_conf()->ms_crc_header) {
-      msgr->set_crc_header();
-    }
-  }
-  dispatchers.push_front(this);
-  dispatchers.push_front(&monc);
-  osdmaps[0] = seastar::make_lw_shared<OSDMap>();
-  beacon_timer.set_callback([this] {
-    send_beacon();
-  });
-}
+    nonce{nonce}
+{}
 
 OSD::~OSD() = default;
 
@@ -131,9 +110,38 @@ namespace {
 seastar::future<> OSD::start()
 {
   logger().info("start");
-  const auto data_path = local_conf().get_val<std::string>("osd_data");
-  store = std::make_unique<ceph::os::CyanStore>(data_path);
-  return store->mount().then([this] {
+
+  return seastar::when_all_succeed(
+    ceph::net::Messenger::create(entity_name_t::OSD(whoami),
+                                 "cluster",
+                                 nonce,
+                                 seastar::engine().cpu_id())
+      .then([this] (auto msgr) { cluster_msgr = msgr; }),
+    ceph::net::Messenger::create(entity_name_t::OSD(whoami),
+                                 "client",
+                                 nonce,
+                                 seastar::engine().cpu_id())
+      .then([this] (auto msgr) { public_msgr = msgr; }))
+  .then([this] {
+    monc.reset(new ceph::mon::Client{*public_msgr});
+    heartbeat.reset(new Heartbeat{whoami, nonce, *this, *monc});
+
+    for (auto msgr : {cluster_msgr, public_msgr}) {
+      if (local_conf()->ms_crc_data) {
+        msgr->set_crc_data();
+      }
+      if (local_conf()->ms_crc_header) {
+        msgr->set_crc_header();
+      }
+    }
+    dispatchers.push_front(this);
+    dispatchers.push_front(monc.get());
+    osdmaps[0] = seastar::make_lw_shared<OSDMap>();
+
+    const auto data_path = local_conf().get_val<std::string>("osd_data");
+    store = std::make_unique<ceph::os::CyanStore>(data_path);
+    return store->mount();
+  }).then([this] {
     meta_coll = make_unique<OSDMeta>(store->open_collection(coll_t::meta()),
                                      store.get());
     return meta_coll->load_superblock();
@@ -144,25 +152,28 @@ seastar::future<> OSD::start()
     osdmap = std::move(map);
     return load_pgs();
   }).then([this] {
-    cluster_msgr->try_bind(pick_addresses(CEPH_PICK_ADDRESS_CLUSTER),
-                           local_conf()->ms_bind_port_min,
-                           local_conf()->ms_bind_port_max);
-    public_msgr->try_bind(pick_addresses(CEPH_PICK_ADDRESS_PUBLIC),
-                          local_conf()->ms_bind_port_min,
-                          local_conf()->ms_bind_port_max);
-    return seastar::when_all_succeed(cluster_msgr->start(&dispatchers),
-                                     public_msgr->start(&dispatchers));
+    return seastar::when_all_succeed(
+      cluster_msgr->try_bind(pick_addresses(CEPH_PICK_ADDRESS_CLUSTER),
+                             local_conf()->ms_bind_port_min,
+                             local_conf()->ms_bind_port_max)
+        .then([this] { return cluster_msgr->start(&dispatchers); }),
+      public_msgr->try_bind(pick_addresses(CEPH_PICK_ADDRESS_PUBLIC),
+                            local_conf()->ms_bind_port_min,
+                            local_conf()->ms_bind_port_max)
+        .then([this] { return public_msgr->start(&dispatchers); }));
   }).then([this] {
-    return monc.start();
+    return monc->start();
   }).then([this] {
-    monc.sub_want("osd_pg_creates", last_pg_create_epoch, 0);
-    monc.sub_want("mgrmap", 0, 0);
-    monc.sub_want("osdmap", 0, 0);
-    return monc.renew_subs();
+    monc->sub_want("osd_pg_creates", last_pg_create_epoch, 0);
+    monc->sub_want("mgrmap", 0, 0);
+    monc->sub_want("osdmap", 0, 0);
+    return monc->renew_subs();
   }).then([this] {
     return heartbeat->start(public_msgr->get_myaddrs(),
                             cluster_msgr->get_myaddrs());
   }).then([this] {
+    beacon_timer.set_callback([this] { send_beacon(); });
+    heartbeat_timer.set_callback([this] { update_heartbeat_peers(); });
     return start_boot();
   });
 }
@@ -170,7 +181,7 @@ seastar::future<> OSD::start()
 seastar::future<> OSD::start_boot()
 {
   state.set_preboot();
-  return monc.get_version("osdmap").then([this](version_t newest, version_t oldest) {
+  return monc->get_version("osdmap").then([this](version_t newest, version_t oldest) {
     return _preboot(newest, oldest);
   });
 }
@@ -222,7 +233,7 @@ seastar::future<> OSD::_send_boot()
                                   heartbeat->get_front_addrs(),
                                   cluster_msgr->get_myaddrs(),
                                   CEPH_FEATURES_ALL);
-  return monc.send_message(m);
+  return monc->send_message(m);
 }
 
 seastar::future<> OSD::stop()
@@ -232,9 +243,11 @@ seastar::future<> OSD::stop()
   return gate.close().then([this] {
     return heartbeat->stop();
   }).then([this] {
-    return monc.stop();
+    return monc->stop();
   }).then([this] {
     return public_msgr->shutdown();
+  }).then([this] {
+    return cluster_msgr->shutdown();
   });
 }
 
@@ -402,9 +415,9 @@ seastar::future<> OSD::store_maps(ceph::os::Transaction& t,
 seastar::future<> OSD::osdmap_subscribe(version_t epoch, bool force_request)
 {
   logger().info("{}({})", __func__, epoch);
-  if (monc.sub_want_increment("osdmap", epoch, CEPH_SUBSCRIBE_ONETIME) ||
+  if (monc->sub_want_increment("osdmap", epoch, CEPH_SUBSCRIBE_ONETIME) ||
       force_request) {
-    return monc.renew_subs();
+    return monc->renew_subs();
   } else {
     return seastar::now();
   }
@@ -455,7 +468,7 @@ seastar::future<> OSD::handle_osd_map(ceph::net::ConnectionRef conn,
                           [=](auto& t) {
     return store_maps(t, start, m).then([=, &t] {
       // even if this map isn't from a mon, we may have satisfied our subscription
-      monc.sub_got("osdmap", last);
+      monc->sub_got("osdmap", last);
       if (!superblock.oldest_map || skip_maps) {
         superblock.oldest_map = first;
       }
@@ -583,7 +596,7 @@ seastar::future<> OSD::send_beacon()
   epoch_t min_last_epoch_clean = osdmap->get_epoch();
   auto m = make_message<MOSDBeacon>(osdmap->get_epoch(),
                                     min_last_epoch_clean);
-  return monc.send_message(m);
+  return monc->send_message(m);
 }
 
 void OSD::update_heartbeat_peers()
index fc66666fba005cf663bf55bfccb1a7efce5c12fc..249cf9b351e0401e75e63f31234305aab6a0a5de 100644 (file)
@@ -40,12 +40,13 @@ class OSD : public ceph::net::Dispatcher,
   seastar::gate gate;
   seastar::timer<seastar::lowres_clock> beacon_timer;
   const int whoami;
+  const uint32_t nonce;
   // talk with osd
-  std::unique_ptr<ceph::net::Messenger> cluster_msgr;
+  ceph::net::Messenger* cluster_msgr = nullptr;
   // talk with client/mon/mgr
-  std::unique_ptr<ceph::net::Messenger> public_msgr;
+  ceph::net::Messenger* public_msgr = nullptr;
   ChainedDispatchers dispatchers;
-  ceph::mon::Client monc;
+  std::unique_ptr<ceph::mon::Client> monc;
 
   std::unique_ptr<Heartbeat> heartbeat;
   seastar::timer<seastar::lowres_clock> heartbeat_timer;
index d06fd341bd1446875627a93553c766b53e8a7307..1e12b3e94da956cdec9787f0d4fc9b378ab9a55e 100644 (file)
@@ -12,10 +12,9 @@ add_executable(unittest_seastar_messenger test_messenger.cc)
 add_ceph_unittest(unittest_seastar_messenger)
 target_link_libraries(unittest_seastar_messenger ceph-common crimson)
 
-# TODO: fix unittest_seastar_echo with the new design
-#add_executable(unittest_seastar_echo
-#  test_alien_echo.cc)
-#target_link_libraries(unittest_seastar_echo ceph-common global crimson)
+add_executable(unittest_seastar_echo
+  test_alien_echo.cc)
+target_link_libraries(unittest_seastar_echo ceph-common global crimson)
 
 add_executable(unittest_seastar_thread_pool
   test_thread_pool.cc)
@@ -26,10 +25,9 @@ add_executable(unittest_seastar_config
   test_config.cc)
 target_link_libraries(unittest_seastar_config crimson)
 
-# TODO: fix unittest_seastar_monc with the new design
-#add_executable(unittest_seastar_monc
-#  test_monc.cc)
-#target_link_libraries(unittest_seastar_monc crimson)
+add_executable(unittest_seastar_monc
+  test_monc.cc)
+target_link_libraries(unittest_seastar_monc crimson)
 
 add_executable(unittest_seastar_perfcounters
   test_perfcounters.cc)
index d58b2c94e6d8065a853a9eeac57fc0e260525785..74b6a9af706112a6a7bac43b075737cba3a9834b 100644 (file)
@@ -7,7 +7,7 @@
 #include "msg/Messenger.h"
 #include "crimson/net/Connection.h"
 #include "crimson/net/Dispatcher.h"
-#include "crimson/net/SocketMessenger.h"
+#include "crimson/net/Messenger.h"
 #include "crimson/net/Config.h"
 #include "crimson/thread/Condition.h"
 #include "crimson/thread/Throttle.h"
@@ -39,8 +39,7 @@ struct DummyAuthAuthorizer : public AuthAuthorizer {
 
 struct Server {
   ceph::thread::Throttle byte_throttler;
-  static constexpr int64_t server_num = 0;
-  ceph::net::SocketMessenger msgr{entity_name_t::OSD(server_num), "server", 0};
+  ceph::net::Messenger& msgr;
   struct ServerDispatcher : ceph::net::Dispatcher {
     unsigned count = 0;
     seastar::condition_variable on_reply;
@@ -66,8 +65,9 @@ struct Server {
           new DummyAuthAuthorizer{});
     }
   } dispatcher;
-  Server()
-    : byte_throttler(ceph::net::conf.osd_client_message_size_cap)
+  Server(ceph::net::Messenger& msgr)
+    : byte_throttler(ceph::net::conf.osd_client_message_size_cap),
+      msgr{msgr}
   {
     msgr.set_crc_header();
     msgr.set_crc_data();
@@ -76,8 +76,7 @@ struct Server {
 
 struct Client {
   ceph::thread::Throttle byte_throttler;
-  static constexpr int64_t client_num = 1;
-  ceph::net::SocketMessenger msgr{entity_name_t::OSD(client_num), "client", 0};
+  ceph::net::Messenger& msgr;
   struct ClientDispatcher : ceph::net::Dispatcher {
     unsigned count = 0;
     seastar::condition_variable on_reply;
@@ -89,8 +88,9 @@ struct Client {
       return seastar::now();
     }
   } dispatcher;
-  Client()
-    : byte_throttler(ceph::net::conf.osd_client_message_size_cap)
+  Client(ceph::net::Messenger& msgr)
+    : byte_throttler(ceph::net::conf.osd_client_message_size_cap),
+      msgr{msgr}
   {
     msgr.set_crc_header();
     msgr.set_crc_data();
@@ -276,41 +276,50 @@ seastar_echo(SeastarContext& sc,
 {
   std::cout << "seastar/";
   if (role == echo_role::as_server) {
-    return seastar::do_with(seastar_pingpong::Server{},
-      [&addr, count](auto& server) mutable {
-        std::cout << "server listening at " << addr << std::endl;
-        // bind the server
-        server.msgr.set_policy_throttler(entity_name_t::TYPE_OSD,
-                                         &server.byte_throttler);
-        server.msgr.bind(entity_addrvec_t{addr});
-        return server.msgr.start(&server.dispatcher)
-          .then([&dispatcher=server.dispatcher, count] {
-            return dispatcher.on_reply.wait([&dispatcher, count] {
-              return dispatcher.count >= count;
-            });
-          }).finally([&server] {
-            std::cout << "server shutting down" << std::endl;
-            return server.msgr.shutdown();
+    return ceph::net::Messenger::create(entity_name_t::OSD(0), "server", 0,
+                                        seastar::engine().cpu_id())
+      .then([&addr, count] (auto msgr) {
+        return seastar::do_with(seastar_pingpong::Server{*msgr},
+          [&addr, count](auto& server) mutable {
+            std::cout << "server listening at " << addr << std::endl;
+            // bind the server
+            server.msgr.set_policy_throttler(entity_name_t::TYPE_OSD,
+                                             &server.byte_throttler);
+            return server.msgr.bind(entity_addrvec_t{addr})
+              .then([&server] {
+                return server.msgr.start(&server.dispatcher);
+              }).then([&dispatcher=server.dispatcher, count] {
+                return dispatcher.on_reply.wait([&dispatcher, count] {
+                  return dispatcher.count >= count;
+                });
+              }).finally([&server] {
+                std::cout << "server shutting down" << std::endl;
+                return server.msgr.shutdown();
+              });
           });
       });
   } else {
-    return seastar::do_with(seastar_pingpong::Client{},
-      [&addr, count](auto& client) {
-        std::cout << "client sending to " << addr << std::endl;
-        client.msgr.set_policy_throttler(entity_name_t::TYPE_OSD,
-                                         &client.byte_throttler);
-        return client.msgr.start(&client.dispatcher)
-          .then([&] {
-            return client.msgr.connect(addr, entity_name_t::TYPE_OSD);
-          }).then([&disp=client.dispatcher, count](ceph::net::ConnectionRef conn) {
-            return seastar::do_until(
-              [&disp,count] { return disp.count >= count; },
-              [&disp,conn] { return conn->send(MessageRef{new MPing(), false})
-                               .then([&] { return disp.on_reply.wait(); });
-            });
-          }).finally([&client] {
-            std::cout << "client shutting down" << std::endl;
-            return client.msgr.shutdown();
+    return ceph::net::Messenger::create(entity_name_t::OSD(1), "client", 1,
+                                        seastar::engine().cpu_id())
+      .then([&addr, count] (auto msgr) {
+        return seastar::do_with(seastar_pingpong::Client{*msgr},
+          [&addr, count](auto& client) {
+            std::cout << "client sending to " << addr << std::endl;
+            client.msgr.set_policy_throttler(entity_name_t::TYPE_OSD,
+                                             &client.byte_throttler);
+            return client.msgr.start(&client.dispatcher)
+              .then([&] {
+                return client.msgr.connect(addr, entity_name_t::TYPE_OSD);
+              }).then([&disp=client.dispatcher, count](ceph::net::ConnectionXRef conn) {
+                return seastar::do_until(
+                  [&disp,count] { return disp.count >= count; },
+                  [&disp,conn] { return (*conn)->send(MessageRef{new MPing(), false})
+                                   .then([&] { return disp.on_reply.wait(); });
+                });
+              }).finally([&client] {
+                std::cout << "client shutting down" << std::endl;
+                return client.msgr.shutdown();
+              });
           });
       });
   }
index c6c5f548a02220065e6783e72dd7b3fde86bd4b2..671aa644ff71768a71e6ce27deea4e4ef850a061 100644 (file)
@@ -3,7 +3,7 @@
 #include "crimson/common/config_proxy.h"
 #include "crimson/mon/MonClient.h"
 #include "crimson/net/Connection.h"
-#include "crimson/net/SocketMessenger.h"
+#include "crimson/net/Messenger.h"
 
 using Config = ceph::common::ConfigProxy;
 using MonClient = ceph::mon::Client;
@@ -25,26 +25,27 @@ static seastar::future<> test_monc()
   }).then([] {
     return ceph::common::sharded_perf_coll().start();
   }).then([] {
-    return seastar::do_with(ceph::net::SocketMessenger{entity_name_t::OSD(0), "monc", 0},
-                            [](ceph::net::Messenger& msgr) {
+    return ceph::net::Messenger::create(entity_name_t::OSD(0), "monc", 0,
+                                        seastar::engine().cpu_id())
+        .then([] (ceph::net::Messenger *msgr) {
       auto& conf = ceph::common::local_conf();
       if (conf->ms_crc_data) {
-        msgr.set_crc_data();
+        msgr->set_crc_data();
       }
       if (conf->ms_crc_header) {
-        msgr.set_crc_header();
+        msgr->set_crc_header();
       }
-      return seastar::do_with(MonClient{msgr},
-                              [&msgr](auto& monc) {
-        return msgr.start(&monc).then([&monc] {
+      return seastar::do_with(MonClient{*msgr},
+                              [msgr](auto& monc) {
+        return msgr->start(&monc).then([&monc] {
           return seastar::with_timeout(
             seastar::lowres_clock::now() + std::chrono::seconds{10},
             monc.start());
         }).then([&monc] {
           return monc.stop();
         });
-      }).finally([&msgr] {
-        return msgr.shutdown();
+      }).finally([msgr] {
+        return msgr->shutdown();
       });
     });
   }).finally([] {