]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
crimson/osd: create msgrs in main.cc
authorKefu Chai <kchai@redhat.com>
Fri, 15 Mar 2019 11:15:43 +0000 (19:15 +0800)
committerKefu Chai <kchai@redhat.com>
Wed, 20 Mar 2019 07:41:39 +0000 (15:41 +0800)
messengers are sharded<Service>. we should not create them in another
sharded service's start() method. to ensure the ordering of stop of
sharded services, we should create the sharded services in main().
and register their stop() method in the proper order.

Signed-off-by: Kefu Chai <kchai@redhat.com>
src/crimson/osd/heartbeat.cc
src/crimson/osd/heartbeat.h
src/crimson/osd/main.cc
src/crimson/osd/osd.cc
src/crimson/osd/osd.h

index db58c1d6e5b3661391ace98ac235f70e6d6e41b2..ef17322d62ba12b4b452d2b1ae655589cd5c8d44 100644 (file)
@@ -31,11 +31,15 @@ namespace {
 Heartbeat::Heartbeat(int whoami,
                      uint32_t nonce,
                      const OSDMapService& service,
-                     ceph::mon::Client& monc)
+                     ceph::mon::Client& monc,
+                     ceph::net::Messenger& front_msgr,
+                     ceph::net::Messenger& back_msgr)
   : whoami{whoami},
     nonce{nonce},
     service{service},
     monc{monc},
+    front_msgr{front_msgr},
+    back_msgr{back_msgr},
     timer{[this] {send_heartbeats();}}
 {}
 
@@ -47,23 +51,8 @@ seastar::future<> Heartbeat::start(entity_addrvec_t front_addrs,
   for (auto& addr : boost::join(front_addrs.v, back_addrs.v)) {
     addr.set_port(0);
   }
-  return seastar::when_all_succeed(
-      ceph::net::Messenger::create(entity_name_t::OSD(whoami),
-                                   "hb_front",
-                                   nonce,
-                                   seastar::engine().cpu_id())
-        .then([this, front_addrs] (auto msgr) {
-          front_msgr = msgr;
-          return start_messenger(front_msgr, front_addrs);
-        }),
-      ceph::net::Messenger::create(entity_name_t::OSD(whoami),
-                                   "hb_back",
-                                   nonce,
-                                   seastar::engine().cpu_id())
-        .then([this, back_addrs] (auto msgr) {
-          back_msgr = msgr;
-          return start_messenger(back_msgr, back_addrs);
-        }))
+  return seastar::when_all_succeed(start_messenger(front_msgr, front_addrs),
+                                   start_messenger(back_msgr, back_addrs))
     .then([this] {
       timer.arm_periodic(
         std::chrono::seconds(local_conf()->osd_heartbeat_interval));
@@ -71,36 +60,35 @@ seastar::future<> Heartbeat::start(entity_addrvec_t front_addrs,
 }
 
 seastar::future<>
-Heartbeat::start_messenger(ceph::net::Messenger* msgr,
+Heartbeat::start_messenger(ceph::net::Messenger& msgr,
                            const entity_addrvec_t& addrs)
 {
   if (local_conf()->ms_crc_data) {
-    msgr->set_crc_data();
+    msgr.set_crc_data();
   }
   if (local_conf()->ms_crc_header) {
-    msgr->set_crc_header();
+    msgr.set_crc_header();
   }
-  return msgr->try_bind(addrs,
-                        local_conf()->ms_bind_port_min,
-                        local_conf()->ms_bind_port_max).then([msgr, this] {
-    return msgr->start(this);
+  return msgr.try_bind(addrs,
+                       local_conf()->ms_bind_port_min,
+                       local_conf()->ms_bind_port_max).then([&msgr, this] {
+    return msgr.start(this);
   });
 }
 
 seastar::future<> Heartbeat::stop()
 {
-  return seastar::when_all_succeed(front_msgr->shutdown(),
-                                   back_msgr->shutdown());
+  return seastar::now();
 }
 
 const entity_addrvec_t& Heartbeat::get_front_addrs() const
 {
-  return front_msgr->get_myaddrs();
+  return front_msgr.get_myaddrs();
 }
 
 const entity_addrvec_t& Heartbeat::get_back_addrs() const
 {
-  return back_msgr->get_myaddrs();
+  return back_msgr.get_myaddrs();
 }
 
 seastar::future<> Heartbeat::add_peer(osd_id_t peer, epoch_t epoch)
@@ -111,10 +99,10 @@ seastar::future<> Heartbeat::add_peer(osd_id_t peer, epoch_t epoch)
     auto osdmap = service.get_map();
     // TODO: msgr v2
     return seastar::when_all_succeed(
-        front_msgr->connect(osdmap->get_hb_front_addrs(peer).legacy_addr(),
-                            CEPH_ENTITY_TYPE_OSD),
-        back_msgr->connect(osdmap->get_hb_back_addrs(peer).legacy_addr(),
-                           CEPH_ENTITY_TYPE_OSD))
+        front_msgr.connect(osdmap->get_hb_front_addrs(peer).legacy_addr(),
+                           CEPH_ENTITY_TYPE_OSD),
+        back_msgr.connect(osdmap->get_hb_back_addrs(peer).legacy_addr(),
+                          CEPH_ENTITY_TYPE_OSD))
       .then([this, peer, epoch] (auto xcon_front, auto xcon_back) {
         PeerInfo info;
         // sharded-messenger compatible mode
index 63b893fcc9835876f31bf30cc9f5246283a610ab..2cb3da31c8e4c39611be993f19df55e1009ab1ab 100644 (file)
@@ -25,7 +25,9 @@ public:
   Heartbeat(int whoami,
            uint32_t nonce,
            const OSDMapService& service,
-           ceph::mon::Client& monc);
+           ceph::mon::Client& monc,
+           ceph::net::Messenger& front_msgr,
+           ceph::net::Messenger& back_msgr);
 
   seastar::future<> start(entity_addrvec_t front,
                          entity_addrvec_t back);
@@ -65,15 +67,15 @@ private:
   /// add enough reporters for fast failure detection
   void add_reporter_peers(int whoami);
 
-  seastar::future<> start_messenger(ceph::net::Messenger* msgr,
+  seastar::future<> start_messenger(ceph::net::Messenger& msgr,
                                    const entity_addrvec_t& addrs);
 private:
   const int whoami;
   const uint32_t nonce;
-  ceph::net::Messenger* front_msgr = nullptr;
-  ceph::net::Messenger* back_msgr = nullptr;
   const OSDMapService& service;
   ceph::mon::Client& monc;
+  ceph::net::Messenger& front_msgr;
+  ceph::net::Messenger& back_msgr;
 
   seastar::timer<seastar::lowres_clock> timer;
   // use real_clock so it can be converted to utime_t
index e22de550249b56bdfce1f44e79ccacd2116814a9..6f2d68af588a27dd5b1cce1e3d90c229f8fe0724 100644 (file)
@@ -11,6 +11,7 @@
 
 #include "common/ceph_argparse.h"
 #include "crimson/common/config_proxy.h"
+#include "crimson/net/SocketMessenger.h"
 
 #include "osd.h"
 
@@ -67,47 +68,62 @@ int main(int argc, char* argv[])
     usage(argv[0]);
     return EXIT_SUCCESS;
   }
-  std::string cluster;
+  std::string cluster_name;
   std::string conf_file_list;
   // ceph_argparse_early_args() could _exit(), while local_conf() won't ready
   // until it's started. so do the boilerplate-settings parsing here.
   auto init_params = ceph_argparse_early_args(ceph_args,
                                               CEPH_ENTITY_TYPE_OSD,
-                                              &cluster,
+                                              &cluster_name,
                                               &conf_file_list);
   seastar::sharded<OSD> osd;
+  seastar::sharded<ceph::net::SocketMessenger> cluster_msgr, client_msgr;
+  seastar::sharded<ceph::net::SocketMessenger> hb_front_msgr, hb_back_msgr;
   using ceph::common::sharded_conf;
   using ceph::common::sharded_perf_coll;
   using ceph::common::local_conf;
   try {
     return app.run_deprecated(app_args.size(), const_cast<char**>(app_args.data()), [&] {
       auto& config = app.configuration();
-      seastar::engine().at_exit([] {
-        return sharded_conf().stop();
-      });
-      seastar::engine().at_exit([] {
-        return sharded_perf_coll().stop();
-      });
-      seastar::engine().at_exit([&] {
-       return osd.stop();
-      });
-      return sharded_conf().start(init_params.name, cluster).then([] {
-        return sharded_perf_coll().start();
-      }).then([&conf_file_list] {
-        return local_conf().parse_config_files(conf_file_list);
-      }).then([&] {
-        return local_conf().parse_argv(ceph_args);
-      }).then([&] {
-        return osd.start_single(std::stoi(local_conf()->name.get_id()),
-                                static_cast<uint32_t>(getpid()));
-      }).then([&osd, mkfs = config.count("mkfs")] {
-        if (mkfs) {
-          return osd.invoke_on(0, &OSD::mkfs,
-                               local_conf().get_val<uuid_d>("fsid"))
-            .then([] { seastar::engine().exit(0); });
+      return seastar::async([&] {
+        sharded_conf().start(init_params.name, cluster_name).get();
+        sharded_perf_coll().start().get();
+        local_conf().parse_config_files(conf_file_list).get();
+        local_conf().parse_argv(ceph_args).get();
+        const int whoami = std::stoi(local_conf()->name.get_id());
+        const auto nonce = static_cast<uint32_t>(getpid());
+        const auto shard = seastar::engine().cpu_id();
+        cluster_msgr.start(entity_name_t::OSD(whoami), "cluster"s, nonce, shard).get();
+        client_msgr.start(entity_name_t::OSD(whoami), "client"s, nonce, shard).get();
+        hb_front_msgr.start(entity_name_t::OSD(whoami), "hb_front"s, nonce, shard).get();
+        hb_back_msgr.start(entity_name_t::OSD(whoami), "hb_back"s, nonce, shard).get();
+        osd.start_single(whoami, nonce,
+          reference_wrapper<ceph::net::Messenger>(cluster_msgr.local()),
+          reference_wrapper<ceph::net::Messenger>(client_msgr.local()),
+          reference_wrapper<ceph::net::Messenger>(hb_front_msgr.local()),
+          reference_wrapper<ceph::net::Messenger>(hb_back_msgr.local())).get();
+        if (config.count("mkfs")) {
+          osd.invoke_on(0, &OSD::mkfs,
+                        local_conf().get_val<uuid_d>("fsid"))
+            .then([] { seastar::engine().exit(0); }).get();
         } else {
-          return osd.invoke_on(0, &OSD::start);
+          osd.invoke_on(0, &OSD::start).get();
         }
+        seastar::engine().at_exit([&] {
+          return osd.stop();
+        });
+        seastar::engine().at_exit([&] {
+          return seastar::when_all_succeed(cluster_msgr.stop(),
+                                           client_msgr.stop(),
+                                           hb_front_msgr.stop(),
+                                           hb_back_msgr.stop());
+        });
+        seastar::engine().at_exit([] {
+          return sharded_perf_coll().stop();
+        });
+        seastar::engine().at_exit([] {
+          return sharded_conf().stop();
+        });
       });
     });
   } catch (...) {
index cb163a1119ebd9044bdaca3086bdf11841d40743..20f66b26b53dd77f7182e6fbe16d0901a0970202 100644 (file)
@@ -35,10 +35,19 @@ namespace {
 using ceph::common::local_conf;
 using ceph::os::CyanStore;
 
-OSD::OSD(int id, uint32_t nonce)
+OSD::OSD(int id, uint32_t nonce,
+         ceph::net::Messenger& cluster_msgr,
+         ceph::net::Messenger& public_msgr,
+         ceph::net::Messenger& hb_front_msgr,
+         ceph::net::Messenger& hb_back_msgr)
   : whoami{id},
     nonce{nonce},
     beacon_timer{[this] { send_beacon(); }},
+    cluster_msgr{cluster_msgr},
+    public_msgr{public_msgr},
+    monc{new ceph::mon::Client{public_msgr}},
+    heartbeat{new Heartbeat{whoami, nonce, *this, *monc,
+                            hb_front_msgr, hb_back_msgr}},
     heartbeat_timer{[this] { update_heartbeat_peers(); }},
     store{std::make_unique<ceph::os::CyanStore>(
       local_conf().get_val<std::string>("osd_data"))}
@@ -151,34 +160,7 @@ seastar::future<> OSD::start()
 {
   logger().info("start");
 
-  return seastar::when_all_succeed(
-    ceph::net::Messenger::create(entity_name_t::OSD(whoami),
-                                 "cluster",
-                                 nonce,
-                                 seastar::engine().cpu_id())
-      .then([this] (auto msgr) { cluster_msgr = msgr; }),
-    ceph::net::Messenger::create(entity_name_t::OSD(whoami),
-                                 "client",
-                                 nonce,
-                                 seastar::engine().cpu_id())
-      .then([this] (auto msgr) { public_msgr = msgr; }))
-  .then([this] {
-    monc.reset(new ceph::mon::Client{*public_msgr});
-    heartbeat.reset(new Heartbeat{whoami, nonce, *this, *monc});
-
-    for (auto msgr : {cluster_msgr, public_msgr}) {
-      if (local_conf()->ms_crc_data) {
-        msgr->set_crc_data();
-      }
-      if (local_conf()->ms_crc_header) {
-        msgr->set_crc_header();
-      }
-    }
-    dispatchers.push_front(this);
-    dispatchers.push_front(monc.get());
-
-    return store->mount();
-  }).then([this] {
+  return store->mount().then([this] {
     meta_coll = make_unique<OSDMeta>(store->open_collection(coll_t::meta()),
                                      store.get());
     return meta_coll->load_superblock();
@@ -189,15 +171,25 @@ seastar::future<> OSD::start()
     osdmap = std::move(map);
     return load_pgs();
   }).then([this] {
+    for (auto msgr : {std::ref(cluster_msgr), std::ref(public_msgr)}) {
+      if (local_conf()->ms_crc_data) {
+        msgr.get().set_crc_data();
+      }
+      if (local_conf()->ms_crc_header) {
+        msgr.get().set_crc_header();
+      }
+    }
+    dispatchers.push_front(this);
+    dispatchers.push_front(monc.get());
     return seastar::when_all_succeed(
-      cluster_msgr->try_bind(pick_addresses(CEPH_PICK_ADDRESS_CLUSTER),
-                             local_conf()->ms_bind_port_min,
-                             local_conf()->ms_bind_port_max)
-        .then([this] { return cluster_msgr->start(&dispatchers); }),
-      public_msgr->try_bind(pick_addresses(CEPH_PICK_ADDRESS_PUBLIC),
+      cluster_msgr.try_bind(pick_addresses(CEPH_PICK_ADDRESS_CLUSTER),
                             local_conf()->ms_bind_port_min,
                             local_conf()->ms_bind_port_max)
-        .then([this] { return public_msgr->start(&dispatchers); }));
+        .then([this] { return cluster_msgr.start(&dispatchers); }),
+      public_msgr.try_bind(pick_addresses(CEPH_PICK_ADDRESS_PUBLIC),
+                           local_conf()->ms_bind_port_min,
+                           local_conf()->ms_bind_port_max)
+        .then([this] { return public_msgr.start(&dispatchers); }));
   }).then([this] {
     return monc->start();
   }).then([this] {
@@ -207,12 +199,12 @@ seastar::future<> OSD::start()
     return monc->renew_subs();
   }).then([this] {
     if (auto [addrs, changed] =
-        replace_unknown_addrs(cluster_msgr->get_myaddrs(),
-                              public_msgr->get_myaddrs()); changed) {
-      cluster_msgr->set_myaddrs(addrs);
+        replace_unknown_addrs(cluster_msgr.get_myaddrs(),
+                              public_msgr.get_myaddrs()); changed) {
+      cluster_msgr.set_myaddrs(addrs);
     }
-    return heartbeat->start(public_msgr->get_myaddrs(),
-                            cluster_msgr->get_myaddrs());
+    return heartbeat->start(public_msgr.get_myaddrs(),
+                            cluster_msgr.get_myaddrs());
   }).then([this] {
     return start_boot();
   });
@@ -265,13 +257,13 @@ seastar::future<> OSD::_send_boot()
 
   logger().info("hb_back_msgr: {}", heartbeat->get_back_addrs());
   logger().info("hb_front_msgr: {}", heartbeat->get_front_addrs());
-  logger().info("cluster_msgr: {}", cluster_msgr->get_myaddr());
+  logger().info("cluster_msgr: {}", cluster_msgr.get_myaddr());
   auto m = make_message<MOSDBoot>(superblock,
                                   osdmap->get_epoch(),
                                   osdmap->get_epoch(),
                                   heartbeat->get_back_addrs(),
                                   heartbeat->get_front_addrs(),
-                                  cluster_msgr->get_myaddrs(),
+                                  cluster_msgr.get_myaddrs(),
                                   CEPH_FEATURES_ALL);
   return monc->send_message(m);
 }
@@ -285,10 +277,6 @@ seastar::future<> OSD::stop()
     return heartbeat->stop();
   }).then([this] {
     return monc->stop();
-  }).then([this] {
-    return public_msgr->shutdown();
-  }).then([this] {
-    return cluster_msgr->shutdown();
   }).then([this] {
     return store->umount();
   });
@@ -552,7 +540,7 @@ seastar::future<> OSD::committed_osd_maps(version_t first,
       osdmap = std::move(o);
       if (up_epoch != 0 &&
           osdmap->is_up(whoami) &&
-          osdmap->get_addrs(whoami) == public_msgr->get_myaddrs()) {
+          osdmap->get_addrs(whoami) == public_msgr.get_myaddrs()) {
         up_epoch = osdmap->get_epoch();
         if (!boot_epoch) {
           boot_epoch = osdmap->get_epoch();
@@ -561,7 +549,7 @@ seastar::future<> OSD::committed_osd_maps(version_t first,
     });
   }).then([m, this] {
     if (osdmap->is_up(whoami) &&
-        osdmap->get_addrs(whoami) == public_msgr->get_myaddrs() &&
+        osdmap->get_addrs(whoami) == public_msgr.get_myaddrs() &&
         bind_epoch < osdmap->get_up_from(whoami)) {
       if (state.is_booting()) {
         logger().info("osd.{}: activating...", whoami);
@@ -606,17 +594,17 @@ bool OSD::should_restart() const
     logger().info("map e {} marked osd.{} down",
                   osdmap->get_epoch(), whoami);
     return true;
-  } else if (osdmap->get_addrs(whoami) != public_msgr->get_myaddrs()) {
+  } else if (osdmap->get_addrs(whoami) != public_msgr.get_myaddrs()) {
     logger().error("map e {} had wrong client addr ({} != my {})",
                    osdmap->get_epoch(),
                    osdmap->get_addrs(whoami),
-                   public_msgr->get_myaddrs());
+                   public_msgr.get_myaddrs());
     return true;
-  } else if (osdmap->get_cluster_addrs(whoami) != cluster_msgr->get_myaddrs()) {
+  } else if (osdmap->get_cluster_addrs(whoami) != cluster_msgr.get_myaddrs()) {
     logger().error("map e {} had wrong cluster addr ({} != my {})",
                    osdmap->get_epoch(),
                    osdmap->get_cluster_addrs(whoami),
-                   cluster_msgr->get_myaddrs());
+                   cluster_msgr.get_myaddrs());
     return true;
   } else {
     return false;
index 55336289fbb6216158e961a1bfd6ed1679437961..03fdc583bc51ad980737b45fe9988287880dce87 100644 (file)
@@ -44,9 +44,9 @@ class OSD : public ceph::net::Dispatcher,
   const uint32_t nonce;
   seastar::timer<seastar::lowres_clock> beacon_timer;
   // talk with osd
-  ceph::net::Messenger* cluster_msgr = nullptr;
+  ceph::net::Messenger& cluster_msgr;
   // talk with client/mon/mgr
-  ceph::net::Messenger* public_msgr = nullptr;
+  ceph::net::Messenger& public_msgr;
   ChainedDispatchers dispatchers;
   std::unique_ptr<ceph::mon::Client> monc;
 
@@ -81,7 +81,11 @@ class OSD : public ceph::net::Dispatcher,
   seastar::future<> ms_handle_remote_reset(ceph::net::ConnectionRef conn) override;
 
 public:
-  OSD(int id, uint32_t nonce);
+  OSD(int id, uint32_t nonce,
+      ceph::net::Messenger& cluster_msgr,
+      ceph::net::Messenger& client_msgr,
+      ceph::net::Messenger& hb_front_msgr,
+      ceph::net::Messenger& hb_back_msgr);
   ~OSD() override;
 
   seastar::future<> mkfs(uuid_d fsid);