From bcc64d2f23978e410f51115e010f8de56db66777 Mon Sep 17 00:00:00 2001
From: Kefu Chai <kchai@redhat.com>
Date: Fri, 15 Mar 2019 19:15:43 +0800
Subject: [PATCH] crimson/osd: create msgrs in main.cc

Messengers are sharded services, so we should not create them in
another sharded service's start() method. To ensure that sharded
services are stopped in the right order, create them in main() and
register their stop() methods in the proper order.

Signed-off-by: Kefu Chai <kchai@redhat.com>
---
 src/crimson/osd/heartbeat.cc | 54 ++++++++------------
 src/crimson/osd/heartbeat.h  | 10 ++--
 src/crimson/osd/main.cc      | 68 ++++++++++++++++----------
 src/crimson/osd/osd.cc       | 92 ++++++++++++++++--------------------
 src/crimson/osd/osd.h        | 10 ++--
 5 files changed, 116 insertions(+), 118 deletions(-)

diff --git a/src/crimson/osd/heartbeat.cc b/src/crimson/osd/heartbeat.cc
index db58c1d6e5b..ef17322d62b 100644
--- a/src/crimson/osd/heartbeat.cc
+++ b/src/crimson/osd/heartbeat.cc
@@ -31,11 +31,15 @@ namespace {
 Heartbeat::Heartbeat(int whoami,
                      uint32_t nonce,
                      const OSDMapService& service,
-                     ceph::mon::Client& monc)
+                     ceph::mon::Client& monc,
+                     ceph::net::Messenger& front_msgr,
+                     ceph::net::Messenger& back_msgr)
   : whoami{whoami},
     nonce{nonce},
     service{service},
     monc{monc},
+    front_msgr{front_msgr},
+    back_msgr{back_msgr},
     timer{[this] {send_heartbeats();}}
 {}
 
@@ -47,23 +51,8 @@ seastar::future<> Heartbeat::start(entity_addrvec_t front_addrs,
   for (auto& addr : boost::join(front_addrs.v, back_addrs.v)) {
     addr.set_port(0);
   }
-  return seastar::when_all_succeed(
-    ceph::net::Messenger::create(entity_name_t::OSD(whoami),
-                                 "hb_front",
-                                 nonce,
-                                 seastar::engine().cpu_id())
-    .then([this, front_addrs] (auto msgr) {
-      front_msgr = msgr;
-      return start_messenger(front_msgr, front_addrs);
-    }),
-    ceph::net::Messenger::create(entity_name_t::OSD(whoami),
-                                 "hb_back",
-                                 nonce,
-                                 seastar::engine().cpu_id())
-    .then([this, back_addrs] (auto msgr) {
-      back_msgr = msgr;
-      return start_messenger(back_msgr, back_addrs);
-    }))
+  return seastar::when_all_succeed(start_messenger(front_msgr, front_addrs),
+                                   start_messenger(back_msgr, back_addrs))
     .then([this] {
       timer.arm_periodic(
         std::chrono::seconds(local_conf()->osd_heartbeat_interval));
@@ -71,36 +60,35 @@ seastar::future<> Heartbeat::start(entity_addrvec_t front_addrs,
 }
 
 seastar::future<>
-Heartbeat::start_messenger(ceph::net::Messenger* msgr,
+Heartbeat::start_messenger(ceph::net::Messenger& msgr,
                            const entity_addrvec_t& addrs)
 {
   if (local_conf()->ms_crc_data) {
-    msgr->set_crc_data();
+    msgr.set_crc_data();
   }
   if (local_conf()->ms_crc_header) {
-    msgr->set_crc_header();
+    msgr.set_crc_header();
   }
-  return msgr->try_bind(addrs,
-                        local_conf()->ms_bind_port_min,
-                        local_conf()->ms_bind_port_max).then([msgr, this] {
-    return msgr->start(this);
+  return msgr.try_bind(addrs,
+                       local_conf()->ms_bind_port_min,
+                       local_conf()->ms_bind_port_max).then([&msgr, this] {
+    return msgr.start(this);
   });
 }
 
 seastar::future<> Heartbeat::stop()
 {
-  return seastar::when_all_succeed(front_msgr->shutdown(),
-                                   back_msgr->shutdown());
+  return seastar::now();
 }
 
 const entity_addrvec_t& Heartbeat::get_front_addrs() const
 {
-  return front_msgr->get_myaddrs();
+  return front_msgr.get_myaddrs();
 }
 
 const entity_addrvec_t& Heartbeat::get_back_addrs() const
 {
-  return back_msgr->get_myaddrs();
+  return back_msgr.get_myaddrs();
 }
 
@@ -111,10 +99,10 @@ seastar::future<> Heartbeat::add_peer(osd_id_t peer, epoch_t epoch)
   auto osdmap = service.get_map();
   // TODO: msgr v2
   return seastar::when_all_succeed(
-    front_msgr->connect(osdmap->get_hb_front_addrs(peer).legacy_addr(),
-                        CEPH_ENTITY_TYPE_OSD),
-    back_msgr->connect(osdmap->get_hb_back_addrs(peer).legacy_addr(),
-                       CEPH_ENTITY_TYPE_OSD))
+    front_msgr.connect(osdmap->get_hb_front_addrs(peer).legacy_addr(),
+                       CEPH_ENTITY_TYPE_OSD),
+    back_msgr.connect(osdmap->get_hb_back_addrs(peer).legacy_addr(),
+                      CEPH_ENTITY_TYPE_OSD))
     .then([this, peer, epoch] (auto xcon_front, auto xcon_back) {
       PeerInfo info;
       // sharded-messenger compatible mode
diff --git a/src/crimson/osd/heartbeat.h b/src/crimson/osd/heartbeat.h
index 63b893fcc98..2cb3da31c8e 100644
--- a/src/crimson/osd/heartbeat.h
+++ b/src/crimson/osd/heartbeat.h
@@ -25,7 +25,9 @@ public:
   Heartbeat(int whoami,
             uint32_t nonce,
             const OSDMapService& service,
-            ceph::mon::Client& monc);
+            ceph::mon::Client& monc,
+            ceph::net::Messenger& front_msgr,
+            ceph::net::Messenger& back_msgr);
 
   seastar::future<> start(entity_addrvec_t front,
                           entity_addrvec_t back);
@@ -65,15 +67,15 @@ private:
   /// add enough reporters for fast failure detection
   void add_reporter_peers(int whoami);
 
-  seastar::future<> start_messenger(ceph::net::Messenger* msgr,
+  seastar::future<> start_messenger(ceph::net::Messenger& msgr,
                                     const entity_addrvec_t& addrs);
 private:
   const int whoami;
   const uint32_t nonce;
-  ceph::net::Messenger* front_msgr = nullptr;
-  ceph::net::Messenger* back_msgr = nullptr;
   const OSDMapService& service;
   ceph::mon::Client& monc;
+  ceph::net::Messenger& front_msgr;
+  ceph::net::Messenger& back_msgr;
 
   seastar::timer<seastar::lowres_clock> timer;
   // use real_clock so it can be converted to utime_t
diff --git a/src/crimson/osd/main.cc b/src/crimson/osd/main.cc
index e22de550249..6f2d68af588 100644
--- a/src/crimson/osd/main.cc
+++ b/src/crimson/osd/main.cc
@@ -11,6 +11,7 @@
 
 #include "common/ceph_argparse.h"
 #include "crimson/common/config_proxy.h"
+#include "crimson/net/SocketMessenger.h"
 
 #include "osd.h"
 
@@ -67,47 +68,62 @@ int main(int argc, char* argv[])
     usage(argv[0]);
     return EXIT_SUCCESS;
   }
-  std::string cluster;
+  std::string cluster_name;
   std::string conf_file_list;
   // ceph_argparse_early_args() could _exit(), while local_conf() won't ready
   // until it's started. so do the boilerplate-settings parsing here.
   auto init_params = ceph_argparse_early_args(ceph_args,
                                               CEPH_ENTITY_TYPE_OSD,
-                                              &cluster,
+                                              &cluster_name,
                                               &conf_file_list);
   seastar::sharded<OSD> osd;
+  seastar::sharded<ceph::net::SocketMessenger> cluster_msgr, client_msgr;
+  seastar::sharded<ceph::net::SocketMessenger> hb_front_msgr, hb_back_msgr;
   using ceph::common::sharded_conf;
   using ceph::common::sharded_perf_coll;
   using ceph::common::local_conf;
   try {
     return app.run_deprecated(app_args.size(),
                               const_cast<char**>(app_args.data()),
                               [&] {
       auto& config = app.configuration();
-      seastar::engine().at_exit([] {
-        return sharded_conf().stop();
-      });
-      seastar::engine().at_exit([] {
-        return sharded_perf_coll().stop();
-      });
-      seastar::engine().at_exit([&] {
-        return osd.stop();
-      });
-      return sharded_conf().start(init_params.name, cluster).then([] {
-        return sharded_perf_coll().start();
-      }).then([&conf_file_list] {
-        return local_conf().parse_config_files(conf_file_list);
-      }).then([&] {
-        return local_conf().parse_argv(ceph_args);
-      }).then([&] {
-        return osd.start_single(std::stoi(local_conf()->name.get_id()),
-                                static_cast<uint32_t>(getpid()));
-      }).then([&osd, mkfs = config.count("mkfs")] {
-        if (mkfs) {
-          return osd.invoke_on(0, &OSD::mkfs,
-                               local_conf().get_val<uuid_d>("fsid"))
-            .then([] { seastar::engine().exit(0); });
+      return seastar::async([&] {
+        sharded_conf().start(init_params.name, cluster_name).get();
+        sharded_perf_coll().start().get();
+        local_conf().parse_config_files(conf_file_list).get();
+        local_conf().parse_argv(ceph_args).get();
+        const int whoami = std::stoi(local_conf()->name.get_id());
+        const auto nonce = static_cast<uint32_t>(getpid());
+        const auto shard = seastar::engine().cpu_id();
+        cluster_msgr.start(entity_name_t::OSD(whoami), "cluster"s, nonce, shard).get();
+        client_msgr.start(entity_name_t::OSD(whoami), "client"s, nonce, shard).get();
+        hb_front_msgr.start(entity_name_t::OSD(whoami), "hb_front"s, nonce, shard).get();
+        hb_back_msgr.start(entity_name_t::OSD(whoami), "hb_back"s, nonce, shard).get();
+        osd.start_single(whoami, nonce,
+                         reference_wrapper<ceph::net::Messenger>(cluster_msgr.local()),
+                         reference_wrapper<ceph::net::Messenger>(client_msgr.local()),
+                         reference_wrapper<ceph::net::Messenger>(hb_front_msgr.local()),
+                         reference_wrapper<ceph::net::Messenger>(hb_back_msgr.local())).get();
+        if (config.count("mkfs")) {
+          osd.invoke_on(0, &OSD::mkfs,
+                        local_conf().get_val<uuid_d>("fsid"))
+            .then([] { seastar::engine().exit(0); }).get();
         } else {
-          return osd.invoke_on(0, &OSD::start);
+          osd.invoke_on(0, &OSD::start).get();
         }
+        seastar::engine().at_exit([&] {
+          return osd.stop();
+        });
+        seastar::engine().at_exit([&] {
+          return seastar::when_all_succeed(cluster_msgr.stop(),
+                                           client_msgr.stop(),
+                                           hb_front_msgr.stop(),
+                                           hb_back_msgr.stop());
+        });
+        seastar::engine().at_exit([] {
+          return sharded_perf_coll().stop();
+        });
+        seastar::engine().at_exit([] {
+          return sharded_conf().stop();
+        });
+      });
     });
   } catch (...) {
diff --git a/src/crimson/osd/osd.cc b/src/crimson/osd/osd.cc
index cb163a1119e..20f66b26b53 100644
--- a/src/crimson/osd/osd.cc
+++ b/src/crimson/osd/osd.cc
@@ -35,10 +35,19 @@ namespace {
 using ceph::common::local_conf;
 using ceph::os::CyanStore;
 
-OSD::OSD(int id, uint32_t nonce)
+OSD::OSD(int id, uint32_t nonce,
+         ceph::net::Messenger& cluster_msgr,
+         ceph::net::Messenger& public_msgr,
+         ceph::net::Messenger& hb_front_msgr,
+         ceph::net::Messenger& hb_back_msgr)
   : whoami{id},
     nonce{nonce},
     beacon_timer{[this] { send_beacon(); }},
+    cluster_msgr{cluster_msgr},
+    public_msgr{public_msgr},
+    monc{new ceph::mon::Client{public_msgr}},
+    heartbeat{new Heartbeat{whoami, nonce, *this, *monc,
+                            hb_front_msgr, hb_back_msgr}},
     heartbeat_timer{[this] { update_heartbeat_peers(); }},
     store{std::make_unique<CyanStore>(
       local_conf().get_val<std::string>("osd_data"))}
@@ -151,34 +160,7 @@ seastar::future<> OSD::start()
 {
   logger().info("start");
 
-  return seastar::when_all_succeed(
-    ceph::net::Messenger::create(entity_name_t::OSD(whoami),
-                                 "cluster",
-                                 nonce,
-                                 seastar::engine().cpu_id())
-    .then([this] (auto msgr) { cluster_msgr = msgr; }),
-    ceph::net::Messenger::create(entity_name_t::OSD(whoami),
-                                 "client",
-                                 nonce,
-                                 seastar::engine().cpu_id())
-    .then([this] (auto msgr) { public_msgr = msgr; }))
-  .then([this] {
-    monc.reset(new ceph::mon::Client{*public_msgr});
-    heartbeat.reset(new Heartbeat{whoami, nonce, *this, *monc});
-
-    for (auto msgr : {cluster_msgr, public_msgr}) {
-      if (local_conf()->ms_crc_data) {
-        msgr->set_crc_data();
-      }
-      if (local_conf()->ms_crc_header) {
-        msgr->set_crc_header();
-      }
-    }
-    dispatchers.push_front(this);
-    dispatchers.push_front(monc.get());
-
-    return store->mount();
-  }).then([this] {
+  return store->mount().then([this] {
     meta_coll = make_unique<OSDMeta>(store->open_collection(coll_t::meta()),
                                      store.get());
     return meta_coll->load_superblock();
@@ -189,15 +171,25 @@ seastar::future<> OSD::start()
     osdmap = std::move(map);
     return load_pgs();
   }).then([this] {
+    for (auto msgr : {std::ref(cluster_msgr), std::ref(public_msgr)}) {
+      if (local_conf()->ms_crc_data) {
+        msgr.get().set_crc_data();
+      }
+      if (local_conf()->ms_crc_header) {
+        msgr.get().set_crc_header();
+      }
+    }
+    dispatchers.push_front(this);
+    dispatchers.push_front(monc.get());
     return seastar::when_all_succeed(
-      cluster_msgr->try_bind(pick_addresses(CEPH_PICK_ADDRESS_CLUSTER),
-                             local_conf()->ms_bind_port_min,
-                             local_conf()->ms_bind_port_max)
-      .then([this] { return cluster_msgr->start(&dispatchers); }),
-      public_msgr->try_bind(pick_addresses(CEPH_PICK_ADDRESS_PUBLIC),
+      cluster_msgr.try_bind(pick_addresses(CEPH_PICK_ADDRESS_CLUSTER),
                             local_conf()->ms_bind_port_min,
                             local_conf()->ms_bind_port_max)
-      .then([this] { return public_msgr->start(&dispatchers); }));
+      .then([this] { return cluster_msgr.start(&dispatchers); }),
+      public_msgr.try_bind(pick_addresses(CEPH_PICK_ADDRESS_PUBLIC),
+                           local_conf()->ms_bind_port_min,
+                           local_conf()->ms_bind_port_max)
+      .then([this] { return public_msgr.start(&dispatchers); }));
   }).then([this] {
     return monc->start();
   }).then([this] {
@@ -207,12 +199,12 @@ seastar::future<> OSD::start()
     return monc->renew_subs();
   }).then([this] {
     if (auto [addrs, changed] =
-        replace_unknown_addrs(cluster_msgr->get_myaddrs(),
-                              public_msgr->get_myaddrs()); changed) {
-      cluster_msgr->set_myaddrs(addrs);
+        replace_unknown_addrs(cluster_msgr.get_myaddrs(),
+                              public_msgr.get_myaddrs()); changed) {
+      cluster_msgr.set_myaddrs(addrs);
     }
-    return heartbeat->start(public_msgr->get_myaddrs(),
-                            cluster_msgr->get_myaddrs());
+    return heartbeat->start(public_msgr.get_myaddrs(),
+                            cluster_msgr.get_myaddrs());
   }).then([this] {
     return start_boot();
   });
@@ -265,13 +257,13 @@ seastar::future<> OSD::_send_boot()
 
   logger().info("hb_back_msgr: {}", heartbeat->get_back_addrs());
   logger().info("hb_front_msgr: {}", heartbeat->get_front_addrs());
-  logger().info("cluster_msgr: {}", cluster_msgr->get_myaddr());
+  logger().info("cluster_msgr: {}", cluster_msgr.get_myaddr());
   auto m = make_message<MOSDBoot>(superblock,
                                   osdmap->get_epoch(),
                                   osdmap->get_epoch(),
                                   heartbeat->get_back_addrs(),
                                   heartbeat->get_front_addrs(),
-                                  cluster_msgr->get_myaddrs(),
+                                  cluster_msgr.get_myaddrs(),
                                   CEPH_FEATURES_ALL);
   return monc->send_message(m);
 }
@@ -285,10 +277,6 @@ seastar::future<> OSD::stop()
     return heartbeat->stop();
   }).then([this] {
     return monc->stop();
-  }).then([this] {
-    return public_msgr->shutdown();
-  }).then([this] {
-    return cluster_msgr->shutdown();
   }).then([this] {
     return store->umount();
   });
@@ -552,7 +540,7 @@ seastar::future<> OSD::committed_osd_maps(version_t first,
       osdmap = std::move(o);
       if (up_epoch != 0 &&
           osdmap->is_up(whoami) &&
-          osdmap->get_addrs(whoami) == public_msgr->get_myaddrs()) {
+          osdmap->get_addrs(whoami) == public_msgr.get_myaddrs()) {
         up_epoch = osdmap->get_epoch();
         if (!boot_epoch) {
           boot_epoch = osdmap->get_epoch();
@@ -561,7 +549,7 @@ seastar::future<> OSD::committed_osd_maps(version_t first,
     });
   }).then([m, this] {
     if (osdmap->is_up(whoami) &&
-        osdmap->get_addrs(whoami) == public_msgr->get_myaddrs() &&
+        osdmap->get_addrs(whoami) == public_msgr.get_myaddrs() &&
         bind_epoch < osdmap->get_up_from(whoami)) {
       if (state.is_booting()) {
         logger().info("osd.{}: activating...", whoami);
@@ -606,17 +594,17 @@ bool OSD::should_restart() const
     logger().info("map e {} marked osd.{} down",
                   osdmap->get_epoch(), whoami);
     return true;
-  } else if (osdmap->get_addrs(whoami) != public_msgr->get_myaddrs()) {
+  } else if (osdmap->get_addrs(whoami) != public_msgr.get_myaddrs()) {
     logger().error("map e {} had wrong client addr ({} != my {})",
                    osdmap->get_epoch(),
                    osdmap->get_addrs(whoami),
-                   public_msgr->get_myaddrs());
+                   public_msgr.get_myaddrs());
     return true;
-  } else if (osdmap->get_cluster_addrs(whoami) != cluster_msgr->get_myaddrs()) {
+  } else if (osdmap->get_cluster_addrs(whoami) != cluster_msgr.get_myaddrs()) {
     logger().error("map e {} had wrong cluster addr ({} != my {})",
                    osdmap->get_epoch(),
                    osdmap->get_cluster_addrs(whoami),
-                   cluster_msgr->get_myaddrs());
+                   cluster_msgr.get_myaddrs());
     return true;
   } else {
     return false;
diff --git a/src/crimson/osd/osd.h b/src/crimson/osd/osd.h
index 55336289fbb..03fdc583bc5 100644
--- a/src/crimson/osd/osd.h
+++ b/src/crimson/osd/osd.h
@@ -44,9 +44,9 @@ class OSD : public ceph::net::Dispatcher,
   const uint32_t nonce;
   seastar::timer<seastar::lowres_clock> beacon_timer;
   // talk with osd
-  ceph::net::Messenger* cluster_msgr = nullptr;
+  ceph::net::Messenger& cluster_msgr;
   // talk with client/mon/mgr
-  ceph::net::Messenger* public_msgr = nullptr;
+  ceph::net::Messenger& public_msgr;
   ChainedDispatchers dispatchers;
   std::unique_ptr<ceph::mon::Client> monc;
@@ -81,7 +81,11 @@ class OSD : public ceph::net::Dispatcher,
   seastar::future<> ms_handle_remote_reset(ceph::net::ConnectionRef conn) override;
 
 public:
-  OSD(int id, uint32_t nonce);
+  OSD(int id, uint32_t nonce,
+      ceph::net::Messenger& cluster_msgr,
+      ceph::net::Messenger& client_msgr,
+      ceph::net::Messenger& hb_front_msgr,
+      ceph::net::Messenger& hb_back_msgr);
   ~OSD() override;
 
   seastar::future<> mkfs(uuid_d fsid);
-- 
2.39.5
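
Editor's note (appended by the editor; not part of the commit): the commit
message relies on two seastar lifetime rules. A sharded service should be
created where its owner controls its lifetime, that is in main() rather than
inside another sharded service's start(); and stop() hooks registered through
engine().at_exit() run at shutdown in an order fixed by their registration
order. In current seastar, reactor::run_exit_tasks() invokes the hooks in
reverse registration order (LIFO). The following minimal, self-contained
sketch illustrates the pattern under that assumption; "Service" is a
hypothetical stand-in for OSD or SocketMessenger, not a type from this patch.

#include <seastar/core/app-template.hh>
#include <seastar/core/future.hh>
#include <seastar/core/reactor.hh>
#include <seastar/core/sharded.hh>
#include <iostream>

// hypothetical sharded service; seastar::sharded<T> only requires that T
// provide a stop() member returning a future<>
struct Service {
  const char* name;
  explicit Service(const char* n) : name{n} {}
  seastar::future<> stop() {
    std::cout << "stopping " << name << " on shard "
              << seastar::engine().cpu_id() << "\n";
    return seastar::make_ready_future<>();
  }
};

int main(int argc, char** argv) {
  seastar::app_template app;
  seastar::sharded<Service> msgr;  // created in main(), like the msgrs above
  seastar::sharded<Service> osd;   // a consumer of msgr, like OSD above
  return app.run(argc, argv, [&] {
    return msgr.start("msgr").then([&] {
      return osd.start("osd");
    }).then([&] {
      // Registration order fixes the stop order. With LIFO at_exit
      // semantics, osd.stop(), registered first (as in the patch),
      // runs last: "stopping msgr" prints before "stopping osd".
      seastar::engine().at_exit([&] { return osd.stop(); });
      seastar::engine().at_exit([&] { return msgr.stop(); });
      return seastar::make_ready_future<>();
    });
  });
}

The point of the commit is visible in the sketch: both sharded objects live
in main()'s frame and their stop() hooks are registered explicitly, so the
teardown order no longer depends on which service happened to create the
other inside its own start().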