Heartbeat::Heartbeat(int whoami,
uint32_t nonce,
const OSDMapService& service,
- ceph::mon::Client& monc)
+ ceph::mon::Client& monc,
+ ceph::net::Messenger& front_msgr,
+ ceph::net::Messenger& back_msgr)
: whoami{whoami},
nonce{nonce},
service{service},
monc{monc},
+ front_msgr{front_msgr},
+ back_msgr{back_msgr},
timer{[this] {send_heartbeats();}}
{}
for (auto& addr : boost::join(front_addrs.v, back_addrs.v)) {
addr.set_port(0);
}
- return seastar::when_all_succeed(
- ceph::net::Messenger::create(entity_name_t::OSD(whoami),
- "hb_front",
- nonce,
- seastar::engine().cpu_id())
- .then([this, front_addrs] (auto msgr) {
- front_msgr = msgr;
- return start_messenger(front_msgr, front_addrs);
- }),
- ceph::net::Messenger::create(entity_name_t::OSD(whoami),
- "hb_back",
- nonce,
- seastar::engine().cpu_id())
- .then([this, back_addrs] (auto msgr) {
- back_msgr = msgr;
- return start_messenger(back_msgr, back_addrs);
- }))
+ return seastar::when_all_succeed(start_messenger(front_msgr, front_addrs),
+ start_messenger(back_msgr, back_addrs))
.then([this] {
timer.arm_periodic(
std::chrono::seconds(local_conf()->osd_heartbeat_interval));
    });
}
seastar::future<>
-Heartbeat::start_messenger(ceph::net::Messenger* msgr,
+Heartbeat::start_messenger(ceph::net::Messenger& msgr,
const entity_addrvec_t& addrs)
{
if (local_conf()->ms_crc_data) {
- msgr->set_crc_data();
+ msgr.set_crc_data();
}
if (local_conf()->ms_crc_header) {
- msgr->set_crc_header();
+ msgr.set_crc_header();
}
- return msgr->try_bind(addrs,
- local_conf()->ms_bind_port_min,
- local_conf()->ms_bind_port_max).then([msgr, this] {
- return msgr->start(this);
+ return msgr.try_bind(addrs,
+ local_conf()->ms_bind_port_min,
+ local_conf()->ms_bind_port_max).then([&msgr, this] {
+ return msgr.start(this);
});
}
seastar::future<> Heartbeat::stop()
{
- return seastar::when_all_succeed(front_msgr->shutdown(),
- back_msgr->shutdown());
+ return seastar::now();
}
const entity_addrvec_t& Heartbeat::get_front_addrs() const
{
- return front_msgr->get_myaddrs();
+ return front_msgr.get_myaddrs();
}
const entity_addrvec_t& Heartbeat::get_back_addrs() const
{
- return back_msgr->get_myaddrs();
+ return back_msgr.get_myaddrs();
}
seastar::future<> Heartbeat::add_peer(osd_id_t peer, epoch_t epoch)
auto osdmap = service.get_map();
// TODO: msgr v2
return seastar::when_all_succeed(
- front_msgr->connect(osdmap->get_hb_front_addrs(peer).legacy_addr(),
- CEPH_ENTITY_TYPE_OSD),
- back_msgr->connect(osdmap->get_hb_back_addrs(peer).legacy_addr(),
- CEPH_ENTITY_TYPE_OSD))
+ front_msgr.connect(osdmap->get_hb_front_addrs(peer).legacy_addr(),
+ CEPH_ENTITY_TYPE_OSD),
+ back_msgr.connect(osdmap->get_hb_back_addrs(peer).legacy_addr(),
+ CEPH_ENTITY_TYPE_OSD))
.then([this, peer, epoch] (auto xcon_front, auto xcon_back) {
PeerInfo info;
// sharded-messenger compatible mode
Heartbeat(int whoami,
uint32_t nonce,
const OSDMapService& service,
- ceph::mon::Client& monc);
+ ceph::mon::Client& monc,
+ ceph::net::Messenger& front_msgr,
+ ceph::net::Messenger& back_msgr);
seastar::future<> start(entity_addrvec_t front,
entity_addrvec_t back);
/// add enough reporters for fast failure detection
void add_reporter_peers(int whoami);
- seastar::future<> start_messenger(ceph::net::Messenger* msgr,
+ seastar::future<> start_messenger(ceph::net::Messenger& msgr,
const entity_addrvec_t& addrs);
private:
const int whoami;
const uint32_t nonce;
- ceph::net::Messenger* front_msgr = nullptr;
- ceph::net::Messenger* back_msgr = nullptr;
const OSDMapService& service;
ceph::mon::Client& monc;
+ ceph::net::Messenger& front_msgr;
+ ceph::net::Messenger& back_msgr;
seastar::timer<seastar::lowres_clock> timer;
// use real_clock so it can be converted to utime_t
#include "common/ceph_argparse.h"
#include "crimson/common/config_proxy.h"
+#include "crimson/net/SocketMessenger.h"
#include "osd.h"
usage(argv[0]);
return EXIT_SUCCESS;
}
- std::string cluster;
+ std::string cluster_name;
std::string conf_file_list;
// ceph_argparse_early_args() could _exit(), while local_conf() won't be ready
// until it's started. So do the boilerplate-settings parsing here.
auto init_params = ceph_argparse_early_args(ceph_args,
CEPH_ENTITY_TYPE_OSD,
- &cluster,
+ &cluster_name,
&conf_file_list);
seastar::sharded<OSD> osd;
+ seastar::sharded<ceph::net::SocketMessenger> cluster_msgr, client_msgr;
+ seastar::sharded<ceph::net::SocketMessenger> hb_front_msgr, hb_back_msgr;
using ceph::common::sharded_conf;
using ceph::common::sharded_perf_coll;
using ceph::common::local_conf;
try {
return app.run_deprecated(app_args.size(), const_cast<char**>(app_args.data()), [&] {
auto& config = app.configuration();
- seastar::engine().at_exit([] {
- return sharded_conf().stop();
- });
- seastar::engine().at_exit([] {
- return sharded_perf_coll().stop();
- });
- seastar::engine().at_exit([&] {
- return osd.stop();
- });
- return sharded_conf().start(init_params.name, cluster).then([] {
- return sharded_perf_coll().start();
- }).then([&conf_file_list] {
- return local_conf().parse_config_files(conf_file_list);
- }).then([&] {
- return local_conf().parse_argv(ceph_args);
- }).then([&] {
- return osd.start_single(std::stoi(local_conf()->name.get_id()),
- static_cast<uint32_t>(getpid()));
- }).then([&osd, mkfs = config.count("mkfs")] {
- if (mkfs) {
- return osd.invoke_on(0, &OSD::mkfs,
- local_conf().get_val<uuid_d>("fsid"))
- .then([] { seastar::engine().exit(0); });
+ return seastar::async([&] {
+ sharded_conf().start(init_params.name, cluster_name).get();
+ sharded_perf_coll().start().get();
+ local_conf().parse_config_files(conf_file_list).get();
+ local_conf().parse_argv(ceph_args).get();
+ const int whoami = std::stoi(local_conf()->name.get_id());
+ const auto nonce = static_cast<uint32_t>(getpid());
+ const auto shard = seastar::engine().cpu_id();
+ cluster_msgr.start(entity_name_t::OSD(whoami), "cluster"s, nonce, shard).get();
+ client_msgr.start(entity_name_t::OSD(whoami), "client"s, nonce, shard).get();
+ hb_front_msgr.start(entity_name_t::OSD(whoami), "hb_front"s, nonce, shard).get();
+ hb_back_msgr.start(entity_name_t::OSD(whoami), "hb_back"s, nonce, shard).get();
+ osd.start_single(whoami, nonce,
+                     std::reference_wrapper<ceph::net::Messenger>(cluster_msgr.local()),
+                     std::reference_wrapper<ceph::net::Messenger>(client_msgr.local()),
+                     std::reference_wrapper<ceph::net::Messenger>(hb_front_msgr.local()),
+                     std::reference_wrapper<ceph::net::Messenger>(hb_back_msgr.local())).get();
+ if (config.count("mkfs")) {
+ osd.invoke_on(0, &OSD::mkfs,
+ local_conf().get_val<uuid_d>("fsid"))
+ .then([] { seastar::engine().exit(0); }).get();
} else {
- return osd.invoke_on(0, &OSD::start);
+ osd.invoke_on(0, &OSD::start).get();
}
+ seastar::engine().at_exit([&] {
+ return osd.stop();
+ });
+ seastar::engine().at_exit([&] {
+ return seastar::when_all_succeed(cluster_msgr.stop(),
+ client_msgr.stop(),
+ hb_front_msgr.stop(),
+ hb_back_msgr.stop());
+ });
+ seastar::engine().at_exit([] {
+ return sharded_perf_coll().stop();
+ });
+ seastar::engine().at_exit([] {
+ return sharded_conf().stop();
+ });
});
});
} catch (...) {
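For readers unfamiliar with the seastar::async pattern that the main() hunk above switches to, here is a minimal, self-contained sketch (not part of the patch; the sleeps and messages are placeholders). Inside the thread spawned by seastar::async, futures can be waited on with .get(), so the start-up sequence reads top to bottom instead of being written as a long .then() chain:

    #include <chrono>
    #include <iostream>
    #include <seastar/core/app-template.hh>
    #include <seastar/core/sleep.hh>
    #include <seastar/core/thread.hh>

    int main(int argc, char** argv) {
      seastar::app_template app;
      return app.run(argc, argv, [] {
        // seastar::async() runs the lambda in a seastar thread, where futures
        // may be blocked on with .get() without stalling the reactor.
        return seastar::async([] {
          seastar::sleep(std::chrono::seconds(1)).get();  // stand-in for sharded_conf().start(...)
          std::cout << "step 1 done\n";
          seastar::sleep(std::chrono::seconds(1)).get();  // stand-in for osd.start_single(...)
          std::cout << "step 2 done\n";
        });
      });
    }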
using ceph::common::local_conf;
using ceph::os::CyanStore;
-OSD::OSD(int id, uint32_t nonce)
+OSD::OSD(int id, uint32_t nonce,
+ ceph::net::Messenger& cluster_msgr,
+ ceph::net::Messenger& public_msgr,
+ ceph::net::Messenger& hb_front_msgr,
+ ceph::net::Messenger& hb_back_msgr)
: whoami{id},
nonce{nonce},
beacon_timer{[this] { send_beacon(); }},
+ cluster_msgr{cluster_msgr},
+ public_msgr{public_msgr},
+ monc{new ceph::mon::Client{public_msgr}},
+ heartbeat{new Heartbeat{whoami, nonce, *this, *monc,
+ hb_front_msgr, hb_back_msgr}},
heartbeat_timer{[this] { update_heartbeat_peers(); }},
store{std::make_unique<ceph::os::CyanStore>(
local_conf().get_val<std::string>("osd_data"))}
{
logger().info("start");
- return seastar::when_all_succeed(
- ceph::net::Messenger::create(entity_name_t::OSD(whoami),
- "cluster",
- nonce,
- seastar::engine().cpu_id())
- .then([this] (auto msgr) { cluster_msgr = msgr; }),
- ceph::net::Messenger::create(entity_name_t::OSD(whoami),
- "client",
- nonce,
- seastar::engine().cpu_id())
- .then([this] (auto msgr) { public_msgr = msgr; }))
- .then([this] {
- monc.reset(new ceph::mon::Client{*public_msgr});
- heartbeat.reset(new Heartbeat{whoami, nonce, *this, *monc});
-
- for (auto msgr : {cluster_msgr, public_msgr}) {
- if (local_conf()->ms_crc_data) {
- msgr->set_crc_data();
- }
- if (local_conf()->ms_crc_header) {
- msgr->set_crc_header();
- }
- }
- dispatchers.push_front(this);
- dispatchers.push_front(monc.get());
-
- return store->mount();
- }).then([this] {
+ return store->mount().then([this] {
meta_coll = make_unique<OSDMeta>(store->open_collection(coll_t::meta()),
store.get());
return meta_coll->load_superblock();
osdmap = std::move(map);
return load_pgs();
}).then([this] {
+ for (auto msgr : {std::ref(cluster_msgr), std::ref(public_msgr)}) {
+ if (local_conf()->ms_crc_data) {
+ msgr.get().set_crc_data();
+ }
+ if (local_conf()->ms_crc_header) {
+ msgr.get().set_crc_header();
+ }
+ }
+ dispatchers.push_front(this);
+ dispatchers.push_front(monc.get());
return seastar::when_all_succeed(
-      cluster_msgr->try_bind(pick_addresses(CEPH_PICK_ADDRESS_CLUSTER),
-                             local_conf()->ms_bind_port_min,
-                             local_conf()->ms_bind_port_max)
-        .then([this] { return cluster_msgr->start(&dispatchers); }),
-      public_msgr->try_bind(pick_addresses(CEPH_PICK_ADDRESS_PUBLIC),
-                            local_conf()->ms_bind_port_min,
-                            local_conf()->ms_bind_port_max)
-        .then([this] { return public_msgr->start(&dispatchers); }));
+      cluster_msgr.try_bind(pick_addresses(CEPH_PICK_ADDRESS_CLUSTER),
+                            local_conf()->ms_bind_port_min,
+                            local_conf()->ms_bind_port_max)
+        .then([this] { return cluster_msgr.start(&dispatchers); }),
+      public_msgr.try_bind(pick_addresses(CEPH_PICK_ADDRESS_PUBLIC),
+                           local_conf()->ms_bind_port_min,
+                           local_conf()->ms_bind_port_max)
+        .then([this] { return public_msgr.start(&dispatchers); }));
}).then([this] {
return monc->start();
}).then([this] {
return monc->renew_subs();
}).then([this] {
if (auto [addrs, changed] =
- replace_unknown_addrs(cluster_msgr->get_myaddrs(),
- public_msgr->get_myaddrs()); changed) {
- cluster_msgr->set_myaddrs(addrs);
+ replace_unknown_addrs(cluster_msgr.get_myaddrs(),
+ public_msgr.get_myaddrs()); changed) {
+ cluster_msgr.set_myaddrs(addrs);
}
- return heartbeat->start(public_msgr->get_myaddrs(),
- cluster_msgr->get_myaddrs());
+ return heartbeat->start(public_msgr.get_myaddrs(),
+ cluster_msgr.get_myaddrs());
}).then([this] {
return start_boot();
});
logger().info("hb_back_msgr: {}", heartbeat->get_back_addrs());
logger().info("hb_front_msgr: {}", heartbeat->get_front_addrs());
- logger().info("cluster_msgr: {}", cluster_msgr->get_myaddr());
+ logger().info("cluster_msgr: {}", cluster_msgr.get_myaddr());
auto m = make_message<MOSDBoot>(superblock,
osdmap->get_epoch(),
osdmap->get_epoch(),
heartbeat->get_back_addrs(),
heartbeat->get_front_addrs(),
- cluster_msgr->get_myaddrs(),
+ cluster_msgr.get_myaddrs(),
CEPH_FEATURES_ALL);
return monc->send_message(m);
}
return heartbeat->stop();
}).then([this] {
return monc->stop();
- }).then([this] {
- return public_msgr->shutdown();
- }).then([this] {
- return cluster_msgr->shutdown();
}).then([this] {
return store->umount();
});
osdmap = std::move(o);
if (up_epoch != 0 &&
osdmap->is_up(whoami) &&
- osdmap->get_addrs(whoami) == public_msgr->get_myaddrs()) {
+ osdmap->get_addrs(whoami) == public_msgr.get_myaddrs()) {
up_epoch = osdmap->get_epoch();
if (!boot_epoch) {
boot_epoch = osdmap->get_epoch();
});
}).then([m, this] {
if (osdmap->is_up(whoami) &&
- osdmap->get_addrs(whoami) == public_msgr->get_myaddrs() &&
+ osdmap->get_addrs(whoami) == public_msgr.get_myaddrs() &&
bind_epoch < osdmap->get_up_from(whoami)) {
if (state.is_booting()) {
logger().info("osd.{}: activating...", whoami);
logger().info("map e {} marked osd.{} down",
osdmap->get_epoch(), whoami);
return true;
- } else if (osdmap->get_addrs(whoami) != public_msgr->get_myaddrs()) {
+ } else if (osdmap->get_addrs(whoami) != public_msgr.get_myaddrs()) {
logger().error("map e {} had wrong client addr ({} != my {})",
osdmap->get_epoch(),
osdmap->get_addrs(whoami),
- public_msgr->get_myaddrs());
+ public_msgr.get_myaddrs());
return true;
- } else if (osdmap->get_cluster_addrs(whoami) != cluster_msgr->get_myaddrs()) {
+ } else if (osdmap->get_cluster_addrs(whoami) != cluster_msgr.get_myaddrs()) {
logger().error("map e {} had wrong cluster addr ({} != my {})",
osdmap->get_epoch(),
osdmap->get_cluster_addrs(whoami),
- cluster_msgr->get_myaddrs());
+ cluster_msgr.get_myaddrs());
return true;
} else {
return false;
const uint32_t nonce;
seastar::timer<seastar::lowres_clock> beacon_timer;
// talk with osd
- ceph::net::Messenger* cluster_msgr = nullptr;
+ ceph::net::Messenger& cluster_msgr;
// talk with client/mon/mgr
- ceph::net::Messenger* public_msgr = nullptr;
+ ceph::net::Messenger& public_msgr;
ChainedDispatchers dispatchers;
std::unique_ptr<ceph::mon::Client> monc;
seastar::future<> ms_handle_remote_reset(ceph::net::ConnectionRef conn) override;
public:
- OSD(int id, uint32_t nonce);
+ OSD(int id, uint32_t nonce,
+ ceph::net::Messenger& cluster_msgr,
+ ceph::net::Messenger& client_msgr,
+ ceph::net::Messenger& hb_front_msgr,
+ ceph::net::Messenger& hb_back_msgr);
~OSD() override;
seastar::future<> mkfs(uuid_d fsid);
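To close, here is a minimal sketch (hypothetical Msgr and Service types, not crimson's classes) of the ownership pattern the patch adopts: the messengers are created and started as sharded services in main(), and a dependent service receives them by reference through sharded<>::start_single() and std::ref(), instead of creating and owning them itself:

    #include <functional>
    #include <seastar/core/app-template.hh>
    #include <seastar/core/future.hh>
    #include <seastar/core/sharded.hh>
    #include <seastar/core/thread.hh>

    // stand-in for ceph::net::SocketMessenger
    struct Msgr {
      explicit Msgr(int nonce) : nonce{nonce} {}
      seastar::future<> stop() { return seastar::make_ready_future<>(); }
      int nonce;
    };

    // stand-in for OSD/Heartbeat: holds its messengers by reference only
    struct Service {
      Service(Msgr& front, Msgr& back) : front{front}, back{back} {}
      seastar::future<> stop() { return seastar::make_ready_future<>(); }
      Msgr& front;
      Msgr& back;
    };

    int main(int argc, char** argv) {
      seastar::app_template app;
      return app.run(argc, argv, [] {
        return seastar::async([] {
          seastar::sharded<Msgr> front, back;
          seastar::sharded<Service> svc;
          front.start(1).get();
          back.start(2).get();
          // std::ref() lets start_single() forward real references, so the
          // Service neither copies nor owns the messengers it talks through.
          svc.start_single(std::ref(front.local()), std::ref(back.local())).get();
          // stop in reverse order of construction
          svc.stop().get();
          back.stop().get();
          front.stop().get();
        });
      });
    }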