startup_time = ceph::mono_clock::now();
ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
return store.start().then([this] {
- return osd_singleton_state.start_single(
- whoami, std::ref(*cluster_msgr), std::ref(*public_msgr),
- std::ref(*monc), std::ref(*mgrc)
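+ // start the sharded PG-to-core mapping first, since the per-shard services below take a reference to it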
+ return pg_to_shard_mappings.start(0, seastar::smp::count
).then([this] {
+ return osd_singleton_state.start_single(
+ whoami, std::ref(*cluster_msgr), std::ref(*public_msgr),
+ std::ref(*monc), std::ref(*mgrc));
+ }).then([this] {
ceph::mono_time startup_time = ceph::mono_clock::now();
return shard_services.start(
std::ref(osd_singleton_state),
+ std::ref(pg_to_shard_mappings),
whoami,
startup_time,
osd_singleton_state.local().perf,
return shard_dispatchers.start(
std::ref(*this),
whoami,
- std::ref(store));
+ std::ref(store),
+ std::ref(pg_to_shard_mappings));
});
}).then([this] {
heartbeat.reset(new Heartbeat{
return shard_services.stop();
}).then([this] {
return osd_singleton_state.stop();
+ }).then([this] {
+ return pg_to_shard_mappings.stop();
}).then([fut=std::move(gate_close_fut)]() mutable {
return std::move(fut);
}).then([this] {
ShardDispatcher(
OSD& osd,
int whoami,
- crimson::os::FuturizedStore& store)
- : pg_shard_manager(osd.osd_singleton_state, osd.shard_services),
+ crimson::os::FuturizedStore& store,
+ PGShardMapping& pg_to_shard_mapping)
+ : pg_shard_manager(osd.osd_singleton_state,
+ osd.shard_services, pg_to_shard_mapping),
osd(osd),
whoami(whoami),
store(store) {}
void handle_authentication(const EntityName& name,
const AuthCapsInfo& caps) final;
+ seastar::sharded<PGShardMapping> pg_to_shard_mappings;
seastar::sharded<OSDSingletonState> osd_singleton_state;
seastar::sharded<ShardServices> shard_services;
seastar::sharded<ShardDispatcher> shard_dispatchers;
/**
* PGShardMapping
*
- * Maps pgs to shards.
+ * Maintains a mapping from spg_t to the core containing that PG. Internally,
+ * each core has a local copy of the mapping to enable core-local lookups.
+ * Updates are proxied to core 0 and then propagated back out to all other
+ * cores -- see maybe_create_pg and the usage sketch below.
*/
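+// A minimal usage sketch (illustrative only; run_on_pg_core is a hypothetical
+// helper, not part of this change). Once the sharded service has been started,
+// as in OSD::start() above, any shard can resolve or create a mapping and then
+// hop to the core that owns the PG:
+//
+//   seastar::future<> run_on_pg_core(PGShardMapping &mapping, spg_t pgid) {
+//     return mapping.maybe_create_pg(pgid).then([](core_id_t core) {
+//       return seastar::smp::submit_to(core, [] {
+//         // now running on the core that owns this PG
+//       });
+//     });
+//   }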
-class PGShardMapping {
+class PGShardMapping : public seastar::peering_sharded_service<PGShardMapping> {
public:
/// Returns mapping if present, NULL_CORE otherwise
core_id_t get_pg_mapping(spg_t pgid) {
}
/// Returns mapping for pgid, creates new one if it doesn't already exist
- core_id_t maybe_create_pg(spg_t pgid, core_id_t core = NULL_CORE) {
- auto [insert_iter, inserted] = pg_to_core.emplace(pgid, core);
- if (!inserted) {
- ceph_assert_always(insert_iter->second != NULL_CORE);
+ seastar::future<core_id_t> maybe_create_pg(
+ spg_t pgid,
+ core_id_t core = NULL_CORE) {
+ auto find_iter = pg_to_core.find(pgid);
+ if (find_iter != pg_to_core.end()) {
+ ceph_assert_always(find_iter->second != NULL_CORE);
if (core != NULL_CORE) {
- ceph_assert_always(insert_iter->second == core);
+ ceph_assert_always(find_iter->second == core);
}
- return insert_iter->second;
+ return seastar::make_ready_future<core_id_t>(find_iter->second);
} else {
- ceph_assert_always(core_to_num_pgs.size() > 0);
- std::map<core_id_t, unsigned>::iterator core_iter;
- if (core == NULL_CORE) {
- core_iter = std::min_element(
- core_to_num_pgs.begin(),
- core_to_num_pgs.end(),
- [](const auto &left, const auto &right) {
- return left.second < right.second;
+ return container().invoke_on(0, [pgid, core]
+ (auto &primary_mapping) {
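+ // no local entry: create the mapping on core 0, which holds the authoritative copy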
+ auto [insert_iter, inserted] = primary_mapping.pg_to_core.emplace(pgid, core);
+ ceph_assert_always(inserted);
+ ceph_assert_always(primary_mapping.core_to_num_pgs.size() > 0);
+ std::map<core_id_t, unsigned>::iterator core_iter;
+ if (core == NULL_CORE) {
+ core_iter = std::min_element(
+ primary_mapping.core_to_num_pgs.begin(),
+ primary_mapping.core_to_num_pgs.end(),
+ [](const auto &left, const auto &right) {
+ return left.second < right.second;
+ });
+ } else {
+ core_iter = primary_mapping.core_to_num_pgs.find(core);
+ }
+ ceph_assert_always(primary_mapping.core_to_num_pgs.end() != core_iter);
+ insert_iter->second = core_iter->first;
+ core_iter->second++;
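+ // mirror the newly assigned core into every other core's local copy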
+ return primary_mapping.container().invoke_on_others(
+ [pgid = insert_iter->first, core = insert_iter->second]
+ (auto &other_mapping) {
+ ceph_assert_always(core != NULL_CORE);
+ auto [insert_iter, inserted] = other_mapping.pg_to_core.emplace(pgid, core);
+ ceph_assert_always(inserted);
});
- } else {
- core_iter = core_to_num_pgs.find(core);
- }
- ceph_assert_always(core_to_num_pgs.end() != core_iter);
- insert_iter->second = core_iter->first;
- core_iter->second++;
- return insert_iter->second;
+ }).then([this, pgid] {
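+ // back on the calling core; every local copy, including this one, now contains pgid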
+ auto find_iter = pg_to_core.find(pgid);
+ return seastar::make_ready_future<core_id_t>(find_iter->second);
+ });
}
}
/// Remove pgid
- void remove_pg(spg_t pgid) {
- auto iter = pg_to_core.find(pgid);
- ceph_assert_always(iter != pg_to_core.end());
- ceph_assert_always(iter->second != NULL_CORE);
- auto count_iter = core_to_num_pgs.find(iter->second);
- ceph_assert_always(count_iter != core_to_num_pgs.end());
- ceph_assert_always(count_iter->second > 0);
- --(count_iter->second);
- pg_to_core.erase(iter);
+ seastar::future<> remove_pg(spg_t pgid) {
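+ // erase from core 0's authoritative copy, then from every other core's local copy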
+ return container().invoke_on(0, [pgid](auto &primary_mapping) {
+ auto iter = primary_mapping.pg_to_core.find(pgid);
+ ceph_assert_always(iter != primary_mapping.pg_to_core.end());
+ ceph_assert_always(iter->second != NULL_CORE);
+ auto count_iter = primary_mapping.core_to_num_pgs.find(iter->second);
+ ceph_assert_always(count_iter != primary_mapping.core_to_num_pgs.end());
+ ceph_assert_always(count_iter->second > 0);
+ --(count_iter->second);
+ primary_mapping.pg_to_core.erase(iter);
+ return primary_mapping.container().invoke_on_others(
+ [pgid](auto &other_mapping) {
+ auto iter = other_mapping.pg_to_core.find(pgid);
+ ceph_assert_always(iter != other_mapping.pg_to_core.end());
+ ceph_assert_always(iter->second != NULL_CORE);
+ other_mapping.pg_to_core.erase(iter);
+ });
+ });
}
size_t get_num_pgs() const { return pg_to_core.size(); }
auto[coll, shard_core] = coll_core;
spg_t pgid;
if (coll.is_pg(&pgid)) {
- auto core = get_osd_singleton_state(
- ).pg_to_shard_mapping.maybe_create_pg(
- pgid, shard_core);
- return with_remote_shard_state(
- core,
- [pgid](
+ return pg_to_shard_mapping.maybe_create_pg(
+ pgid, shard_core
+ ).then([this, pgid] (auto core) {
+ return this->template with_remote_shard_state(
+ core,
+ [pgid](
PerShardState &per_shard_state,
ShardServices &shard_services) {
return shard_services.load_pg(
return seastar::now();
});
});
+ });
} else if (coll.is_temp(&pgid)) {
logger().warn(
"found temp collection on crimson osd, should be impossible: {}",
class PGShardManager {
seastar::sharded<OSDSingletonState> &osd_singleton_state;
seastar::sharded<ShardServices> &shard_services;
+ PGShardMapping &pg_to_shard_mapping;
#define FORWARD_CONST(FROM_METHOD, TO_METHOD, TARGET) \
template <typename... Args> \
PGShardManager(
seastar::sharded<OSDSingletonState> &osd_singleton_state,
- seastar::sharded<ShardServices> &shard_services)
+ seastar::sharded<ShardServices> &shard_services,
+ PGShardMapping &pg_to_shard_mapping)
: osd_singleton_state(osd_singleton_state),
- shard_services(shard_services) {}
+ shard_services(shard_services),
+ pg_to_shard_mapping(pg_to_shard_mapping) {}
auto &get_osd_singleton_state() {
ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
static_assert(T::can_create());
logger.debug("{}: can_create", *op);
- auto core = get_osd_singleton_state().pg_to_shard_mapping.maybe_create_pg(
- op->get_pgid());
-
get_local_state().registry.remove_from_registry(*op);
- return with_remote_shard_state_and_op<T>(
- core, std::move(op),
- [](PerShardState &per_shard_state,
- ShardServices &shard_services,
- typename T::IRef op) {
+ return pg_to_shard_mapping.maybe_create_pg(
+ op->get_pgid()
+ ).then([this, op = std::move(op)](auto core) mutable {
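+ // the mapping lookup is now asynchronous; once the target core resolves, forward the op to it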
+ return this->template with_remote_shard_state_and_op<T>(
+ core, std::move(op),
+ [](PerShardState &per_shard_state,
+ ShardServices &shard_services,
+ typename T::IRef op) {
per_shard_state.registry.add_to_registry(*op);
auto &logger = crimson::get_logger(ceph_subsys_osd);
auto &opref = *op;
})
).then([op=std::move(op)] {});
});
+ });
}
/// Runs opref on the appropriate core, waiting for pg as necessary
static_assert(!T::can_create());
logger.debug("{}: !can_create", *op);
- auto core = get_osd_singleton_state().pg_to_shard_mapping.maybe_create_pg(
- op->get_pgid());
-
get_local_state().registry.remove_from_registry(*op);
- return with_remote_shard_state_and_op<T>(
- core, std::move(op),
- [](PerShardState &per_shard_state,
- ShardServices &shard_services,
- typename T::IRef op) {
+ return pg_to_shard_mapping.maybe_create_pg(
+ op->get_pgid()
+ ).then([this, op = std::move(op)](auto core) mutable {
+ return this->template with_remote_shard_state_and_op<T>(
+ core, std::move(op),
+ [](PerShardState &per_shard_state,
+ ShardServices &shard_services,
+ typename T::IRef op) {
per_shard_state.registry.add_to_registry(*op);
auto &logger = crimson::get_logger(ceph_subsys_osd);
auto &opref = *op;
})
).then([op=std::move(op)] {});
});
+ });
}
seastar::future<> load_pgs(crimson::os::FuturizedStore& store);
*/
template <typename F>
void for_each_pgid(F &&f) const {
- return get_osd_singleton_state().pg_to_shard_mapping.for_each_pgid(
+ return pg_to_shard_mapping.for_each_pgid(
std::forward<F>(f));
}
auto get_num_pgs() const {
- return get_osd_singleton_state().pg_to_shard_mapping.get_num_pgs();
+ return pg_to_shard_mapping.get_num_pgs();
}
seastar::future<> broadcast_map_to_pgs(epoch_t epoch);
template <typename F>
auto with_pg(spg_t pgid, F &&f) {
- core_id_t core = get_osd_singleton_state(
- ).pg_to_shard_mapping.get_pg_mapping(pgid);
+ core_id_t core = pg_to_shard_mapping.get_pg_mapping(pgid);
return with_remote_shard_state(
core,
[pgid, f=std::move(f)](auto &local_state, auto &local_service) mutable {
void requeue_pg_temp();
seastar::future<> send_pg_temp();
- // TODO: add config to control mapping
- PGShardMapping pg_to_shard_mapping{0, seastar::smp::count};
-
std::set<pg_t> pg_created;
seastar::future<> send_pg_created(pg_t pgid);
seastar::future<> send_pg_created();
PerShardState local_state;
seastar::sharded<OSDSingletonState> &osd_singleton_state;
+ PGShardMapping& pg_to_shard_mapping;
template <typename F, typename... Args>
auto with_singleton(F &&f, Args&&... args) {
template <typename... PSSArgs>
ShardServices(
seastar::sharded<OSDSingletonState> &osd_singleton_state,
+ PGShardMapping& pg_to_shard_mapping,
PSSArgs&&... args)
: local_state(std::forward<PSSArgs>(args)...),
- osd_singleton_state(osd_singleton_state) {}
+ osd_singleton_state(osd_singleton_state),
+ pg_to_shard_mapping(pg_to_shard_mapping) {}
FORWARD_TO_OSD_SINGLETON(send_to_osd)
auto remove_pg(spg_t pgid) {
local_state.pg_map.remove_pg(pgid);
- return with_singleton(
- [pgid](auto &osstate) {
- osstate.pg_to_shard_mapping.remove_pg(pgid);
- });
+ return pg_to_shard_mapping.remove_pg(pgid);
}
crimson::common::CephContext *get_cct() {