namespace crimson::osd {
-seastar::future<core_id_t> PGShardMapping::get_or_create_pg_mapping(
+seastar::future<std::pair<core_id_t, unsigned int>> PGShardMapping::get_or_create_pg_mapping(
spg_t pgid,
- core_id_t core_expected)
+ core_id_t core_expected,
+ unsigned int store_shard_index)
{
LOG_PREFIX(PGShardMapping::get_or_create_pg_mapping);
auto find_iter = pg_to_core.find(pgid);
if (find_iter != pg_to_core.end()) {
- auto core_found = find_iter->second;
+ auto core_found = find_iter->second.first;
assert(core_found != NULL_CORE);
if (core_expected != NULL_CORE && core_expected != core_found) {
ERROR("the mapping is inconsistent for pg {}: core {}, expected {}",
pgid, core_found, core_expected);
ceph_abort_msg("The pg mapping is inconsistent!");
}
- return seastar::make_ready_future<core_id_t>(core_found);
+ return seastar::make_ready_future<std::pair<core_id_t, unsigned int>>(find_iter->second);
} else {
DEBUG("calling primary to add mapping for pg {} to the expected core {}",
pgid, core_expected);
return container().invoke_on(
- 0, [pgid, core_expected, FNAME](auto &primary_mapping) {
+ 0, [pgid, core_expected, store_shard_index, FNAME, this](auto &primary_mapping) {
auto core_to_update = core_expected;
+ auto shard_index_update = store_shard_index;
auto find_iter = primary_mapping.pg_to_core.find(pgid);
if (find_iter != primary_mapping.pg_to_core.end()) {
// this pgid was already mapped within primary_mapping, assert that the
// mapping is consistent and avoid emplacing once again.
- auto core_found = find_iter->second;
- assert(core_found != NULL_CORE);
+ auto core_found = find_iter->second.first;
+ auto store_index_found = find_iter->second.second;
if (core_expected != NULL_CORE) {
- if (core_expected != core_found) {
+ assert(store_shard_index != NULL_STORE_INDEX);
+ if (core_expected != core_found || store_shard_index != store_index_found) {
ERROR("the mapping is inconsistent for pg {} (primary): core {}, expected {}",
pgid, core_found, core_expected);
ceph_abort_msg("The pg mapping is inconsistent!");
}
// core_expected == core_found
- DEBUG("mapping pg {} to core {} (primary): already mapped and expected",
- pgid, core_to_update);
+ DEBUG("mapping pg {} to core {} (primary): already mapped and expected shard_index {}",
+ pgid, core_to_update, shard_index_update);
} else { // core_expected == NULL_CORE
+ assert(store_shard_index == NULL_STORE_INDEX);
core_to_update = core_found;
+ shard_index_update = store_index_found;
DEBUG("mapping pg {} to core {} (primary): already mapped",
pgid, core_to_update);
}
// add the mapping and ajust core_to_num_pgs
ceph_assert_always(primary_mapping.core_to_num_pgs.size() > 0);
std::map<core_id_t, unsigned>::iterator count_iter;
+ std::map<core_id_t, std::map<unsigned, unsigned>>::iterator core_shard_iter;
+ std::map<unsigned, unsigned>::iterator shard_iter;
if (core_expected == NULL_CORE) {
+ assert(store_shard_index == NULL_STORE_INDEX);
count_iter = std::min_element(
primary_mapping.core_to_num_pgs.begin(),
primary_mapping.core_to_num_pgs.end(),
}
ceph_assert_always(primary_mapping.core_to_num_pgs.end() != count_iter);
++(count_iter->second);
+
+ if (seastar::smp::count > store_shard_nums ) {
+ auto alien_iter = primary_mapping.core_alien_to_num_pgs.find(core_to_update);
+ auto core_iter = std::min_element(
+ alien_iter->second.begin(),
+ alien_iter->second.end(),
+ [](const auto &left, const auto &right) {
+ return left.second < right.second;
+ }
+ );
+ core_iter->second++;
+ core_to_update = core_iter->first;
+ }
+ if (seastar::smp::count >= store_shard_nums) {
+ shard_index_update = 0; // use the first store shard index on this core
+ } else {
+ core_shard_iter = primary_mapping.core_shard_to_num_pgs.find(core_to_update);
+ ceph_assert_always(core_shard_iter != primary_mapping.core_shard_to_num_pgs.end());
+ if (shard_index_update == NULL_STORE_INDEX) {
+ // find the store shard index with the least number of pgs
+ // on this core
+ shard_iter = std::min_element(
+ core_shard_iter->second.begin(),
+ core_shard_iter->second.end(),
+ [](const auto &left, const auto &right) {
+ return left.second < right.second;
+ }
+ );
+ shard_index_update = shard_iter->first; //find the store shard index on this core
+ } else {
+ shard_iter = core_shard_iter->second.find(shard_index_update);
+ }
+ ++(shard_iter->second);
+ }
[[maybe_unused]] auto [insert_iter, inserted] =
- primary_mapping.pg_to_core.emplace(pgid, core_to_update);
+ primary_mapping.pg_to_core.emplace(pgid, std::make_pair(core_to_update, shard_index_update));
assert(inserted);
- DEBUG("mapping pg {} to core {} (primary): num_pgs {}",
- pgid, core_to_update, count_iter->second);
+ DEBUG("mapping pg {} to core {} (primary): num_pgs {}, store_shard_index {}",
+ pgid, core_to_update, count_iter->second, shard_index_update);
}
assert(core_to_update != NULL_CORE);
return primary_mapping.container().invoke_on_others(
- [pgid, core_to_update, FNAME](auto &other_mapping) {
+ [pgid, core_to_update, shard_index_update, FNAME](auto &other_mapping) {
auto find_iter = other_mapping.pg_to_core.find(pgid);
if (find_iter == other_mapping.pg_to_core.end()) {
- DEBUG("mapping pg {} to core {} (others)",
- pgid, core_to_update);
+ DEBUG("mapping pg {} to core {} (others), store_shard_index {}",
+ pgid, core_to_update, shard_index_update);
[[maybe_unused]] auto [insert_iter, inserted] =
- other_mapping.pg_to_core.emplace(pgid, core_to_update);
+ other_mapping.pg_to_core.emplace(pgid, std::make_pair(core_to_update, shard_index_update));
assert(inserted);
} else {
- auto core_found = find_iter->second;
- if (core_found != core_to_update) {
- ERROR("the mapping is inconsistent for pg {} (others): core {}, expected {}",
- pgid, core_found, core_to_update);
+ auto core_found = find_iter->second.first;
+ auto store_index_found = find_iter->second.second;
+ if (core_found != core_to_update ||store_index_found!= shard_index_update) {
+ ERROR("the mapping is inconsistent for pg {} (others): core {}, expected {}, store_shard_index {}",
+ pgid, core_found, core_to_update, shard_index_update);
ceph_abort_msg("The pg mapping is inconsistent!");
}
DEBUG("mapping pg {} to core {} (others): already mapped",
pgid, core_to_update);
}
});
- }).then([this, pgid, core_expected, FNAME] {
+ }).then([this, pgid, core_expected, store_shard_index, FNAME] {
auto find_iter = pg_to_core.find(pgid);
if (find_iter == pg_to_core.end()) {
ERROR("the mapping is inconsistent for pg {}: core not found, expected {}",
pgid, core_expected);
ceph_abort_msg("The pg mapping is inconsistent!");
}
- auto core_found = find_iter->second;
- if (core_expected != NULL_CORE && core_found != core_expected) {
- ERROR("the mapping is inconsistent for pg {}: core {}, expected {}",
+ auto core_found = find_iter->second.first;
+ auto shard_index_found = find_iter->second.second;
+ if (seastar::smp::count <= store_shard_nums) {
+ if ((core_expected != NULL_CORE && core_found != core_expected) ||
+ (store_shard_index != NULL_STORE_INDEX && shard_index_found != store_shard_index)) {
+ ERROR("the mapping is inconsistent for pg {}: core {}, expected {}",
pgid, core_found, core_expected);
- ceph_abort_msg("The pg mapping is inconsistent!");
+ ceph_abort_msg("The pg mapping is inconsistent!");
+ }
}
DEBUG("returning pg {} mapping to core {} after broadcasted",
pgid, core_found);
- return seastar::make_ready_future<core_id_t>(core_found);
+ return seastar::make_ready_future<std::pair<core_id_t, unsigned int>>(find_iter->second);
});
}
}
ERROR("trying to remove non-exist mapping for pg {} (primary)", pgid);
ceph_abort_msg("The pg mapping is inconsistent!");
}
- assert(find_iter->second != NULL_CORE);
- auto count_iter = primary_mapping.core_to_num_pgs.find(find_iter->second);
+ assert(find_iter->second.first != NULL_CORE);
+ auto count_iter = primary_mapping.core_to_num_pgs.find(find_iter->second.first);
assert(count_iter != primary_mapping.core_to_num_pgs.end());
assert(count_iter->second > 0);
--(count_iter->second);
+
+ auto core_shard_iter = primary_mapping.core_shard_to_num_pgs.find(find_iter->second.first);
+ auto shard_iter = core_shard_iter->second.find(find_iter->second.second);
+ assert(shard_iter != core_shard_iter->second.end());
+ if (seastar::smp::count < primary_mapping.store_shard_nums) {
+ assert(shard_iter->second > 0);
+ --(shard_iter->second);
+ }
+
primary_mapping.pg_to_core.erase(find_iter);
DEBUG("pg {} mapping erased (primary)", pgid);
return primary_mapping.container().invoke_on_others(
ERROR("trying to remove non-exist mapping for pg {} (others)", pgid);
ceph_abort_msg("The pg mapping is inconsistent!");
}
- assert(find_iter->second != NULL_CORE);
+ assert(find_iter->second.first != NULL_CORE);
other_mapping.pg_to_core.erase(find_iter);
DEBUG("pg {} mapping erased (others)", pgid);
});
/// Returns mapping if present, NULL_CORE otherwise
core_id_t get_pg_mapping(spg_t pgid) {
auto iter = pg_to_core.find(pgid);
- ceph_assert_always(iter == pg_to_core.end() || iter->second != NULL_CORE);
- return iter == pg_to_core.end() ? NULL_CORE : iter->second;
+ ceph_assert_always(iter == pg_to_core.end() || iter->second.first != NULL_CORE);
+ return iter == pg_to_core.end() ? NULL_CORE : iter->second.first;
}
/// Returns mapping for pgid, creates new one if it doesn't already exist
- seastar::future<core_id_t> get_or_create_pg_mapping(
+ seastar::future<std::pair<core_id_t, unsigned int>> get_or_create_pg_mapping(
spg_t pgid,
- core_id_t core_expected = NULL_CORE);
+ core_id_t core_expected = NULL_CORE,
+ unsigned int store_shard_index = NULL_STORE_INDEX);
/// Remove pgid mapping
seastar::future<> remove_pg_mapping(spg_t pgid);
size_t get_num_pgs() const { return pg_to_core.size(); }
/// Map to cores in [min_core_mapping, core_mapping_limit)
- PGShardMapping(core_id_t min_core_mapping, core_id_t core_mapping_limit) {
+ PGShardMapping(core_id_t min_core_mapping, core_id_t core_mapping_limit, unsigned int store_shard_nums)
+ : store_shard_nums(store_shard_nums) {
ceph_assert_always(min_core_mapping < core_mapping_limit);
- for (auto i = min_core_mapping; i != core_mapping_limit; ++i) {
+ auto max_core_mapping = std::min(min_core_mapping + store_shard_nums, core_mapping_limit);
+ auto num_shard_services = (store_shard_nums + seastar::smp::count - 1 ) / seastar::smp::count;
+ auto num_alien_cores = (seastar::smp::count + store_shard_nums -1 ) / store_shard_nums;
+
+ for (auto i = min_core_mapping; i != max_core_mapping; ++i) {
+ for (unsigned int j = 0; j < num_shard_services; ++j) {
+ if (i - min_core_mapping + j * seastar::smp::count < store_shard_nums) {
+ core_shard_to_num_pgs[i].emplace(j, 0);
+ }
+ }
core_to_num_pgs.emplace(i, 0);
+ for (unsigned int j = 0; j < num_alien_cores; ++j) {
+ if (store_shard_nums * j + i < core_mapping_limit) {
+ core_alien_to_num_pgs[i].emplace(store_shard_nums * j + i, 0);
+ }
+ }
}
}
}
private:
+
+ unsigned int store_shard_nums;
// only in shard 0
+ //<core_id, num_pgs>
std::map<core_id_t, unsigned> core_to_num_pgs;
+ //<core_id, <shard_index, num_pgs>> // when smp < store_shard_nums, each core more than one store shard
+ std::map<core_id_t, std::map<unsigned, unsigned>> core_shard_to_num_pgs;
+ //<core_id, <alien_core_id, num_pgs>> // when smp > store_shard_nums, more than one core share store shard
+ std::map<core_id_t, std::map<core_id_t, unsigned>> core_alien_to_num_pgs;
// per-shard, updated by shard 0
- std::map<spg_t, core_id_t> pg_to_core;
+ //<pg, <core_id, store_shard_index>>
+ std::map<spg_t, std::pair<core_id_t, unsigned int>> pg_to_core;
+
};
/**