monc{new crimson::mon::Client{*public_msgr, *this}},
mgrc{new crimson::mgr::Client{*public_msgr, *this}},
store{store},
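+  // PGShardManager becomes a direct OSD member; the per-shard
+  // ShardDispatcher that previously owned it is removed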
+ pg_shard_manager{osd_singleton_state,
+ shard_services,
+ pg_to_shard_mappings},
// do this in background -- continuation rearms timer when complete
tick_timer{[this] {
std::ignore = update_heartbeat_peers(
seastar::future<> OSD::open_meta_coll()
{
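+  // moved ahead of the async open so a wrong-shard caller fails immediately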
+ ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
return store.get_sharded_store().open_collection(
coll_t::meta()
).then([this](auto ch) {
- ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
- get_pg_shard_manager().init_meta_coll(ch, store.get_sharded_store());
+ pg_shard_manager.init_meta_coll(ch, store.get_sharded_store());
return seastar::now();
});
}
osd_singleton_state.local().recoverystate_perf,
std::ref(store),
std::ref(osd_states));
- }).then([this] {
- return shard_dispatchers.start(
- std::ref(*this),
- std::ref(pg_to_shard_mappings));
});
}).then([this] {
heartbeat.reset(new Heartbeat{
}).then([this] {
return open_meta_coll();
}).then([this] {
- return get_pg_shard_manager().get_meta_coll().load_superblock(
+ return pg_shard_manager.get_meta_coll().load_superblock(
).handle_error(
crimson::ct_error::assert_all("open_meta_coll error")
);
}).then([this](OSDSuperblock&& sb) {
superblock = std::move(sb);
- get_pg_shard_manager().set_superblock(superblock);
- return get_pg_shard_manager().get_local_map(superblock.current_epoch);
+ pg_shard_manager.set_superblock(superblock);
+ return pg_shard_manager.get_local_map(superblock.current_epoch);
}).then([this](OSDMapService::local_cached_map_t&& map) {
osdmap = make_local_shared_foreign(OSDMapService::local_cached_map_t(map));
- return get_pg_shard_manager().update_map(std::move(map));
+ return pg_shard_manager.update_map(std::move(map));
}).then([this] {
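+    // unblock each shard's osdmap gate with the freshly loaded epoch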
return shard_services.invoke_on_all([this](auto &local_service) {
local_service.local_state.osdmap_gate.got_map(osdmap->get_epoch());
});
}).then([this] {
bind_epoch = osdmap->get_epoch();
- return get_pg_shard_manager().load_pgs(store);
+ return pg_shard_manager.load_pgs(store);
}).then([this] {
uint64_t osd_required =
CEPH_FEATURE_UID |
seastar::future<> OSD::start_boot()
{
- get_pg_shard_manager().set_preboot();
+ pg_shard_manager.set_preboot();
return monc->get_version("osdmap").then([this](auto&& ret) {
auto [newest, oldest] = ret;
return _preboot(oldest, newest);
seastar::future<> OSD::_send_boot()
{
- get_pg_shard_manager().set_booting();
+ pg_shard_manager.set_booting();
entity_addrvec_t public_addrs = public_msgr->get_myaddrs();
entity_addrvec_t cluster_addrs = cluster_msgr->get_myaddrs();
});
}
-seastar::future<> OSD::ShardDispatcher::handle_command(
+seastar::future<> OSD::handle_command(
crimson::net::ConnectionRef conn,
Ref<MCommand> m)
{
ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
- return osd.asok->handle_command(conn, std::move(m));
+ return asok->handle_command(conn, std::move(m));
}
/*
asok->register_command(make_asok_hook<SendBeaconHook>(*this));
asok->register_command(make_asok_hook<FlushPgStatsHook>(*this));
asok->register_command(
- make_asok_hook<DumpPGStateHistory>(std::as_const(get_pg_shard_manager())));
+ make_asok_hook<DumpPGStateHistory>(std::as_const(pg_shard_manager)));
asok->register_command(make_asok_hook<DumpMetricsHook>());
asok->register_command(make_asok_hook<DumpPerfCountersHook>());
asok->register_command(make_asok_hook<InjectDataErrorHook>(get_shard_services()));
// ops commands
asok->register_command(
make_asok_hook<DumpInFlightOpsHook>(
- std::as_const(get_pg_shard_manager())));
+ std::as_const(pg_shard_manager)));
asok->register_command(
make_asok_hook<DumpHistoricOpsHook>(
std::as_const(get_shard_services().get_registry())));
tick_timer.cancel();
// see also OSD::shutdown()
return prepare_to_stop().then([this] {
- return get_pg_shard_manager().set_stopping();
+ return pg_shard_manager.set_stopping();
}).then([this] {
logger().debug("prepared to stop");
public_msgr->stop();
return asok->stop().then([this] {
return heartbeat->stop();
}).then([this] {
- return get_pg_shard_manager().stop_registries();
+ return pg_shard_manager.stop_registries();
}).then([this] {
return store.umount();
}).then([this] {
return store.stop();
}).then([this] {
- return get_pg_shard_manager().stop_pgs();
+ return pg_shard_manager.stop_pgs();
}).then([this] {
return monc->stop();
}).then([this] {
return mgrc->stop();
- }).then([this] {
- ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
- return shard_dispatchers.stop();
}).then([this] {
return shard_services.stop();
}).then([this] {
f->dump_stream("cluster_fsid") << superblock.cluster_fsid;
f->dump_stream("osd_fsid") << superblock.osd_fsid;
f->dump_unsigned("whoami", superblock.whoami);
- f->dump_string("state", get_pg_shard_manager().get_osd_state_string());
+ f->dump_string("state", pg_shard_manager.get_osd_state_string());
f->dump_unsigned("oldest_map", superblock.oldest_map);
f->dump_unsigned("cluster_osdmap_trim_lower_bound",
superblock.cluster_osdmap_trim_lower_bound);
f->dump_unsigned("newest_map", superblock.newest_map);
- f->dump_unsigned("num_pgs", get_pg_shard_manager().get_num_pgs());
+ f->dump_unsigned("num_pgs", pg_shard_manager.get_num_pgs());
}
void OSD::print(std::ostream& out) const
<< superblock.osd_fsid << " [" << superblock.oldest_map
<< "," << superblock.newest_map << "] "
<< "tlb:" << superblock.cluster_osdmap_trim_lower_bound
- << " pgs:" << get_pg_shard_manager().get_num_pgs()
- << "}";
-}
-
-void OSD::ShardDispatcher::print(std::ostream& out) const
-{
- out << "{osd." << osd.superblock.whoami << " "
- << osd.superblock.osd_fsid << " [" << osd.superblock.oldest_map
- << "," << osd.superblock.newest_map << "] "
- << " pgs:" << get_pg_shard_manager().get_num_pgs()
+ << " pgs:" << pg_shard_manager.get_num_pgs()
<< "}";
}
std::optional<seastar::future<>>
OSD::ms_dispatch(crimson::net::ConnectionRef conn, MessageRef m)
{
- if (get_pg_shard_manager().is_stopping()) {
+ if (pg_shard_manager.is_stopping()) {
return seastar::now();
}
- bool dispatched = true;
- gate.dispatch_in_background(__func__, *this, [this, conn=std::move(conn),
- m=std::move(m), &dispatched]() mutable {
- switch (m->get_type()) {
- case CEPH_MSG_OSD_MAP:
- case CEPH_MSG_OSD_OP:
- case MSG_OSD_PG_CREATE2:
- case MSG_COMMAND:
- case MSG_OSD_MARK_ME_DOWN:
- case MSG_OSD_PG_PULL:
- case MSG_OSD_PG_PUSH:
- case MSG_OSD_PG_PUSH_REPLY:
- case MSG_OSD_PG_RECOVERY_DELETE:
- case MSG_OSD_PG_RECOVERY_DELETE_REPLY:
- case MSG_OSD_PG_SCAN:
- case MSG_OSD_PG_BACKFILL:
- case MSG_OSD_PG_BACKFILL_REMOVE:
- case MSG_OSD_PG_LEASE:
- case MSG_OSD_PG_LEASE_ACK:
- case MSG_OSD_PG_NOTIFY2:
- case MSG_OSD_PG_INFO2:
- case MSG_OSD_PG_QUERY2:
- case MSG_OSD_BACKFILL_RESERVE:
- case MSG_OSD_RECOVERY_RESERVE:
- case MSG_OSD_PG_LOG:
- case MSG_OSD_REPOP:
- case MSG_OSD_REPOPREPLY:
- case MSG_OSD_SCRUB2:
- case MSG_OSD_PG_UPDATE_LOG_MISSING:
- case MSG_OSD_PG_UPDATE_LOG_MISSING_REPLY:
- {
- return shard_dispatchers.local().ms_dispatch(conn, std::move(m));
- }
- default:
- {
- dispatched = false;
- return seastar::now();
- }
- }
+ auto maybe_ret = do_ms_dispatch(conn, std::move(m));
+ if (!maybe_ret.has_value()) {
+ return std::nullopt;
+ }
+
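+  // track the dispatched future in the gate and report the message as
+  // handled with an immediately ready future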
+ gate.dispatch_in_background(
+ __func__, *this, [ret=std::move(maybe_ret.value())]() mutable {
+ return std::move(ret);
});
- return (dispatched ? std::make_optional(seastar::now()) : std::nullopt);
+ return seastar::now();
}
-seastar::future<>
-OSD::ShardDispatcher::ms_dispatch(
+std::optional<seastar::future<>>
+OSD::do_ms_dispatch(
crimson::net::ConnectionRef conn,
MessageRef m)
{
// FIXME: order is not guaranteed in this path
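+  // wrap the connection as a foreign pointer and re-enter do_ms_dispatch on
+  // PRIMARY_CORE, where this message type is always consumed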
return conn.get_foreign(
).then([this, m=std::move(m)](auto f_conn) {
- return container().invoke_on(PRIMARY_CORE,
- [f_conn=std::move(f_conn), m=std::move(m)]
- (auto& local_dispatcher) mutable {
+ return seastar::smp::submit_to(PRIMARY_CORE,
+ [f_conn=std::move(f_conn), m=std::move(m), this]() mutable {
auto conn = make_local_shared_foreign(std::move(f_conn));
- return local_dispatcher.ms_dispatch(conn, std::move(m));
+ auto ret = do_ms_dispatch(conn, std::move(m));
+ assert(ret.has_value());
+ return std::move(ret.value());
});
});
}
return handle_update_log_missing_reply(conn, boost::static_pointer_cast<
MOSDPGUpdateLogMissingReply>(m));
default:
- return seastar::now();
+ return std::nullopt;
}
}
// MPGStats::had_map_for is not used since PGMonitor was removed
auto m = crimson::make_message<MPGStats>(monc->get_fsid(), osdmap->get_epoch());
m->osd_stat = osd_stat;
- return get_pg_shard_manager().get_pg_stats(
+ return pg_shard_manager.get_pg_stats(
).then([m=std::move(m)](auto &&stats) mutable {
m->pg_stat = std::move(stats);
return seastar::make_ready_future<MessageURef>(std::move(m));
return osd_stat.seq;
}
-bool OSD::ShardDispatcher::require_mon_peer(
- crimson::net::Connection *conn,
- Ref<Message> m)
-{
- if (!conn->peer_is_mon()) {
- logger().info("{} received from non-mon {}, {}",
- __func__,
- conn->get_peer_addr(),
- *m);
- return false;
- }
- return true;
-}
-
-seastar::future<> OSD::ShardDispatcher::handle_osd_map(Ref<MOSDMap> m)
+seastar::future<> OSD::handle_osd_map(Ref<MOSDMap> m)
{
/* Ensure that only one MOSDMap is processed at a time. Allowing concurrent
* processing may eventually be worthwhile, but such an implementation would
* See https://tracker.ceph.com/issues/59165
*/
ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
- return osd.handle_osd_map_lock.lock().then([this, m] {
+ return handle_osd_map_lock.lock().then([this, m] {
return _handle_osd_map(m);
}).finally([this] {
- return osd.handle_osd_map_lock.unlock();
+ return handle_osd_map_lock.unlock();
});
}
-seastar::future<> OSD::ShardDispatcher::_handle_osd_map(Ref<MOSDMap> m)
+seastar::future<> OSD::_handle_osd_map(Ref<MOSDMap> m)
{
logger().info("handle_osd_map {}", *m);
- if (m->fsid != osd.superblock.cluster_fsid) {
+ if (m->fsid != superblock.cluster_fsid) {
logger().warn("fsid mismatched");
return seastar::now();
}
const auto first = m->get_first();
const auto last = m->get_last();
logger().info("handle_osd_map epochs [{}..{}], i have {}, src has [{}..{}]",
- first, last, osd.superblock.newest_map,
+ first, last, superblock.newest_map,
m->cluster_osdmap_trim_lower_bound, m->newest_map);
// make sure there is something new, here, before we bother flushing
// the queues and such
- if (last <= osd.superblock.newest_map) {
+ if (last <= superblock.newest_map) {
return seastar::now();
}
// missing some?
bool skip_maps = false;
- epoch_t start = osd.superblock.newest_map + 1;
+ epoch_t start = superblock.newest_map + 1;
if (first > start) {
logger().info("handle_osd_map message skips epochs {}..{}",
start, first - 1);
[=, this](auto& t) {
return pg_shard_manager.store_maps(t, start, m).then([=, this, &t] {
// even if this map isn't from a mon, we may have satisfied our subscription
- osd.monc->sub_got("osdmap", last);
- if (!osd.superblock.oldest_map || skip_maps) {
- osd.superblock.oldest_map = first;
+ monc->sub_got("osdmap", last);
+ if (!superblock.oldest_map || skip_maps) {
+ superblock.oldest_map = first;
}
- osd.superblock.newest_map = last;
- osd.superblock.current_epoch = last;
+ superblock.newest_map = last;
+ superblock.current_epoch = last;
// note in the superblock that we were clean thru the prior epoch
- if (osd.boot_epoch && osd.boot_epoch >= osd.superblock.mounted) {
- osd.superblock.mounted = osd.boot_epoch;
- osd.superblock.clean_thru = last;
+ if (boot_epoch && boot_epoch >= superblock.mounted) {
+ superblock.mounted = boot_epoch;
+ superblock.clean_thru = last;
}
- pg_shard_manager.get_meta_coll().store_superblock(t, osd.superblock);
- pg_shard_manager.set_superblock(osd.superblock);
+ pg_shard_manager.get_meta_coll().store_superblock(t, superblock);
+ pg_shard_manager.set_superblock(superblock);
logger().debug("OSD::handle_osd_map: do_transaction...");
- return osd.store.get_sharded_store().do_transaction(
+ return store.get_sharded_store().do_transaction(
pg_shard_manager.get_meta_coll().collection(),
std::move(t));
});
});
}
-seastar::future<> OSD::ShardDispatcher::committed_osd_maps(
+seastar::future<> OSD::committed_osd_maps(
version_t first,
version_t last,
Ref<MOSDMap> m)
{
ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
- logger().info("osd.{}: committed_osd_maps({}, {})", osd.whoami, first, last);
+ logger().info("osd.{}: committed_osd_maps({}, {})", whoami, first, last);
// advance through the new maps
return seastar::do_for_each(boost::make_counting_iterator(first),
boost::make_counting_iterator(last + 1),
return pg_shard_manager.get_local_map(
cur
).then([this](OSDMapService::local_cached_map_t&& o) {
- osd.osdmap = make_local_shared_foreign(OSDMapService::local_cached_map_t(o));
+ osdmap = make_local_shared_foreign(OSDMapService::local_cached_map_t(o));
return pg_shard_manager.update_map(std::move(o));
}).then([this] {
if (get_shard_services().get_up_epoch() == 0 &&
- osd.osdmap->is_up(osd.whoami) &&
- osd.osdmap->get_addrs(osd.whoami) == osd.public_msgr->get_myaddrs()) {
+ osdmap->is_up(whoami) &&
+ osdmap->get_addrs(whoami) == public_msgr->get_myaddrs()) {
return pg_shard_manager.set_up_epoch(
- osd.osdmap->get_epoch()
+ osdmap->get_epoch()
).then([this] {
- if (!osd.boot_epoch) {
- osd.boot_epoch = osd.osdmap->get_epoch();
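+        // boot_epoch: first epoch this OSD was marked up after starting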
+ if (!boot_epoch) {
+ boot_epoch = osdmap->get_epoch();
}
});
} else {
});
}).then([m, this] {
auto fut = seastar::now();
- if (osd.osdmap->is_up(osd.whoami)) {
- const auto up_from = osd.osdmap->get_up_from(osd.whoami);
+ if (osdmap->is_up(whoami)) {
+ const auto up_from = osdmap->get_up_from(whoami);
logger().info("osd.{}: map e {} marked me up: up_from {}, bind_epoch {}, state {}",
- osd.whoami, osd.osdmap->get_epoch(), up_from, osd.bind_epoch,
+ whoami, osdmap->get_epoch(), up_from, bind_epoch,
pg_shard_manager.get_osd_state_string());
- if (osd.bind_epoch < up_from &&
- osd.osdmap->get_addrs(osd.whoami) == osd.public_msgr->get_myaddrs() &&
+ if (bind_epoch < up_from &&
+ osdmap->get_addrs(whoami) == public_msgr->get_myaddrs() &&
pg_shard_manager.is_booting()) {
- logger().info("osd.{}: activating...", osd.whoami);
+ logger().info("osd.{}: activating...", whoami);
fut = pg_shard_manager.set_active().then([this] {
- osd.beacon_timer.arm_periodic(
+ beacon_timer.arm_periodic(
std::chrono::seconds(local_conf()->osd_beacon_report_interval));
// timer continuation rearms when complete
- osd.tick_timer.arm(
+ tick_timer.arm(
std::chrono::seconds(TICK_INTERVAL));
});
}
} else {
if (pg_shard_manager.is_prestop()) {
- osd.got_stop_ack();
+ got_stop_ack();
return seastar::now();
}
}
return check_osdmap_features().then([this] {
// yay!
logger().info("osd.{}: committed_osd_maps: broadcasting osdmaps up"
- " to {} epoch to pgs", osd.whoami, osd.osdmap->get_epoch());
- return pg_shard_manager.broadcast_map_to_pgs(osd.osdmap->get_epoch());
+ " to {} epoch to pgs", whoami, osdmap->get_epoch());
+ return pg_shard_manager.broadcast_map_to_pgs(osdmap->get_epoch());
});
});
}).then([m, this] {
if (pg_shard_manager.is_active()) {
- logger().info("osd.{}: now active", osd.whoami);
- if (!osd.osdmap->exists(osd.whoami) ||
- osd.osdmap->is_stop(osd.whoami)) {
- return osd.shutdown();
+ logger().info("osd.{}: now active", whoami);
+ if (!osdmap->exists(whoami) ||
+ osdmap->is_stop(whoami)) {
+ return shutdown();
}
- if (osd.should_restart()) {
- return osd.restart();
+ if (should_restart()) {
+ return restart();
} else {
return seastar::now();
}
} else if (pg_shard_manager.is_preboot()) {
- logger().info("osd.{}: now preboot", osd.whoami);
+ logger().info("osd.{}: now preboot", whoami);
if (m->get_source().is_mon()) {
- return osd._preboot(
+ return _preboot(
m->cluster_osdmap_trim_lower_bound, m->newest_map);
} else {
- logger().info("osd.{}: start_boot", osd.whoami);
- return osd.start_boot();
+ logger().info("osd.{}: start_boot", whoami);
+ return start_boot();
}
} else {
- logger().info("osd.{}: now {}", osd.whoami,
+ logger().info("osd.{}: now {}", whoami,
pg_shard_manager.get_osd_state_string());
// XXX
return seastar::now();
});
}
-seastar::future<> OSD::ShardDispatcher::handle_osd_op(
+seastar::future<> OSD::handle_osd_op(
crimson::net::ConnectionRef conn,
Ref<MOSDOp> m)
{
std::move(m)).second;
}
-seastar::future<> OSD::ShardDispatcher::handle_pg_create(
+seastar::future<> OSD::handle_pg_create(
crimson::net::ConnectionRef conn,
Ref<MOSDPGCreate2> m)
{
});
}
-seastar::future<> OSD::ShardDispatcher::handle_update_log_missing(
+seastar::future<> OSD::handle_update_log_missing(
crimson::net::ConnectionRef conn,
Ref<MOSDPGUpdateLogMissing> m)
{
std::move(m)).second;
}
-seastar::future<> OSD::ShardDispatcher::handle_update_log_missing_reply(
+seastar::future<> OSD::handle_update_log_missing_reply(
crimson::net::ConnectionRef conn,
Ref<MOSDPGUpdateLogMissingReply> m)
{
std::move(m)).second;
}
-seastar::future<> OSD::ShardDispatcher::handle_rep_op(
+seastar::future<> OSD::handle_rep_op(
crimson::net::ConnectionRef conn,
Ref<MOSDRepOp> m)
{
std::move(m)).second;
}
-seastar::future<> OSD::ShardDispatcher::handle_rep_op_reply(
+seastar::future<> OSD::handle_rep_op_reply(
crimson::net::ConnectionRef conn,
Ref<MOSDRepOpReply> m)
{
});
}
-seastar::future<> OSD::ShardDispatcher::handle_scrub(
+seastar::future<> OSD::handle_scrub(
crimson::net::ConnectionRef conn,
Ref<MOSDScrub2> m)
{
- if (m->fsid != osd.superblock.cluster_fsid) {
+ if (m->fsid != superblock.cluster_fsid) {
logger().warn("fsid mismatched");
return seastar::now();
}
});
}
-seastar::future<> OSD::ShardDispatcher::handle_mark_me_down(
+seastar::future<> OSD::handle_mark_me_down(
crimson::net::ConnectionRef conn,
Ref<MOSDMarkMeDown> m)
{
ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
if (pg_shard_manager.is_prestop()) {
- osd.got_stop_ack();
+ got_stop_ack();
}
return seastar::now();
}
-seastar::future<> OSD::ShardDispatcher::handle_recovery_subreq(
+seastar::future<> OSD::handle_recovery_subreq(
crimson::net::ConnectionRef conn,
Ref<MOSDFastDispatchOp> m)
{
{
beacon_timer.cancel();
tick_timer.cancel();
- return get_pg_shard_manager().set_up_epoch(
+ return pg_shard_manager.set_up_epoch(
0
).then([this] {
bind_epoch = osdmap->get_epoch();
seastar::future<> OSD::send_beacon()
{
- if (!get_pg_shard_manager().is_active()) {
+ if (!pg_shard_manager.is_active()) {
return seastar::now();
}
// FIXME: min lec should be calculated from pg_stat
seastar::future<> OSD::update_heartbeat_peers()
{
- if (!get_pg_shard_manager().is_active()) {
+ if (!pg_shard_manager.is_active()) {
-    return seastar::now();;
+    return seastar::now();
}
- get_pg_shard_manager().for_each_pgid([this](auto &pgid) {
+ pg_shard_manager.for_each_pgid([this](auto &pgid) {
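+    // each pg's up/acting osds form the heartbeat peer candidates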
vector<int> up, acting;
osdmap->pg_to_up_acting_osds(pgid.pgid,
&up, nullptr,
return seastar::now();
}
-seastar::future<> OSD::ShardDispatcher::handle_peering_op(
+seastar::future<> OSD::handle_peering_op(
crimson::net::ConnectionRef conn,
Ref<MOSDPeeringOp> m)
{
std::move(*evt)).second;
}
-seastar::future<> OSD::ShardDispatcher::check_osdmap_features()
+seastar::future<> OSD::check_osdmap_features()
{
- return osd.store.write_meta(
+ assert(seastar::this_shard_id() == PRIMARY_CORE);
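+  // meta writes are expected to be issued from the primary shard only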
+ return store.write_meta(
"require_osd_release",
- stringify((int)osd.osdmap->require_osd_release));
+ stringify((int)osdmap->require_osd_release));
}
seastar::future<> OSD::prepare_to_stop()
{
if (osdmap && osdmap->is_up(whoami)) {
- get_pg_shard_manager().set_prestop();
+ pg_shard_manager.set_prestop();
const auto timeout =
std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::duration<double>(
class OSD final : public crimson::net::Dispatcher,
private crimson::common::AuthHandler,
private crimson::mgr::WithStats {
-public:
- class ShardDispatcher
- : public seastar::peering_sharded_service<ShardDispatcher> {
- friend class OSD;
- public:
- ShardDispatcher(
- OSD& osd,
- PGShardMapping& pg_to_shard_mapping)
- : pg_shard_manager(osd.osd_singleton_state,
- osd.shard_services, pg_to_shard_mapping),
- osd(osd) {}
- ~ShardDispatcher() = default;
-
- // Dispatcher methods
- seastar::future<> ms_dispatch(crimson::net::ConnectionRef, MessageRef);
-
- private:
- bool require_mon_peer(crimson::net::Connection *conn, Ref<Message> m);
-
- seastar::future<> handle_osd_map(Ref<MOSDMap> m);
- seastar::future<> _handle_osd_map(Ref<MOSDMap> m);
- seastar::future<> handle_pg_create(crimson::net::ConnectionRef conn,
- Ref<MOSDPGCreate2> m);
- seastar::future<> handle_osd_op(crimson::net::ConnectionRef conn,
- Ref<MOSDOp> m);
- seastar::future<> handle_rep_op(crimson::net::ConnectionRef conn,
- Ref<MOSDRepOp> m);
- seastar::future<> handle_rep_op_reply(crimson::net::ConnectionRef conn,
- Ref<MOSDRepOpReply> m);
- seastar::future<> handle_peering_op(crimson::net::ConnectionRef conn,
- Ref<MOSDPeeringOp> m);
- seastar::future<> handle_recovery_subreq(crimson::net::ConnectionRef conn,
- Ref<MOSDFastDispatchOp> m);
- seastar::future<> handle_scrub(crimson::net::ConnectionRef conn,
- Ref<MOSDScrub2> m);
- seastar::future<> handle_mark_me_down(crimson::net::ConnectionRef conn,
- Ref<MOSDMarkMeDown> m);
-
- seastar::future<> committed_osd_maps(version_t first,
- version_t last,
- Ref<MOSDMap> m);
-
- seastar::future<> check_osdmap_features();
-
- seastar::future<> handle_command(crimson::net::ConnectionRef conn,
- Ref<MCommand> m);
- seastar::future<> handle_update_log_missing(crimson::net::ConnectionRef conn,
- Ref<MOSDPGUpdateLogMissing> m);
- seastar::future<> handle_update_log_missing_reply(
- crimson::net::ConnectionRef conn,
- Ref<MOSDPGUpdateLogMissingReply> m);
-
- public:
- void print(std::ostream&) const;
-
- auto &get_pg_shard_manager() {
- return pg_shard_manager;
- }
- auto &get_pg_shard_manager() const {
- return pg_shard_manager;
- }
- ShardServices &get_shard_services() {
- return pg_shard_manager.get_shard_services();
- }
-
- private:
- crimson::osd::PGShardManager pg_shard_manager;
- OSD& osd;
- };
-
const int whoami;
const uint32_t nonce;
seastar::abort_source& abort_source;
void ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace) final;
void ms_handle_remote_reset(crimson::net::ConnectionRef conn) final;
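+  // returns std::nullopt if the message type is not consumed by this OSD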
+ std::optional<seastar::future<>> do_ms_dispatch(crimson::net::ConnectionRef, MessageRef);
+
// mgr::WithStats methods
// pg statistics including osd ones
osd_stat_t osd_stat;
seastar::sharded<OSDSingletonState> osd_singleton_state;
seastar::sharded<OSDState> osd_states;
seastar::sharded<ShardServices> shard_services;
- seastar::sharded<ShardDispatcher> shard_dispatchers;
+
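+  // single PGShardManager instance, replacing the former per-shard
+  // seastar::sharded<ShardDispatcher>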
+ crimson::osd::PGShardManager pg_shard_manager;
std::unique_ptr<Heartbeat> heartbeat;
seastar::timer<seastar::lowres_clock> tick_timer;
crimson::net::MessengerRef hb_back_msgr);
~OSD() final;
+ auto &get_pg_shard_manager() {
+ return pg_shard_manager;
+ }
+
seastar::future<> open_meta_coll();
static seastar::future<OSDMeta> open_or_create_meta_coll(
crimson::os::FuturizedStore &store
uint64_t send_pg_stats();
auto &get_shard_services() {
- ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
return shard_services.local();
}
- auto &get_pg_shard_manager() {
- return shard_dispatchers.local().get_pg_shard_manager();
- }
-
- auto &get_pg_shard_manager() const {
- return shard_dispatchers.local().get_pg_shard_manager();
- }
-
private:
static seastar::future<> _write_superblock(
crimson::os::FuturizedStore &store,
void write_superblock(ceph::os::Transaction& t);
seastar::future<> read_superblock();
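+
+  // message handlers folded back into OSD from the removed ShardDispatcher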
+ seastar::future<> handle_osd_map(Ref<MOSDMap> m);
+ seastar::future<> _handle_osd_map(Ref<MOSDMap> m);
+ seastar::future<> handle_pg_create(crimson::net::ConnectionRef conn,
+ Ref<MOSDPGCreate2> m);
+ seastar::future<> handle_osd_op(crimson::net::ConnectionRef conn,
+ Ref<MOSDOp> m);
+ seastar::future<> handle_rep_op(crimson::net::ConnectionRef conn,
+ Ref<MOSDRepOp> m);
+ seastar::future<> handle_rep_op_reply(crimson::net::ConnectionRef conn,
+ Ref<MOSDRepOpReply> m);
+ seastar::future<> handle_peering_op(crimson::net::ConnectionRef conn,
+ Ref<MOSDPeeringOp> m);
+ seastar::future<> handle_recovery_subreq(crimson::net::ConnectionRef conn,
+ Ref<MOSDFastDispatchOp> m);
+ seastar::future<> handle_scrub(crimson::net::ConnectionRef conn,
+ Ref<MOSDScrub2> m);
+ seastar::future<> handle_mark_me_down(crimson::net::ConnectionRef conn,
+ Ref<MOSDMarkMeDown> m);
+
+ seastar::future<> committed_osd_maps(version_t first,
+ version_t last,
+ Ref<MOSDMap> m);
+
+ seastar::future<> check_osdmap_features();
+
+ seastar::future<> handle_command(crimson::net::ConnectionRef conn,
+ Ref<MCommand> m);
+ seastar::future<> handle_update_log_missing(crimson::net::ConnectionRef conn,
+ Ref<MOSDPGUpdateLogMissing> m);
+ seastar::future<> handle_update_log_missing_reply(
+ crimson::net::ConnectionRef conn,
+ Ref<MOSDPGUpdateLogMissingReply> m);
+
private:
crimson::common::Gated gate;
return out;
}
-inline std::ostream& operator<<(std::ostream& out,
- const OSD::ShardDispatcher& shard_dispatcher) {
- shard_dispatcher.print(out);
- return out;
-}
-
}
#if FMT_VERSION >= 90000
template <> struct fmt::formatter<crimson::osd::OSD> : fmt::ostream_formatter {};
-template <> struct fmt::formatter<crimson::osd::OSD::ShardDispatcher> : fmt::ostream_formatter {};
#endif