return store.get_sharded_store().open_collection(
coll_t::meta()
).then([this](auto ch) {
- pg_shard_manager.init_meta_coll(ch, store.get_sharded_store());
+ ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
+ get_pg_shard_manager().init_meta_coll(ch, store.get_sharded_store());
return seastar::now();
});
}
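
// A minimal, self-contained sketch of the PRIMARY_CORE pinning idiom this
// patch applies throughout: cross-core-sensitive singleton state lives on
// shard 0, and code touching it asserts the current reactor shard first.
// PRIMARY_CORE and the helper below are illustrative assumptions, not taken
// from the patch.
#include <cassert>
#include <seastar/core/app-template.hh>
#include <seastar/core/future.hh>
#include <seastar/core/smp.hh>

static constexpr unsigned PRIMARY_CORE = 0;  // assumed to mirror crimson's value

static void touch_singleton_state() {
  // Fails fast (by design) if invoked from any other reactor shard.
  assert(seastar::this_shard_id() == PRIMARY_CORE);
}

int main(int argc, char** argv) {
  seastar::app_template app;
  return app.run(argc, argv, [] {
    touch_singleton_state();  // app.run's callback executes on shard 0
    return seastar::make_ready_future<>();
  });
}
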
logger().info("start");
startup_time = ceph::mono_clock::now();
-
+ ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
return store.start().then([this] {
- return pg_shard_manager.start(
- whoami, *cluster_msgr,
- *public_msgr, *monc, *mgrc, store);
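+  // Boot order matters: start_single() creates exactly one
+  // OSDSingletonState instance (on shard 0); the per-shard ShardServices
+  // and ShardDispatcher copies are then created on every reactor core by
+  // seastar::sharded<>::start().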
+ return osd_singleton_state.start_single(
+ whoami, std::ref(*cluster_msgr), std::ref(*public_msgr),
+ std::ref(*monc), std::ref(*mgrc)
+ ).then([this] {
+ ceph::mono_time startup_time = ceph::mono_clock::now();
+ return shard_services.start(
+ std::ref(osd_singleton_state),
+ whoami,
+ startup_time,
+ osd_singleton_state.local().perf,
+ osd_singleton_state.local().recoverystate_perf,
+ std::ref(store));
+ }).then([this] {
+ return shard_dispatchers.start(
+ std::ref(*this),
+ whoami,
+ std::ref(store));
+ });
}).then([this] {
heartbeat.reset(new Heartbeat{
whoami, get_shard_services(),
}).then([this] {
return open_meta_coll();
}).then([this] {
- return pg_shard_manager.get_meta_coll().load_superblock(
+ return get_pg_shard_manager().get_meta_coll().load_superblock(
).handle_error(
crimson::ct_error::assert_all("open_meta_coll error")
);
}).then([this](OSDSuperblock&& sb) {
superblock = std::move(sb);
- pg_shard_manager.set_superblock(superblock);
- return pg_shard_manager.get_local_map(superblock.current_epoch);
+ get_pg_shard_manager().set_superblock(superblock);
+ return get_pg_shard_manager().get_local_map(superblock.current_epoch);
}).then([this](OSDMapService::local_cached_map_t&& map) {
osdmap = make_local_shared_foreign(OSDMapService::local_cached_map_t(map));
- return pg_shard_manager.update_map(std::move(map));
+ return get_pg_shard_manager().update_map(std::move(map));
}).then([this] {
- pg_shard_manager.got_map(osdmap->get_epoch());
+ get_pg_shard_manager().got_map(osdmap->get_epoch());
bind_epoch = osdmap->get_epoch();
- return pg_shard_manager.load_pgs(store);
+ return get_pg_shard_manager().load_pgs(store);
}).then([this] {
uint64_t osd_required =
seastar::future<> OSD::start_boot()
{
- pg_shard_manager.set_preboot();
+ get_pg_shard_manager().set_preboot();
return monc->get_version("osdmap").then([this](auto&& ret) {
auto [newest, oldest] = ret;
return _preboot(oldest, newest);
seastar::future<> OSD::_send_boot()
{
- pg_shard_manager.set_booting();
+ get_pg_shard_manager().set_booting();
entity_addrvec_t public_addrs = public_msgr->get_myaddrs();
entity_addrvec_t cluster_addrs = cluster_msgr->get_myaddrs();
});
}
-seastar::future<> OSD::handle_command(crimson::net::ConnectionRef conn,
- Ref<MCommand> m)
+seastar::future<> OSD::ShardDispatcher::handle_command(
+ crimson::net::ConnectionRef conn,
+ Ref<MCommand> m)
{
- return asok->handle_command(conn, std::move(m));
+ ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
+ return osd.asok->handle_command(conn, std::move(m));
}
/*
asok->register_command(make_asok_hook<SendBeaconHook>(*this));
asok->register_command(make_asok_hook<FlushPgStatsHook>(*this));
asok->register_command(
- make_asok_hook<DumpPGStateHistory>(std::as_const(pg_shard_manager)));
+ make_asok_hook<DumpPGStateHistory>(std::as_const(get_pg_shard_manager())));
asok->register_command(make_asok_hook<DumpMetricsHook>());
asok->register_command(make_asok_hook<DumpPerfCountersHook>());
asok->register_command(make_asok_hook<InjectDataErrorHook>(get_shard_services()));
// ops commands
asok->register_command(
make_asok_hook<DumpInFlightOpsHook>(
- std::as_const(pg_shard_manager)));
+ std::as_const(get_pg_shard_manager())));
asok->register_command(
make_asok_hook<DumpHistoricOpsHook>(
std::as_const(get_shard_services().get_registry())));
tick_timer.cancel();
// see also OSD::shutdown()
return prepare_to_stop().then([this] {
- pg_shard_manager.set_stopping();
+ get_pg_shard_manager().set_stopping();
logger().debug("prepared to stop");
public_msgr->stop();
cluster_msgr->stop();
return asok->stop().then([this] {
return heartbeat->stop();
}).then([this] {
- return pg_shard_manager.stop_registries();
+ return get_pg_shard_manager().stop_registries();
}).then([this] {
return store.umount();
}).then([this] {
return store.stop();
}).then([this] {
- return pg_shard_manager.stop_pgs();
+ return get_pg_shard_manager().stop_pgs();
}).then([this] {
return monc->stop();
}).then([this] {
return mgrc->stop();
}).then([this] {
- return pg_shard_manager.stop();
+ ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
+ return shard_dispatchers.stop();
+ }).then([this] {
+ return shard_services.stop();
+ }).then([this] {
+ return osd_singleton_state.stop();
}).then([fut=std::move(gate_close_fut)]() mutable {
return std::move(fut);
}).then([this] {
f->dump_stream("cluster_fsid") << superblock.cluster_fsid;
f->dump_stream("osd_fsid") << superblock.osd_fsid;
f->dump_unsigned("whoami", superblock.whoami);
- f->dump_string("state", pg_shard_manager.get_osd_state_string());
+ f->dump_string("state", get_pg_shard_manager().get_osd_state_string());
f->dump_unsigned("oldest_map", superblock.oldest_map);
f->dump_unsigned("cluster_osdmap_trim_lower_bound",
superblock.cluster_osdmap_trim_lower_bound);
f->dump_unsigned("newest_map", superblock.newest_map);
- f->dump_unsigned("num_pgs", pg_shard_manager.get_num_pgs());
+ f->dump_unsigned("num_pgs", get_pg_shard_manager().get_num_pgs());
}
void OSD::print(std::ostream& out) const
<< superblock.osd_fsid << " [" << superblock.oldest_map
<< "," << superblock.newest_map << "] "
<< "tlb:" << superblock.cluster_osdmap_trim_lower_bound
- << " pgs:" << pg_shard_manager.get_num_pgs()
+ << " pgs:" << get_pg_shard_manager().get_num_pgs()
+ << "}";
+}
+
+void OSD::ShardDispatcher::print(std::ostream& out) const
+{
+ out << "{osd." << osd.superblock.whoami << " "
+ << osd.superblock.osd_fsid << " [" << osd.superblock.oldest_map
+ << "," << osd.superblock.newest_map << "] "
+ << " pgs:" << get_pg_shard_manager().get_num_pgs()
<< "}";
}
std::optional<seastar::future<>>
OSD::ms_dispatch(crimson::net::ConnectionRef conn, MessageRef m)
{
- if (pg_shard_manager.is_stopping()) {
- return {};
- }
- // XXX: we're assuming the `switch` part is executed immediately, and thus
- // we won't smash the stack. Taking into account how `seastar::with_gate`
- // is currently implemented, this seems to be the case (Summer 2022).
+  ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
bool dispatched = true;
gate.dispatch_in_background(__func__, *this, [this, conn=std::move(conn),
m=std::move(m), &dispatched] {
switch (m->get_type()) {
- case CEPH_MSG_OSD_MAP:
- return handle_osd_map(boost::static_pointer_cast<MOSDMap>(m));
- case CEPH_MSG_OSD_OP:
- return handle_osd_op(conn, boost::static_pointer_cast<MOSDOp>(m));
- case MSG_OSD_PG_CREATE2:
- return handle_pg_create(
- conn, boost::static_pointer_cast<MOSDPGCreate2>(m));
- return seastar::now();
- case MSG_COMMAND:
- return handle_command(conn, boost::static_pointer_cast<MCommand>(m));
- case MSG_OSD_MARK_ME_DOWN:
- return handle_mark_me_down(conn, boost::static_pointer_cast<MOSDMarkMeDown>(m));
- case MSG_OSD_PG_PULL:
- [[fallthrough]];
- case MSG_OSD_PG_PUSH:
- [[fallthrough]];
- case MSG_OSD_PG_PUSH_REPLY:
- [[fallthrough]];
- case MSG_OSD_PG_RECOVERY_DELETE:
- [[fallthrough]];
- case MSG_OSD_PG_RECOVERY_DELETE_REPLY:
- [[fallthrough]];
- case MSG_OSD_PG_SCAN:
- [[fallthrough]];
- case MSG_OSD_PG_BACKFILL:
- [[fallthrough]];
- case MSG_OSD_PG_BACKFILL_REMOVE:
- return handle_recovery_subreq(conn, boost::static_pointer_cast<MOSDFastDispatchOp>(m));
- case MSG_OSD_PG_LEASE:
- [[fallthrough]];
- case MSG_OSD_PG_LEASE_ACK:
- [[fallthrough]];
- case MSG_OSD_PG_NOTIFY2:
- [[fallthrough]];
- case MSG_OSD_PG_INFO2:
- [[fallthrough]];
- case MSG_OSD_PG_QUERY2:
- [[fallthrough]];
- case MSG_OSD_BACKFILL_RESERVE:
- [[fallthrough]];
- case MSG_OSD_RECOVERY_RESERVE:
- [[fallthrough]];
- case MSG_OSD_PG_LOG:
- return handle_peering_op(conn, boost::static_pointer_cast<MOSDPeeringOp>(m));
- case MSG_OSD_REPOP:
- return handle_rep_op(conn, boost::static_pointer_cast<MOSDRepOp>(m));
- case MSG_OSD_REPOPREPLY:
- return handle_rep_op_reply(conn, boost::static_pointer_cast<MOSDRepOpReply>(m));
- case MSG_OSD_SCRUB2:
- return handle_scrub(conn, boost::static_pointer_cast<MOSDScrub2>(m));
- case MSG_OSD_PG_UPDATE_LOG_MISSING:
- return handle_update_log_missing(conn, boost::static_pointer_cast<
- MOSDPGUpdateLogMissing>(m));
- case MSG_OSD_PG_UPDATE_LOG_MISSING_REPLY:
- return handle_update_log_missing_reply(conn, boost::static_pointer_cast<
- MOSDPGUpdateLogMissingReply>(m));
- default:
- dispatched = false;
- return seastar::now();
+ case CEPH_MSG_OSD_MAP:
+ case CEPH_MSG_OSD_OP:
+ case MSG_OSD_PG_CREATE2:
+ case MSG_COMMAND:
+ case MSG_OSD_MARK_ME_DOWN:
+ case MSG_OSD_PG_PULL:
+ case MSG_OSD_PG_PUSH:
+ case MSG_OSD_PG_PUSH_REPLY:
+ case MSG_OSD_PG_RECOVERY_DELETE:
+ case MSG_OSD_PG_RECOVERY_DELETE_REPLY:
+ case MSG_OSD_PG_SCAN:
+ case MSG_OSD_PG_BACKFILL:
+ case MSG_OSD_PG_BACKFILL_REMOVE:
+ case MSG_OSD_PG_LEASE:
+ case MSG_OSD_PG_LEASE_ACK:
+ case MSG_OSD_PG_NOTIFY2:
+ case MSG_OSD_PG_INFO2:
+ case MSG_OSD_PG_QUERY2:
+ case MSG_OSD_BACKFILL_RESERVE:
+ case MSG_OSD_RECOVERY_RESERVE:
+ case MSG_OSD_PG_LOG:
+ case MSG_OSD_REPOP:
+ case MSG_OSD_REPOPREPLY:
+ case MSG_OSD_SCRUB2:
+ case MSG_OSD_PG_UPDATE_LOG_MISSING:
+ case MSG_OSD_PG_UPDATE_LOG_MISSING_REPLY:
+ {
+ return shard_dispatchers.invoke_on(0,
+ [conn, m = std::move(m)]
+        (auto &local_dispatcher) mutable -> seastar::future<> {
+ return local_dispatcher.ms_dispatch(std::move(conn), m);
+ });
+ }
+ default:
+ {
+ dispatched = false;
+ return seastar::now();
+ }
}
});
return (dispatched ? std::make_optional(seastar::now()) : std::nullopt);
}
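
// A minimal sketch of the seastar::sharded<>::invoke_on() forwarding used by
// OSD::ms_dispatch above: recognized messages are re-dispatched to the
// dispatcher instance on shard 0 regardless of the receiving core. The
// Dispatcher type and message value are illustrative assumptions only.
#include <iostream>
#include <memory>
#include <seastar/core/app-template.hh>
#include <seastar/core/future.hh>
#include <seastar/core/sharded.hh>
#include <seastar/core/smp.hh>

struct Dispatcher {
  seastar::future<> handle(int msg) {
    std::cout << "shard " << seastar::this_shard_id()
              << " handled " << msg << "\n";
    return seastar::make_ready_future<>();
  }
  seastar::future<> stop() { return seastar::make_ready_future<>(); }
};

int main(int argc, char** argv) {
  seastar::app_template app;
  return app.run(argc, argv, [] {
    auto holder = std::make_unique<seastar::sharded<Dispatcher>>();
    auto& dispatchers = *holder;
    return dispatchers.start().then([&dispatchers] {
      // Hop to shard 0, as the patch does for every recognized message type.
      return dispatchers.invoke_on(0, [](Dispatcher& local) {
        return local.handle(42);
      });
    }).then([&dispatchers] {
      return dispatchers.stop();
    }).finally([holder = std::move(holder)] {});  // keep alive until done
  });
}
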
+seastar::future<>
+OSD::ShardDispatcher::ms_dispatch(
+ crimson::net::ConnectionRef conn,
+ MessageRef m)
+{
+ if (pg_shard_manager.is_stopping()) {
+ return seastar::now();
+ }
+ switch (m->get_type()) {
+ case CEPH_MSG_OSD_MAP:
+ return handle_osd_map(boost::static_pointer_cast<MOSDMap>(m));
+ case CEPH_MSG_OSD_OP:
+ return handle_osd_op(conn, boost::static_pointer_cast<MOSDOp>(m));
+ case MSG_OSD_PG_CREATE2:
+ return handle_pg_create(
+ conn, boost::static_pointer_cast<MOSDPGCreate2>(m));
+ case MSG_COMMAND:
+ return handle_command(conn, boost::static_pointer_cast<MCommand>(m));
+ case MSG_OSD_MARK_ME_DOWN:
+ return handle_mark_me_down(conn, boost::static_pointer_cast<MOSDMarkMeDown>(m));
+ case MSG_OSD_PG_PULL:
+ [[fallthrough]];
+ case MSG_OSD_PG_PUSH:
+ [[fallthrough]];
+ case MSG_OSD_PG_PUSH_REPLY:
+ [[fallthrough]];
+ case MSG_OSD_PG_RECOVERY_DELETE:
+ [[fallthrough]];
+ case MSG_OSD_PG_RECOVERY_DELETE_REPLY:
+ [[fallthrough]];
+ case MSG_OSD_PG_SCAN:
+ [[fallthrough]];
+ case MSG_OSD_PG_BACKFILL:
+ [[fallthrough]];
+ case MSG_OSD_PG_BACKFILL_REMOVE:
+ return handle_recovery_subreq(conn, boost::static_pointer_cast<MOSDFastDispatchOp>(m));
+ case MSG_OSD_PG_LEASE:
+ [[fallthrough]];
+ case MSG_OSD_PG_LEASE_ACK:
+ [[fallthrough]];
+ case MSG_OSD_PG_NOTIFY2:
+ [[fallthrough]];
+ case MSG_OSD_PG_INFO2:
+ [[fallthrough]];
+ case MSG_OSD_PG_QUERY2:
+ [[fallthrough]];
+ case MSG_OSD_BACKFILL_RESERVE:
+ [[fallthrough]];
+ case MSG_OSD_RECOVERY_RESERVE:
+ [[fallthrough]];
+ case MSG_OSD_PG_LOG:
+ return handle_peering_op(conn, boost::static_pointer_cast<MOSDPeeringOp>(m));
+ case MSG_OSD_REPOP:
+ return handle_rep_op(conn, boost::static_pointer_cast<MOSDRepOp>(m));
+ case MSG_OSD_REPOPREPLY:
+ return handle_rep_op_reply(conn, boost::static_pointer_cast<MOSDRepOpReply>(m));
+ case MSG_OSD_SCRUB2:
+ return handle_scrub(conn, boost::static_pointer_cast<MOSDScrub2>(m));
+ case MSG_OSD_PG_UPDATE_LOG_MISSING:
+ return handle_update_log_missing(conn, boost::static_pointer_cast<
+ MOSDPGUpdateLogMissing>(m));
+ case MSG_OSD_PG_UPDATE_LOG_MISSING_REPLY:
+ return handle_update_log_missing_reply(conn, boost::static_pointer_cast<
+ MOSDPGUpdateLogMissingReply>(m));
+ default:
+ return seastar::now();
+ }
+}
+
void OSD::ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace)
{
// TODO: cleanup the session attached to this connection
// MPGStats::had_map_for is not used since PGMonitor was removed
auto m = crimson::make_message<MPGStats>(monc->get_fsid(), osdmap->get_epoch());
m->osd_stat = osd_stat;
- return pg_shard_manager.get_pg_stats(
+ return get_pg_shard_manager().get_pg_stats(
).then([m=std::move(m)](auto &&stats) mutable {
m->pg_stat = std::move(stats);
return seastar::make_ready_future<MessageURef>(std::move(m));
return osd_stat.seq;
}
-bool OSD::require_mon_peer(crimson::net::Connection *conn, Ref<Message> m)
+bool OSD::ShardDispatcher::require_mon_peer(
+ crimson::net::Connection *conn,
+ Ref<Message> m)
{
if (!conn->peer_is_mon()) {
logger().info("{} received from non-mon {}, {}",
return true;
}
-seastar::future<> OSD::handle_osd_map(Ref<MOSDMap> m)
+seastar::future<> OSD::ShardDispatcher::handle_osd_map(Ref<MOSDMap> m)
{
/* Ensure that only one MOSDMap is processed at a time. Allowing concurrent
 * processing may eventually be worthwhile, but such an implementation would
 * be more involved; we rely on the simpler invariant for now.
* See https://tracker.ceph.com/issues/59165
*/
- return handle_osd_map_lock.lock().then([this, m] {
+ ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
+ return osd.handle_osd_map_lock.lock().then([this, m] {
return _handle_osd_map(m);
}).finally([this] {
- return handle_osd_map_lock.unlock();
+ return osd.handle_osd_map_lock.unlock();
});
}
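
// The lock().then(...).finally(unlock()) chain above serializes MOSDMap
// processing. Assuming handle_osd_map_lock is a seastar::shared_mutex (its
// lock()/unlock() usage suggests so), seastar::with_lock() expresses the same
// exclusive section more compactly; a sketch, not the patch's code:
#include <seastar/core/future.hh>
#include <seastar/core/shared_mutex.hh>

seastar::shared_mutex map_lock;  // stand-in for osd.handle_osd_map_lock

seastar::future<> process_map_serially() {
  return seastar::with_lock(map_lock, [] {
    // _handle_osd_map(m) would run here, one caller at a time; the lock is
    // released even if the body's future resolves exceptionally.
    return seastar::make_ready_future<>();
  });
}
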
-seastar::future<> OSD::_handle_osd_map(Ref<MOSDMap> m)
+seastar::future<> OSD::ShardDispatcher::_handle_osd_map(Ref<MOSDMap> m)
{
logger().info("handle_osd_map {}", *m);
- if (m->fsid != superblock.cluster_fsid) {
+ if (m->fsid != osd.superblock.cluster_fsid) {
logger().warn("fsid mismatched");
return seastar::now();
}
const auto first = m->get_first();
const auto last = m->get_last();
logger().info("handle_osd_map epochs [{}..{}], i have {}, src has [{}..{}]",
- first, last, superblock.newest_map,
+ first, last, osd.superblock.newest_map,
m->cluster_osdmap_trim_lower_bound, m->newest_map);
// make sure there is something new, here, before we bother flushing
// the queues and such
- if (last <= superblock.newest_map) {
+ if (last <= osd.superblock.newest_map) {
return seastar::now();
}
// missing some?
bool skip_maps = false;
- epoch_t start = superblock.newest_map + 1;
+ epoch_t start = osd.superblock.newest_map + 1;
if (first > start) {
logger().info("handle_osd_map message skips epochs {}..{}",
start, first - 1);
[=, this](auto& t) {
return pg_shard_manager.store_maps(t, start, m).then([=, this, &t] {
// even if this map isn't from a mon, we may have satisfied our subscription
- monc->sub_got("osdmap", last);
- if (!superblock.oldest_map || skip_maps) {
- superblock.oldest_map = first;
+ osd.monc->sub_got("osdmap", last);
+ if (!osd.superblock.oldest_map || skip_maps) {
+ osd.superblock.oldest_map = first;
}
- superblock.newest_map = last;
- superblock.current_epoch = last;
+ osd.superblock.newest_map = last;
+ osd.superblock.current_epoch = last;
// note in the superblock that we were clean thru the prior epoch
- if (boot_epoch && boot_epoch >= superblock.mounted) {
- superblock.mounted = boot_epoch;
- superblock.clean_thru = last;
+ if (osd.boot_epoch && osd.boot_epoch >= osd.superblock.mounted) {
+ osd.superblock.mounted = osd.boot_epoch;
+ osd.superblock.clean_thru = last;
}
- pg_shard_manager.get_meta_coll().store_superblock(t, superblock);
- pg_shard_manager.set_superblock(superblock);
+ pg_shard_manager.get_meta_coll().store_superblock(t, osd.superblock);
+ pg_shard_manager.set_superblock(osd.superblock);
logger().debug("OSD::handle_osd_map: do_transaction...");
return store.get_sharded_store().do_transaction(
pg_shard_manager.get_meta_coll().collection(),
});
}
-seastar::future<> OSD::committed_osd_maps(version_t first,
- version_t last,
- Ref<MOSDMap> m)
+seastar::future<> OSD::ShardDispatcher::committed_osd_maps(
+ version_t first,
+ version_t last,
+ Ref<MOSDMap> m)
{
+ ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
logger().info("osd.{}: committed_osd_maps({}, {})", whoami, first, last);
// advance through the new maps
return seastar::do_for_each(boost::make_counting_iterator(first),
return pg_shard_manager.get_local_map(
cur
).then([this](OSDMapService::local_cached_map_t&& o) {
- osdmap = make_local_shared_foreign(OSDMapService::local_cached_map_t(o));
+ osd.osdmap = make_local_shared_foreign(OSDMapService::local_cached_map_t(o));
return pg_shard_manager.update_map(std::move(o));
}).then([this] {
if (get_shard_services().get_up_epoch() == 0 &&
- osdmap->is_up(whoami) &&
- osdmap->get_addrs(whoami) == public_msgr->get_myaddrs()) {
+ osd.osdmap->is_up(whoami) &&
+ osd.osdmap->get_addrs(whoami) == osd.public_msgr->get_myaddrs()) {
return pg_shard_manager.set_up_epoch(
- osdmap->get_epoch()
+ osd.osdmap->get_epoch()
).then([this] {
- if (!boot_epoch) {
- boot_epoch = osdmap->get_epoch();
+ if (!osd.boot_epoch) {
+ osd.boot_epoch = osd.osdmap->get_epoch();
}
});
} else {
}
});
}).then([m, this] {
- if (osdmap->is_up(whoami)) {
- const auto up_from = osdmap->get_up_from(whoami);
+ if (osd.osdmap->is_up(whoami)) {
+ const auto up_from = osd.osdmap->get_up_from(whoami);
logger().info("osd.{}: map e {} marked me up: up_from {}, bind_epoch {}, state {}",
- whoami, osdmap->get_epoch(), up_from, bind_epoch,
+ whoami, osd.osdmap->get_epoch(), up_from, osd.bind_epoch,
pg_shard_manager.get_osd_state_string());
- if (bind_epoch < up_from &&
- osdmap->get_addrs(whoami) == public_msgr->get_myaddrs() &&
+ if (osd.bind_epoch < up_from &&
+ osd.osdmap->get_addrs(whoami) == osd.public_msgr->get_myaddrs() &&
pg_shard_manager.is_booting()) {
logger().info("osd.{}: activating...", whoami);
pg_shard_manager.set_active();
- beacon_timer.arm_periodic(
+ osd.beacon_timer.arm_periodic(
std::chrono::seconds(local_conf()->osd_beacon_report_interval));
// timer continuation rearms when complete
- tick_timer.arm(
+ osd.tick_timer.arm(
std::chrono::seconds(TICK_INTERVAL));
}
} else {
if (pg_shard_manager.is_prestop()) {
- got_stop_ack();
+ osd.got_stop_ack();
return seastar::now();
}
}
return check_osdmap_features().then([this] {
// yay!
logger().info("osd.{}: committed_osd_maps: broadcasting osdmaps up"
- " to {} epoch to pgs", whoami, osdmap->get_epoch());
- return pg_shard_manager.broadcast_map_to_pgs(osdmap->get_epoch());
+                  " to epoch {} to pgs", whoami, osd.osdmap->get_epoch());
+ return pg_shard_manager.broadcast_map_to_pgs(osd.osdmap->get_epoch());
});
}).then([m, this] {
if (pg_shard_manager.is_active()) {
logger().info("osd.{}: now active", whoami);
- if (!osdmap->exists(whoami) ||
- osdmap->is_stop(whoami)) {
- return shutdown();
+ if (!osd.osdmap->exists(whoami) ||
+ osd.osdmap->is_stop(whoami)) {
+ return osd.shutdown();
}
- if (should_restart()) {
- return restart();
+ if (osd.should_restart()) {
+ return osd.restart();
} else {
return seastar::now();
}
logger().info("osd.{}: now preboot", whoami);
if (m->get_source().is_mon()) {
- return _preboot(
+ return osd._preboot(
m->cluster_osdmap_trim_lower_bound, m->newest_map);
} else {
logger().info("osd.{}: start_boot", whoami);
- return start_boot();
+ return osd.start_boot();
}
} else {
logger().info("osd.{}: now {}", whoami,
});
}
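
// committed_osd_maps() above walks epochs [first, last] strictly in order:
// seastar::do_for_each over boost counting iterators waits for each step's
// future before starting the next. A standalone sketch of that iteration
// shape (epoch_t and the loop body are illustrative):
#include <cstdint>
#include <iostream>
#include <boost/iterator/counting_iterator.hpp>
#include <seastar/core/future.hh>
#include <seastar/core/loop.hh>

using epoch_t = uint32_t;

seastar::future<> advance_epochs(epoch_t first, epoch_t last) {
  return seastar::do_for_each(
    boost::make_counting_iterator(first),
    boost::make_counting_iterator<epoch_t>(last + 1),  // inclusive of last
    [](epoch_t cur) {
      std::cout << "advancing to epoch " << cur << "\n";
      return seastar::make_ready_future<>();
    });
}
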
-seastar::future<> OSD::handle_osd_op(crimson::net::ConnectionRef conn,
- Ref<MOSDOp> m)
+seastar::future<> OSD::ShardDispatcher::handle_osd_op(
+ crimson::net::ConnectionRef conn,
+ Ref<MOSDOp> m)
{
(void) pg_shard_manager.start_pg_operation<ClientRequest>(
get_shard_services(),
return seastar::now();
}
-seastar::future<> OSD::handle_pg_create(crimson::net::ConnectionRef conn,
- Ref<MOSDPGCreate2> m)
+seastar::future<> OSD::ShardDispatcher::handle_pg_create(
+ crimson::net::ConnectionRef conn,
+ Ref<MOSDPGCreate2> m)
{
for (auto& [pgid, when] : m->pgs) {
const auto &[created, created_stamp] = when;
return seastar::now();
}
-seastar::future<> OSD::handle_update_log_missing(
+seastar::future<> OSD::ShardDispatcher::handle_update_log_missing(
crimson::net::ConnectionRef conn,
Ref<MOSDPGUpdateLogMissing> m)
{
return seastar::now();
}
-seastar::future<> OSD::handle_update_log_missing_reply(
+seastar::future<> OSD::ShardDispatcher::handle_update_log_missing_reply(
crimson::net::ConnectionRef conn,
Ref<MOSDPGUpdateLogMissingReply> m)
{
return seastar::now();
}
-seastar::future<> OSD::handle_rep_op(crimson::net::ConnectionRef conn,
- Ref<MOSDRepOp> m)
+seastar::future<> OSD::ShardDispatcher::handle_rep_op(
+ crimson::net::ConnectionRef conn,
+ Ref<MOSDRepOp> m)
{
m->finish_decode();
std::ignore = pg_shard_manager.start_pg_operation<RepRequest>(
return seastar::now();
}
-seastar::future<> OSD::handle_rep_op_reply(crimson::net::ConnectionRef conn,
- Ref<MOSDRepOpReply> m)
+seastar::future<> OSD::ShardDispatcher::handle_rep_op_reply(
+ crimson::net::ConnectionRef conn,
+ Ref<MOSDRepOpReply> m)
{
spg_t pgid = m->get_spg();
return pg_shard_manager.with_pg(
});
}
-seastar::future<> OSD::handle_scrub(crimson::net::ConnectionRef conn,
- Ref<MOSDScrub2> m)
+seastar::future<> OSD::ShardDispatcher::handle_scrub(
+ crimson::net::ConnectionRef conn,
+ Ref<MOSDScrub2> m)
{
- if (m->fsid != superblock.cluster_fsid) {
+ if (m->fsid != osd.superblock.cluster_fsid) {
logger().warn("fsid mismatched");
return seastar::now();
}
});
}
-seastar::future<> OSD::handle_mark_me_down(crimson::net::ConnectionRef conn,
- Ref<MOSDMarkMeDown> m)
+seastar::future<> OSD::ShardDispatcher::handle_mark_me_down(
+ crimson::net::ConnectionRef conn,
+ Ref<MOSDMarkMeDown> m)
{
+ ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
if (pg_shard_manager.is_prestop()) {
- got_stop_ack();
+ osd.got_stop_ack();
}
return seastar::now();
}
-seastar::future<> OSD::handle_recovery_subreq(crimson::net::ConnectionRef conn,
- Ref<MOSDFastDispatchOp> m)
+seastar::future<> OSD::ShardDispatcher::handle_recovery_subreq(
+ crimson::net::ConnectionRef conn,
+ Ref<MOSDFastDispatchOp> m)
{
std::ignore = pg_shard_manager.start_pg_operation<RecoverySubRequest>(
conn, std::move(m));
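
// start_pg_operation() returns a future that handle_recovery_subreq (like the
// other handlers above) deliberately does not await: the operation's lifetime
// is tracked elsewhere, so the caller drops the future. Assigning to
// std::ignore documents that intent; background_work below is hypothetical.
#include <tuple>  // std::ignore
#include <seastar/core/future.hh>

seastar::future<> background_work() {
  return seastar::make_ready_future<>();
}

void kick_off() {
  // Fire-and-forget: completion is observed through other machinery,
  // not by this caller.
  std::ignore = background_work();
}
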
{
beacon_timer.cancel();
tick_timer.cancel();
- return pg_shard_manager.set_up_epoch(
+ return get_pg_shard_manager().set_up_epoch(
0
).then([this] {
bind_epoch = osdmap->get_epoch();
seastar::future<> OSD::send_beacon()
{
- if (!pg_shard_manager.is_active()) {
+ if (!get_pg_shard_manager().is_active()) {
return seastar::now();
}
// FIXME: min lec should be calculated from pg_stat
seastar::future<> OSD::update_heartbeat_peers()
{
- if (!pg_shard_manager.is_active()) {
+ if (!get_pg_shard_manager().is_active()) {
    return seastar::now();
}
- pg_shard_manager.for_each_pgid([this](auto &pgid) {
+ get_pg_shard_manager().for_each_pgid([this](auto &pgid) {
vector<int> up, acting;
osdmap->pg_to_up_acting_osds(pgid.pgid,
&up, nullptr,
return seastar::now();
}
-seastar::future<> OSD::handle_peering_op(
+seastar::future<> OSD::ShardDispatcher::handle_peering_op(
crimson::net::ConnectionRef conn,
Ref<MOSDPeeringOp> m)
{
return seastar::now();
}
-seastar::future<> OSD::check_osdmap_features()
+seastar::future<> OSD::ShardDispatcher::check_osdmap_features()
{
return store.write_meta("require_osd_release",
- stringify((int)osdmap->require_osd_release));
+ stringify((int)osd.osdmap->require_osd_release));
}
seastar::future<> OSD::prepare_to_stop()
{
if (osdmap && osdmap->is_up(whoami)) {
- pg_shard_manager.set_prestop();
+ get_pg_shard_manager().set_prestop();
const auto timeout =
std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::duration<double>(
class OSD final : public crimson::net::Dispatcher,
private crimson::common::AuthHandler,
private crimson::mgr::WithStats {
+public:
+ class ShardDispatcher
+ : public seastar::peering_sharded_service<ShardDispatcher> {
+ friend class OSD;
+ public:
+ ShardDispatcher(
+ OSD& osd,
+ int whoami,
+ crimson::os::FuturizedStore& store)
+ : pg_shard_manager(osd.osd_singleton_state, osd.shard_services),
+ osd(osd),
+ whoami(whoami),
+ store(store) {}
+ ~ShardDispatcher() = default;
+
+ // Dispatcher methods
+ seastar::future<> ms_dispatch(crimson::net::ConnectionRef,
+ MessageRef);
+
+ private:
+ bool require_mon_peer(crimson::net::Connection *conn, Ref<Message> m);
+
+ seastar::future<> handle_osd_map(Ref<MOSDMap> m);
+ seastar::future<> _handle_osd_map(Ref<MOSDMap> m);
+ seastar::future<> handle_pg_create(crimson::net::ConnectionRef conn,
+ Ref<MOSDPGCreate2> m);
+ seastar::future<> handle_osd_op(crimson::net::ConnectionRef conn,
+ Ref<MOSDOp> m);
+ seastar::future<> handle_rep_op(crimson::net::ConnectionRef conn,
+ Ref<MOSDRepOp> m);
+ seastar::future<> handle_rep_op_reply(crimson::net::ConnectionRef conn,
+ Ref<MOSDRepOpReply> m);
+ seastar::future<> handle_peering_op(crimson::net::ConnectionRef conn,
+ Ref<MOSDPeeringOp> m);
+ seastar::future<> handle_recovery_subreq(crimson::net::ConnectionRef conn,
+ Ref<MOSDFastDispatchOp> m);
+ seastar::future<> handle_scrub(crimson::net::ConnectionRef conn,
+ Ref<MOSDScrub2> m);
+ seastar::future<> handle_mark_me_down(crimson::net::ConnectionRef conn,
+ Ref<MOSDMarkMeDown> m);
+
+ seastar::future<> committed_osd_maps(version_t first,
+ version_t last,
+ Ref<MOSDMap> m);
+
+ seastar::future<> check_osdmap_features();
+
+ seastar::future<> handle_command(crimson::net::ConnectionRef conn,
+ Ref<MCommand> m);
+ seastar::future<> handle_update_log_missing(crimson::net::ConnectionRef conn,
+ Ref<MOSDPGUpdateLogMissing> m);
+ seastar::future<> handle_update_log_missing_reply(
+ crimson::net::ConnectionRef conn,
+ Ref<MOSDPGUpdateLogMissingReply> m);
+
+ public:
+ void print(std::ostream&) const;
+
+ auto &get_pg_shard_manager() {
+ return pg_shard_manager;
+ }
+ auto &get_pg_shard_manager() const {
+ return pg_shard_manager;
+ }
+ ShardServices &get_shard_services() {
+ return pg_shard_manager.get_shard_services();
+ }
+
+ private:
+ crimson::osd::PGShardManager pg_shard_manager;
+ OSD& osd;
+ const int whoami;
+ crimson::os::FuturizedStore& store;
+ };
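+
+  // Note: deriving from seastar::peering_sharded_service is what gives each
+  // local ShardDispatcher access to container() — the owning
+  // seastar::sharded<ShardDispatcher> — enabling cross-shard hops such as
+  // the invoke_on(0, ...) forwarding in OSD::ms_dispatch.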
+
const int whoami;
const uint32_t nonce;
seastar::abort_source& abort_source;
void handle_authentication(const EntityName& name,
const AuthCapsInfo& caps) final;
- crimson::osd::PGShardManager pg_shard_manager;
+ seastar::sharded<OSDSingletonState> osd_singleton_state;
+ seastar::sharded<ShardServices> shard_services;
+ seastar::sharded<ShardDispatcher> shard_dispatchers;
std::unique_ptr<Heartbeat> heartbeat;
seastar::timer<seastar::lowres_clock> tick_timer;
/// @return the seq id of the pg stats being sent
uint64_t send_pg_stats();
+ auto &get_shard_services() {
+ ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
+ return shard_services.local();
+ }
+
+ auto &get_pg_shard_manager() {
+ ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
+ return shard_dispatchers.local().get_pg_shard_manager();
+ }
+
+ auto &get_pg_shard_manager() const {
+ ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
+ return shard_dispatchers.local().get_pg_shard_manager();
+ }
+
private:
static seastar::future<> _write_superblock(
crimson::os::FuturizedStore &store,
seastar::future<> osdmap_subscribe(version_t epoch, bool force_request);
+ seastar::future<> start_asok_admin();
+
void write_superblock(ceph::os::Transaction& t);
seastar::future<> read_superblock();
- bool require_mon_peer(crimson::net::Connection *conn, Ref<Message> m);
-
- seastar::future<> handle_osd_map(Ref<MOSDMap> m);
- seastar::future<> _handle_osd_map(Ref<MOSDMap> m);
- seastar::future<> handle_pg_create(crimson::net::ConnectionRef conn,
- Ref<MOSDPGCreate2> m);
- seastar::future<> handle_osd_op(crimson::net::ConnectionRef conn,
- Ref<MOSDOp> m);
- seastar::future<> handle_rep_op(crimson::net::ConnectionRef conn,
- Ref<MOSDRepOp> m);
- seastar::future<> handle_rep_op_reply(crimson::net::ConnectionRef conn,
- Ref<MOSDRepOpReply> m);
- seastar::future<> handle_peering_op(crimson::net::ConnectionRef conn,
- Ref<MOSDPeeringOp> m);
- seastar::future<> handle_recovery_subreq(crimson::net::ConnectionRef conn,
- Ref<MOSDFastDispatchOp> m);
- seastar::future<> handle_scrub(crimson::net::ConnectionRef conn,
- Ref<MOSDScrub2> m);
- seastar::future<> handle_mark_me_down(crimson::net::ConnectionRef conn,
- Ref<MOSDMarkMeDown> m);
-
- seastar::future<> committed_osd_maps(version_t first,
- version_t last,
- Ref<MOSDMap> m);
-
- seastar::future<> check_osdmap_features();
-
- seastar::future<> handle_command(crimson::net::ConnectionRef conn,
- Ref<MCommand> m);
- seastar::future<> start_asok_admin();
- seastar::future<> handle_update_log_missing(crimson::net::ConnectionRef conn,
- Ref<MOSDPGUpdateLogMissing> m);
- seastar::future<> handle_update_log_missing_reply(
- crimson::net::ConnectionRef conn,
- Ref<MOSDPGUpdateLogMissingReply> m);
-public:
- auto &get_pg_shard_manager() {
- return pg_shard_manager;
- }
- ShardServices &get_shard_services() {
- return pg_shard_manager.get_shard_services();
- }
-
private:
crimson::common::Gated gate;
return out;
}
+inline std::ostream& operator<<(std::ostream& out,
+ const OSD::ShardDispatcher& shard_dispatcher) {
+ shard_dispatcher.print(out);
+ return out;
+}
+
}
#if FMT_VERSION >= 90000
template <> struct fmt::formatter<crimson::osd::OSD> : fmt::ostream_formatter {};
+template <> struct fmt::formatter<crimson::osd::OSD::ShardDispatcher> : fmt::ostream_formatter {};
#endif
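
// The two formatter specializations above route fmt (and thus crimson's
// logger) through operator<< on fmt >= 9, which dropped the implicit ostream
// fallback. The same pattern on an illustrative type:
#include <fmt/format.h>
#include <fmt/ostream.h>
#include <ostream>

struct Widget { int id; };

std::ostream& operator<<(std::ostream& out, const Widget& w) {
  return out << "{widget." << w.id << "}";
}

#if FMT_VERSION >= 90000
template <> struct fmt::formatter<Widget> : fmt::ostream_formatter {};
#endif

int main() {
  fmt::print("{}\n", Widget{7});  // prints: {widget.7}
}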