From: Kefu Chai
Date: Fri, 15 Mar 2019 11:15:43 +0000 (+0800)
Subject: crimson/osd: create msgrs in main.cc
X-Git-Tag: v15.0.0~186^2~3
X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=bcc64d2f23978e410f51115e010f8de56db66777;p=ceph.git

crimson/osd: create msgrs in main.cc

messengers are sharded services, so we should not create them inside
another sharded service's start() method. to ensure that sharded
services are stopped in the proper order, create them all in main() and
register their stop() methods in that order.

Signed-off-by: Kefu Chai
---
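
[note, not part of the patch] The core idea of this change is a lifetime
rule for seastar sharded services: construct them all in main(), hand
borrowers plain references, and register the stop() callbacks with
at_exit() so a service is only shut down after everything that borrows
it. The self-contained sketch below illustrates that rule with made-up
Messenger/Server types (illustrative names only); it assumes at_exit()
callbacks run in the order this patch registers them, i.e. the
borrower's stop first and its dependencies' stop later, which is what
the registration order in the new main.cc implies.

    #include <functional>
    #include <seastar/core/app-template.hh>
    #include <seastar/core/future.hh>
    #include <seastar/core/reactor.hh>
    #include <seastar/core/sharded.hh>
    #include <seastar/core/thread.hh>

    // stand-ins for SocketMessenger and OSD; names are illustrative only
    struct Messenger {
      seastar::future<> stop() { return seastar::make_ready_future<>(); }
    };

    struct Server {
      Messenger& msgr;   // borrowed; created, owned and stopped by main()
      explicit Server(Messenger& m) : msgr{m} {}
      seastar::future<> stop() { return seastar::make_ready_future<>(); }
    };

    int main(int argc, char** argv) {
      seastar::app_template app;
      seastar::sharded<Messenger> msgr;
      seastar::sharded<Server> server;
      return app.run(argc, argv, [&] {
        return seastar::async([&] {
          msgr.start().get();
          // sharded<>::start_single() copies its arguments, so the
          // borrowed messenger must go through a reference_wrapper
          server.start_single(std::ref(msgr.local())).get();
          // assumption: at_exit() callbacks run in registration order,
          // as the ordering in the new main.cc implies: the borrower
          // stops first, the messenger it borrows stops afterwards
          seastar::engine().at_exit([&] { return server.stop(); });
          seastar::engine().at_exit([&] { return msgr.stop(); });
          // a real daemon would wait for work here; returning lets the
          // app exit, which runs the registered stop() callbacks
        });
      });
    }

The patch spells the same thing as an explicit
reference_wrapper<ceph::net::Messenger>(...) rather than std::ref(), so
the wrapper is fixed to the Messenger base class before being forwarded.
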
diff --git a/src/crimson/osd/heartbeat.cc b/src/crimson/osd/heartbeat.cc
index db58c1d6e5b3..ef17322d62ba 100644
--- a/src/crimson/osd/heartbeat.cc
+++ b/src/crimson/osd/heartbeat.cc
@@ -31,11 +31,15 @@ namespace {
 Heartbeat::Heartbeat(int whoami,
                      uint32_t nonce,
                      const OSDMapService& service,
-                     ceph::mon::Client& monc)
+                     ceph::mon::Client& monc,
+                     ceph::net::Messenger& front_msgr,
+                     ceph::net::Messenger& back_msgr)
   : whoami{whoami},
     nonce{nonce},
     service{service},
     monc{monc},
+    front_msgr{front_msgr},
+    back_msgr{back_msgr},
     timer{[this] {send_heartbeats();}}
 {}
 
@@ -47,23 +51,8 @@ seastar::future<> Heartbeat::start(entity_addrvec_t front_addrs,
   for (auto& addr : boost::join(front_addrs.v, back_addrs.v)) {
     addr.set_port(0);
   }
-  return seastar::when_all_succeed(
-    ceph::net::Messenger::create(entity_name_t::OSD(whoami),
-                                 "hb_front",
-                                 nonce,
-                                 seastar::engine().cpu_id())
-      .then([this, front_addrs] (auto msgr) {
-        front_msgr = msgr;
-        return start_messenger(front_msgr, front_addrs);
-      }),
-    ceph::net::Messenger::create(entity_name_t::OSD(whoami),
-                                 "hb_back",
-                                 nonce,
-                                 seastar::engine().cpu_id())
-      .then([this, back_addrs] (auto msgr) {
-        back_msgr = msgr;
-        return start_messenger(back_msgr, back_addrs);
-      }))
+  return seastar::when_all_succeed(start_messenger(front_msgr, front_addrs),
+                                   start_messenger(back_msgr, back_addrs))
     .then([this] {
       timer.arm_periodic(
         std::chrono::seconds(local_conf()->osd_heartbeat_interval));
@@ -71,36 +60,35 @@ seastar::future<> Heartbeat::start(entity_addrvec_t front_addrs,
 }
 
 seastar::future<>
-Heartbeat::start_messenger(ceph::net::Messenger* msgr,
+Heartbeat::start_messenger(ceph::net::Messenger& msgr,
                            const entity_addrvec_t& addrs)
 {
   if (local_conf()->ms_crc_data) {
-    msgr->set_crc_data();
+    msgr.set_crc_data();
   }
   if (local_conf()->ms_crc_header) {
-    msgr->set_crc_header();
+    msgr.set_crc_header();
   }
-  return msgr->try_bind(addrs,
-                        local_conf()->ms_bind_port_min,
-                        local_conf()->ms_bind_port_max).then([msgr, this] {
-    return msgr->start(this);
+  return msgr.try_bind(addrs,
+                       local_conf()->ms_bind_port_min,
+                       local_conf()->ms_bind_port_max).then([&msgr, this] {
+    return msgr.start(this);
   });
 }
 
 seastar::future<> Heartbeat::stop()
 {
-  return seastar::when_all_succeed(front_msgr->shutdown(),
-                                   back_msgr->shutdown());
+  return seastar::now();
 }
 
 const entity_addrvec_t& Heartbeat::get_front_addrs() const
 {
-  return front_msgr->get_myaddrs();
+  return front_msgr.get_myaddrs();
 }
 
 const entity_addrvec_t& Heartbeat::get_back_addrs() const
 {
-  return back_msgr->get_myaddrs();
+  return back_msgr.get_myaddrs();
 }
 
 seastar::future<> Heartbeat::add_peer(osd_id_t peer, epoch_t epoch)
@@ -111,10 +99,10 @@ seastar::future<> Heartbeat::add_peer(osd_id_t peer, epoch_t epoch)
   auto osdmap = service.get_map();
   // TODO: msgr v2
   return seastar::when_all_succeed(
-    front_msgr->connect(osdmap->get_hb_front_addrs(peer).legacy_addr(),
-                        CEPH_ENTITY_TYPE_OSD),
-    back_msgr->connect(osdmap->get_hb_back_addrs(peer).legacy_addr(),
-                       CEPH_ENTITY_TYPE_OSD))
+    front_msgr.connect(osdmap->get_hb_front_addrs(peer).legacy_addr(),
+                       CEPH_ENTITY_TYPE_OSD),
+    back_msgr.connect(osdmap->get_hb_back_addrs(peer).legacy_addr(),
+                      CEPH_ENTITY_TYPE_OSD))
   .then([this, peer, epoch] (auto xcon_front, auto xcon_back) {
     PeerInfo info;
     // sharded-messenger compatible mode
diff --git a/src/crimson/osd/heartbeat.h b/src/crimson/osd/heartbeat.h
index 63b893fcc983..2cb3da31c8e4 100644
--- a/src/crimson/osd/heartbeat.h
+++ b/src/crimson/osd/heartbeat.h
@@ -25,7 +25,9 @@ public:
   Heartbeat(int whoami,
             uint32_t nonce,
             const OSDMapService& service,
-            ceph::mon::Client& monc);
+            ceph::mon::Client& monc,
+            ceph::net::Messenger& front_msgr,
+            ceph::net::Messenger& back_msgr);
 
   seastar::future<> start(entity_addrvec_t front,
                           entity_addrvec_t back);
@@ -65,15 +67,15 @@ private:
   /// add enough reporters for fast failure detection
   void add_reporter_peers(int whoami);
 
-  seastar::future<> start_messenger(ceph::net::Messenger* msgr,
+  seastar::future<> start_messenger(ceph::net::Messenger& msgr,
                                     const entity_addrvec_t& addrs);
 private:
   const int whoami;
   const uint32_t nonce;
-  ceph::net::Messenger* front_msgr = nullptr;
-  ceph::net::Messenger* back_msgr = nullptr;
   const OSDMapService& service;
   ceph::mon::Client& monc;
+  ceph::net::Messenger& front_msgr;
+  ceph::net::Messenger& back_msgr;
   seastar::timer timer;
   // use real_clock so it can be converted to utime_t
diff --git a/src/crimson/osd/main.cc b/src/crimson/osd/main.cc
index e22de550249b..6f2d68af588a 100644
--- a/src/crimson/osd/main.cc
+++ b/src/crimson/osd/main.cc
@@ -11,6 +11,7 @@
 
 #include "common/ceph_argparse.h"
 #include "crimson/common/config_proxy.h"
+#include "crimson/net/SocketMessenger.h"
 
 #include "osd.h"
 
@@ -67,47 +68,62 @@ int main(int argc, char* argv[])
     usage(argv[0]);
     return EXIT_SUCCESS;
   }
-  std::string cluster;
+  std::string cluster_name;
   std::string conf_file_list;
   // ceph_argparse_early_args() could _exit(), while local_conf() won't ready
   // until it's started. so do the boilerplate-settings parsing here.
   auto init_params = ceph_argparse_early_args(ceph_args,
                                               CEPH_ENTITY_TYPE_OSD,
-                                              &cluster,
+                                              &cluster_name,
                                               &conf_file_list);
   seastar::sharded<OSD> osd;
+  seastar::sharded<ceph::net::SocketMessenger> cluster_msgr, client_msgr;
+  seastar::sharded<ceph::net::SocketMessenger> hb_front_msgr, hb_back_msgr;
   using ceph::common::sharded_conf;
   using ceph::common::sharded_perf_coll;
   using ceph::common::local_conf;
   try {
     return app.run_deprecated(app_args.size(), const_cast<char**>(app_args.data()), [&] {
       auto& config = app.configuration();
-      seastar::engine().at_exit([] {
-        return sharded_conf().stop();
-      });
-      seastar::engine().at_exit([] {
-        return sharded_perf_coll().stop();
-      });
-      seastar::engine().at_exit([&] {
-        return osd.stop();
-      });
-      return sharded_conf().start(init_params.name, cluster).then([] {
-        return sharded_perf_coll().start();
-      }).then([&conf_file_list] {
-        return local_conf().parse_config_files(conf_file_list);
-      }).then([&] {
-        return local_conf().parse_argv(ceph_args);
-      }).then([&] {
-        return osd.start_single(std::stoi(local_conf()->name.get_id()),
-                                static_cast<uint32_t>(getpid()));
-      }).then([&osd, mkfs = config.count("mkfs")] {
-        if (mkfs) {
-          return osd.invoke_on(0, &OSD::mkfs,
-                               local_conf().get_val<uuid_d>("fsid"))
-            .then([] { seastar::engine().exit(0); });
+      return seastar::async([&] {
+        sharded_conf().start(init_params.name, cluster_name).get();
+        sharded_perf_coll().start().get();
+        local_conf().parse_config_files(conf_file_list).get();
+        local_conf().parse_argv(ceph_args).get();
+        const int whoami = std::stoi(local_conf()->name.get_id());
+        const auto nonce = static_cast<uint32_t>(getpid());
+        const auto shard = seastar::engine().cpu_id();
+        cluster_msgr.start(entity_name_t::OSD(whoami), "cluster"s, nonce, shard).get();
+        client_msgr.start(entity_name_t::OSD(whoami), "client"s, nonce, shard).get();
+        hb_front_msgr.start(entity_name_t::OSD(whoami), "hb_front"s, nonce, shard).get();
+        hb_back_msgr.start(entity_name_t::OSD(whoami), "hb_back"s, nonce, shard).get();
+        osd.start_single(whoami, nonce,
+                         reference_wrapper<ceph::net::Messenger>(cluster_msgr.local()),
+                         reference_wrapper<ceph::net::Messenger>(client_msgr.local()),
+                         reference_wrapper<ceph::net::Messenger>(hb_front_msgr.local()),
+                         reference_wrapper<ceph::net::Messenger>(hb_back_msgr.local())).get();
+        if (config.count("mkfs")) {
+          osd.invoke_on(0, &OSD::mkfs,
+                        local_conf().get_val<uuid_d>("fsid"))
+            .then([] { seastar::engine().exit(0); }).get();
         } else {
-          return osd.invoke_on(0, &OSD::start);
+          osd.invoke_on(0, &OSD::start).get();
         }
+        seastar::engine().at_exit([&] {
+          return osd.stop();
+        });
+        seastar::engine().at_exit([&] {
+          return seastar::when_all_succeed(cluster_msgr.stop(),
+                                           client_msgr.stop(),
+                                           hb_front_msgr.stop(),
+                                           hb_back_msgr.stop());
+        });
+        seastar::engine().at_exit([] {
+          return sharded_perf_coll().stop();
+        });
+        seastar::engine().at_exit([] {
+          return sharded_conf().stop();
+        });
       });
     });
   } catch (...) {
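
[note, not part of the patch] The rewritten main() above also switches
from a long .then() chain to seastar::async(), which runs its lambda in
a seastar thread so each future can be waited on inline with .get()
while the reactor keeps polling. A minimal standalone sketch of that
style, using only stock seastar calls:

    #include <chrono>
    #include <iostream>
    #include <seastar/core/app-template.hh>
    #include <seastar/core/sleep.hh>
    #include <seastar/core/thread.hh>

    int main(int argc, char** argv) {
      seastar::app_template app;
      return app.run(argc, argv, [] {
        // seastar::async() runs this lambda in a seastar thread, so
        // futures can be consumed with .get() and the startup sequence
        // reads top to bottom instead of as a .then() chain
        return seastar::async([] {
          using namespace std::chrono_literals;
          seastar::sleep(1ms).get();  // suspends this thread, not the reactor
          std::cout << "step 1 done\n";
          seastar::sleep(1ms).get();
          std::cout << "step 2 done\n";
        });
      });
    }
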
diff --git a/src/crimson/osd/osd.cc b/src/crimson/osd/osd.cc
index cb163a1119eb..20f66b26b53d 100644
--- a/src/crimson/osd/osd.cc
+++ b/src/crimson/osd/osd.cc
@@ -35,10 +35,19 @@ namespace {
 using ceph::common::local_conf;
 using ceph::os::CyanStore;
 
-OSD::OSD(int id, uint32_t nonce)
+OSD::OSD(int id, uint32_t nonce,
+         ceph::net::Messenger& cluster_msgr,
+         ceph::net::Messenger& public_msgr,
+         ceph::net::Messenger& hb_front_msgr,
+         ceph::net::Messenger& hb_back_msgr)
   : whoami{id},
     nonce{nonce},
     beacon_timer{[this] { send_beacon(); }},
+    cluster_msgr{cluster_msgr},
+    public_msgr{public_msgr},
+    monc{new ceph::mon::Client{public_msgr}},
+    heartbeat{new Heartbeat{whoami, nonce, *this, *monc,
+                            hb_front_msgr, hb_back_msgr}},
    heartbeat_timer{[this] { update_heartbeat_peers(); }},
    store{std::make_unique<CyanStore>(
      local_conf().get_val<std::string>("osd_data"))}
@@ -151,34 +160,7 @@ seastar::future<> OSD::start()
 {
   logger().info("start");
 
-  return seastar::when_all_succeed(
-    ceph::net::Messenger::create(entity_name_t::OSD(whoami),
-                                 "cluster",
-                                 nonce,
-                                 seastar::engine().cpu_id())
-      .then([this] (auto msgr) { cluster_msgr = msgr; }),
-    ceph::net::Messenger::create(entity_name_t::OSD(whoami),
-                                 "client",
-                                 nonce,
-                                 seastar::engine().cpu_id())
-      .then([this] (auto msgr) { public_msgr = msgr; }))
-  .then([this] {
-    monc.reset(new ceph::mon::Client{*public_msgr});
-    heartbeat.reset(new Heartbeat{whoami, nonce, *this, *monc});
-
-    for (auto msgr : {cluster_msgr, public_msgr}) {
-      if (local_conf()->ms_crc_data) {
-        msgr->set_crc_data();
-      }
-      if (local_conf()->ms_crc_header) {
-        msgr->set_crc_header();
-      }
-    }
-    dispatchers.push_front(this);
-    dispatchers.push_front(monc.get());
-
-    return store->mount();
-  }).then([this] {
+  return store->mount().then([this] {
     meta_coll = make_unique<OSDMeta>(store->open_collection(coll_t::meta()),
                                      store.get());
     return meta_coll->load_superblock();
@@ -189,15 +171,25 @@ seastar::future<> OSD::start()
     osdmap = std::move(map);
     return load_pgs();
   }).then([this] {
+    for (auto msgr : {std::ref(cluster_msgr), std::ref(public_msgr)}) {
+      if (local_conf()->ms_crc_data) {
+        msgr.get().set_crc_data();
+      }
+      if (local_conf()->ms_crc_header) {
+        msgr.get().set_crc_header();
+      }
+    }
+    dispatchers.push_front(this);
+    dispatchers.push_front(monc.get());
     return seastar::when_all_succeed(
-      cluster_msgr->try_bind(pick_addresses(CEPH_PICK_ADDRESS_CLUSTER),
-                             local_conf()->ms_bind_port_min,
-                             local_conf()->ms_bind_port_max)
-        .then([this] { return cluster_msgr->start(&dispatchers); }),
-      public_msgr->try_bind(pick_addresses(CEPH_PICK_ADDRESS_PUBLIC),
+      cluster_msgr.try_bind(pick_addresses(CEPH_PICK_ADDRESS_CLUSTER),
                             local_conf()->ms_bind_port_min,
                             local_conf()->ms_bind_port_max)
-        .then([this] { return public_msgr->start(&dispatchers); }));
+        .then([this] { return cluster_msgr.start(&dispatchers); }),
+      public_msgr.try_bind(pick_addresses(CEPH_PICK_ADDRESS_PUBLIC),
+                           local_conf()->ms_bind_port_min,
+                           local_conf()->ms_bind_port_max)
+        .then([this] { return public_msgr.start(&dispatchers); }));
   }).then([this] {
     return monc->start();
   }).then([this] {
@@ -207,12 +199,12 @@ seastar::future<> OSD::start()
     return monc->renew_subs();
   }).then([this] {
     if (auto [addrs, changed] =
-          replace_unknown_addrs(cluster_msgr->get_myaddrs(),
-                                public_msgr->get_myaddrs()); changed) {
-      cluster_msgr->set_myaddrs(addrs);
+          replace_unknown_addrs(cluster_msgr.get_myaddrs(),
+                                public_msgr.get_myaddrs()); changed) {
+      cluster_msgr.set_myaddrs(addrs);
     }
-    return heartbeat->start(public_msgr->get_myaddrs(),
-                            cluster_msgr->get_myaddrs());
+    return heartbeat->start(public_msgr.get_myaddrs(),
+                            cluster_msgr.get_myaddrs());
   }).then([this] {
     return start_boot();
   });
@@ -265,13 +257,13 @@ seastar::future<> OSD::_send_boot()
   logger().info("hb_back_msgr: {}", heartbeat->get_back_addrs());
   logger().info("hb_front_msgr: {}", heartbeat->get_front_addrs());
-  logger().info("cluster_msgr: {}", cluster_msgr->get_myaddr());
+  logger().info("cluster_msgr: {}", cluster_msgr.get_myaddr());
   auto m = make_message<MOSDBoot>(superblock,
                                   osdmap->get_epoch(),
                                   osdmap->get_epoch(),
                                   heartbeat->get_back_addrs(),
                                   heartbeat->get_front_addrs(),
-                                  cluster_msgr->get_myaddrs(),
+                                  cluster_msgr.get_myaddrs(),
                                   CEPH_FEATURES_ALL);
   return monc->send_message(m);
 }
@@ -285,10 +277,6 @@ seastar::future<> OSD::stop()
     return heartbeat->stop();
   }).then([this] {
     return monc->stop();
-  }).then([this] {
-    return public_msgr->shutdown();
-  }).then([this] {
-    return cluster_msgr->shutdown();
   }).then([this] {
     return store->umount();
   });
@@ -552,7 +540,7 @@ seastar::future<> OSD::committed_osd_maps(version_t first,
       osdmap = std::move(o);
       if (up_epoch != 0 &&
           osdmap->is_up(whoami) &&
-          osdmap->get_addrs(whoami) == public_msgr->get_myaddrs()) {
+          osdmap->get_addrs(whoami) == public_msgr.get_myaddrs()) {
         up_epoch = osdmap->get_epoch();
         if (!boot_epoch) {
           boot_epoch = osdmap->get_epoch();
@@ -561,7 +549,7 @@ seastar::future<> OSD::committed_osd_maps(version_t first,
     });
   }).then([m, this] {
     if (osdmap->is_up(whoami) &&
-        osdmap->get_addrs(whoami) == public_msgr->get_myaddrs() &&
+        osdmap->get_addrs(whoami) == public_msgr.get_myaddrs() &&
         bind_epoch < osdmap->get_up_from(whoami)) {
       if (state.is_booting()) {
         logger().info("osd.{}: activating...", whoami);
@@ -606,17 +594,17 @@ bool OSD::should_restart() const
     logger().info("map e {} marked osd.{} down",
                   osdmap->get_epoch(), whoami);
     return true;
-  } else if (osdmap->get_addrs(whoami) != public_msgr->get_myaddrs()) {
+  } else if (osdmap->get_addrs(whoami) != public_msgr.get_myaddrs()) {
     logger().error("map e {} had wrong client addr ({} != my {})",
                    osdmap->get_epoch(),
                    osdmap->get_addrs(whoami),
-                   public_msgr->get_myaddrs());
+                   public_msgr.get_myaddrs());
     return true;
-  } else if (osdmap->get_cluster_addrs(whoami) != cluster_msgr->get_myaddrs()) {
+  } else if (osdmap->get_cluster_addrs(whoami) != cluster_msgr.get_myaddrs()) {
     logger().error("map e {} had wrong cluster addr ({} != my {})",
                    osdmap->get_epoch(),
                    osdmap->get_cluster_addrs(whoami),
-                   cluster_msgr->get_myaddrs());
+                   cluster_msgr.get_myaddrs());
     return true;
   } else {
     return false;
diff --git a/src/crimson/osd/osd.h b/src/crimson/osd/osd.h
index 55336289fbb6..03fdc583bc51 100644
--- a/src/crimson/osd/osd.h
+++ b/src/crimson/osd/osd.h
@@ -44,9 +44,9 @@ class OSD : public ceph::net::Dispatcher,
   const uint32_t nonce;
   seastar::timer beacon_timer;
   // talk with osd
-  ceph::net::Messenger* cluster_msgr = nullptr;
+  ceph::net::Messenger& cluster_msgr;
   // talk with client/mon/mgr
-  ceph::net::Messenger* public_msgr = nullptr;
+  ceph::net::Messenger& public_msgr;
   ChainedDispatchers dispatchers;
   std::unique_ptr<ceph::mon::Client> monc;
@@ -81,7 +81,11 @@ class OSD : public ceph::net::Dispatcher,
   seastar::future<> ms_handle_remote_reset(ceph::net::ConnectionRef conn) override;
 
 public:
-  OSD(int id, uint32_t nonce);
+  OSD(int id, uint32_t nonce,
+      ceph::net::Messenger& cluster_msgr,
+      ceph::net::Messenger& client_msgr,
+      ceph::net::Messenger& hb_front_msgr,
+      ceph::net::Messenger& hb_back_msgr);
   ~OSD() override;
 
   seastar::future<> mkfs(uuid_d fsid);
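
[note, not part of the patch] Both OSD::start() and Heartbeat::add_peer()
above lean on seastar::when_all_succeed() to run two futures concurrently
and fail as a unit. The standalone sketch below shows the result-unpacking
pattern; with the seastar of this era the results arrive as separate
continuation arguments, exactly as in add_peer()'s
(auto xcon_front, auto xcon_back) lambda, while newer seastar delivers a
std::tuple instead, so the exact .then() signature here is an assumption
tied to the older API.

    #include <iostream>
    #include <seastar/core/app-template.hh>
    #include <seastar/core/future-util.hh>  // when_all_succeed lived here then

    // stand-ins for the two connect() futures; values are illustrative
    seastar::future<int> connect_front() { return seastar::make_ready_future<int>(1); }
    seastar::future<int> connect_back()  { return seastar::make_ready_future<int>(2); }

    int main(int argc, char** argv) {
      seastar::app_template app;
      return app.run(argc, argv, [] {
        // both futures run concurrently; the continuation only runs if
        // both succeed, and receives both results (older seastar passes
        // them as separate arguments, newer seastar as a std::tuple)
        return seastar::when_all_succeed(connect_front(), connect_back())
          .then([] (int front, int back) {
            std::cout << "connected: " << front << ", " << back << "\n";
          });
      });
    }
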