seastar::future<> PerShardState::stop_pgs()
{
+  assert_core();
  return seastar::parallel_for_each(
    pg_map.get_pgs(),
    [](auto& p) {
std::map<pg_t, pg_stat_t> PerShardState::get_pg_stats() const
{
+  assert_core();
  std::map<pg_t, pg_stat_t> ret;
  for (auto [pgid, pg] : pg_map.get_pgs()) {
    if (pg->is_primary()) {
  ShardServices &shard_services,
  epoch_t epoch)
{
+  assert_core();
  auto &pgs = pg_map.get_pgs();
  return seastar::parallel_for_each(
    pgs.begin(), pgs.end(),
Ref<PG> PerShardState::get_pg(spg_t pgid)
{
+  assert_core();
  return pg_map.get_pg(pgid);
}
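+// Returns the HeartbeatStamps tracker for the given peer OSD, allocating
+// it on first request.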
HeartbeatStampsRef PerShardState::get_hb_stamps(int peer)
{
+  assert_core();
  auto [stamps, added] = heartbeat_stamps.try_emplace(peer);
  if (added) {
    stamps->second = ceph::make_ref<HeartbeatStamps>(peer);
  using cached_map_t = OSDMapService::cached_map_t;
  using local_cached_map_t = OSDMapService::local_cached_map_t;
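+  // `core` records the reactor shard this PerShardState was constructed
+  // on; assert_core() checks that the caller is still running on that
+  // shard, so the accessors below can catch accidental cross-core use.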
+  const core_id_t core = seastar::this_shard_id();
+#define assert_core() ceph_assert(seastar::this_shard_id() == core);
+
  const int whoami;
  crimson::os::FuturizedStore &store;
  crimson::common::CephContext cct;
  epoch_t up_epoch = 0;
  OSDMapService::cached_map_t osdmap;
-  const auto &get_osdmap() const { return osdmap; }
+  const auto &get_osdmap() const {
+    assert_core();
+    return osdmap;
+  }
  void update_map(OSDMapService::cached_map_t new_osdmap) {
+    assert_core();
    osdmap = std::move(new_osdmap);
  }
-  void set_up_epoch(epoch_t epoch) { up_epoch = epoch; }
+  void set_up_epoch(epoch_t epoch) {
+    assert_core();
+    up_epoch = epoch;
+  }
  crimson::osd::ObjectContextRegistry obc_registry;
  // case the shutdown may never succeed.
  bool stopping = false;
  seastar::future<> stop_registry() {
+    assert_core();
    crimson::get_logger(ceph_subsys_osd).info("PerShardState::{}", __func__);
    stopping = true;
    return registry.stop();
  Ref<PG> get_pg(spg_t pgid);
  template <typename F>
  void for_each_pg(F &&f) const {
+    assert_core();
    for (auto &pg : pg_map.get_pgs()) {
      std::invoke(f, pg.first, pg.second);
    }
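+  // Creates an operation of type T in the per-shard registry and starts
+  // it; new operations are rejected once shutdown has begun (stopping).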
  template <typename T, typename... Args>
  auto start_operation(Args&&... args) {
+    assert_core();
    if (__builtin_expect(stopping, false)) {
      throw crimson::common::system_shutdown_exception();
    }
    auto op = registry.create_operation<T>(std::forward<Args>(args)...);
+    crimson::get_logger(ceph_subsys_osd).info(
+      "PerShardState::{}, {}", __func__, *op);
    auto fut = op->start().then([op /* by copy */] {
      // ensure the op's lifetime is appropriate. It is not enough to
      // guarantee it's alive at the scheduling stages (i.e. `then()`
  // tids for ops I issue, prefixed with core id to ensure uniqueness
  ceph_tid_t next_tid;
  ceph_tid_t get_tid() {
+    assert_core();
    return next_tid++;
  }
  // Time state
  const ceph::mono_time startup_time;
  ceph::signedspan get_mnow() const {
+    assert_core();
    return ceph::mono_clock::now() - startup_time;
  }