seastar::future<> OSD::open_meta_coll()
{
ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
- return store.get_sharded_store().open_collection(
+ return store.get_sharded_store()->open_collection(
coll_t::meta()
).then([this](auto ch) {
pg_shard_manager.init_meta_coll(ch, store.get_sharded_store());
seastar::future<OSDMeta> OSD::open_or_create_meta_coll(FuturizedStore &store)
{
- return store.get_sharded_store().open_collection(coll_t::meta()).then([&store](auto ch) {
+ return store.get_sharded_store()->open_collection(coll_t::meta()).then([&store](auto ch) {
if (!ch) {
- return store.get_sharded_store().create_new_collection(
+ return store.get_sharded_store()->create_new_collection(
coll_t::meta()
).then([&store](auto ch) {
return OSDMeta(ch, store.get_sharded_store());
meta_coll.create(t);
meta_coll.store_superblock(t, superblock);
DEBUG("OSD::_write_superblock: do_transaction...");
- return store.get_sharded_store().do_transaction(
+ return store.get_sharded_store()->do_transaction(
meta_coll.collection(),
std::move(t));
}),
}
startup_time = ceph::mono_clock::now();
ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
- return store.start().then([this] {
- return pg_to_shard_mappings.start(0, seastar::smp::count
+ return store.start().then([this] (auto store_shard_nums) {
+ return pg_to_shard_mappings.start(0, seastar::smp::count, store_shard_nums
).then([this] {
return osd_singleton_state.start_single(
whoami, std::ref(*cluster_msgr), std::ref(*public_msgr),
std::ref(*monc), std::ref(*mgrc));
}).then([this] {
return osd_states.start();
- }).then([this] {
+ }).then([this, store_shard_nums] {
ceph::mono_time startup_time = ceph::mono_clock::now();
return shard_services.start(
std::ref(osd_singleton_state),
std::ref(pg_to_shard_mappings),
+ store_shard_nums,
whoami,
startup_time,
osd_singleton_state.local().perf,
osd_singleton_state.local().recoverystate_perf,
std::ref(store),
std::ref(osd_states));
+ }).then([this, FNAME] {
+ return shard_services.invoke_on_all(
+ [this](auto& local_service) {
+ local_service.set_container(shard_services);
+ });
+ }).then([this, FNAME] {
+ return shard_services.invoke_on_all(
+ [this](auto& local_service) {
+ return local_service.get_remote_store();
+ });
});
}).then([this, FNAME] {
heartbeat.reset(new Heartbeat{
co_await pg_shard_manager.set_superblock(superblock);
DEBUG("submitting transaction");
- co_await store.get_sharded_store().do_transaction(
+ co_await store.get_sharded_store()->do_transaction(
pg_shard_manager.get_meta_coll().collection(), std::move(t));
// TODO: write to superblock and commit the transaction
std::vector<seastar::future<>> futures;
std::vector<seastar::future<>> cleanup_futures;
- auto collection_future = store.get_sharded_store().open_collection(
+ auto collection_future = store.get_sharded_store()->open_collection(
coll_t::meta());
auto collection_ref = co_await std::move(collection_future);
ceph::os::Transaction cleanup_t;
ghobject_t::NO_GEN,
shard_id_t::NO_SHARD);
t.write(coll_t::meta(), oid, 0, data.size(), bl);
- futures.push_back(store.get_sharded_store().do_transaction(
+ futures.push_back(store.get_sharded_store()->do_transaction(
collection_ref, std::move(t)));
cleanup_t.remove(coll_t::meta(), oid);
- cleanup_futures.push_back(store.get_sharded_store().do_transaction(
+ cleanup_futures.push_back(store.get_sharded_store()->do_transaction(
collection_ref, std::move(cleanup_t)));
}
}
t.write(coll_t::meta(), oid, offset, bsize, bl);
- futures_bench.push_back(store.get_sharded_store().do_transaction(
+ futures_bench.push_back(store.get_sharded_store()->do_transaction(
collection_ref, std::move(t)));
if (!onum || !osize) {
cleanup_t.remove(coll_t::meta(), oid);
- cleanup_futures.push_back(store.get_sharded_store().do_transaction(
+ cleanup_futures.push_back(store.get_sharded_store()->do_transaction(
collection_ref, std::move(cleanup_t)));
}
}
children_pgids.insert(child_pgid);
// Map each child pg ID to a core
- auto core = co_await shard_services.create_split_pg_mapping(child_pgid, seastar::this_shard_id());
+ auto core = co_await shard_services.create_split_pg_mapping(child_pgid, seastar::this_shard_id(), 0);
DEBUG(" PG {} mapped to {}", child_pgid.pgid, core);
DEBUG(" {} map epoch: {}", child_pgid.pgid, pg_epoch);
auto map = next_map;
- auto child_pg = co_await shard_services.make_pg(std::move(map), child_pgid, true);
+ auto child_pg = co_await shard_services.make_pg(std::move(map), child_pgid, 0, true);
DEBUG(" Parent pgid: {}", pg->get_pgid());
DEBUG(" Child pgid: {}", child_pg->get_pgid());
return seastar::parallel_for_each(
colls_cores,
[this](auto coll_core) {
- auto[coll, shard_core] = coll_core;
+ auto[coll, shard_core_index] = coll_core;
+ auto[shard_core, store_index] = shard_core_index;
spg_t pgid;
if (coll.is_pg(&pgid)) {
return get_pg_to_shard_mapping().get_or_create_pg_mapping(
- pgid, shard_core
- ).then([this, pgid] (auto core) {
+ pgid, shard_core, store_index
+ ).then([this, pgid] (auto core_store) {
return this->with_remote_shard_state(
- core,
- [pgid](
+ core_store.first,
+ [pgid, core_store](
PerShardState &per_shard_state,
ShardServices &shard_services) {
return shard_services.load_pg(
- pgid
+ pgid, core_store.second
).then([pgid, &per_shard_state](auto &&pg) {
logger().info("load_pgs: loaded {}", pgid);
return pg->clear_temp_objects(
template <typename T>
seastar::future<> run_with_pg_maybe_create(
typename T::IRef op,
- ShardServices &target_shard_services
+ ShardServices &target_shard_services,
+ unsigned int store_index
) {
static_assert(T::can_create());
auto &logger = crimson::get_logger(ceph_subsys_osd);
auto &opref = *op;
return opref.template with_blocking_event<
PGMap::PGCreationBlockingEvent
- >([&target_shard_services, &opref](auto &&trigger) {
+ >([&target_shard_services, &opref, store_index](auto &&trigger) {
return target_shard_services.get_or_create_pg(
std::move(trigger),
opref.get_pgid(),
+ store_index,
opref.get_create_info()
);
}).safe_then([&logger, &target_shard_services, &opref](Ref<PG> pgref) {
return seastar::make_exception_future<>(fut.get_exception());
}
- auto core = fut.get();
+ auto core_store = fut.get();
logger.debug("{}: can_create={}, target-core={}",
- *op, T::can_create(), core);
+ *op, T::can_create(), core_store.first);
return this->template with_remote_shard_state_and_op<T>(
- core, std::move(op),
- [this](ShardServices &target_shard_services,
+ core_store.first, std::move(op),
+ [this, core_store](ShardServices &target_shard_services,
typename T::IRef op) {
auto &opref = *op;
auto &logger = crimson::get_logger(ceph_subsys_osd);
logger.debug("{}: entering create_or_wait_pg", opref);
return opref.template enter_stage<>(
opref.get_pershard_pipeline(target_shard_services).create_or_wait_pg
- ).then([this, &target_shard_services, op=std::move(op)]() mutable {
+ ).then([this, &target_shard_services, op=std::move(op), core_store]() mutable {
if constexpr (T::can_create()) {
return this->template run_with_pg_maybe_create<T>(
- std::move(op), target_shard_services);
+ std::move(op), target_shard_services, core_store.second);
} else {
return this->template run_with_pg_maybe_wait<T>(
std::move(op), target_shard_services);
opref, opref.get_pgid());
return seastar::now();
}
+ SUBDEBUG(osd, "{}: have_pg", opref);
return op->with_pg(
target_shard_services, pg
).finally([op] {});
crimson::os::FuturizedStore &store,
OSDState &osd_state)
: whoami(whoami),
- store(store.get_sharded_store()),
+ stores(store.get_sharded_stores()),
osd_state(osd_state),
osdmap_gate("PerShardState::osdmap_gate"),
perf(perf), recoverystate_perf(recoverystate_perf),
seastar::future<Ref<PG>> ShardServices::make_pg(
OSDMapService::cached_map_t create_map,
spg_t pgid,
+ unsigned store_index,
bool do_create)
{
using ec_profile_t = std::map<std::string, std::string>;
return get_pool_info(pgid.pool());
}
};
- auto get_collection = [pgid, do_create, this] {
+ auto get_collection = [pgid, do_create, store_index, this] {
const coll_t cid{pgid};
if (do_create) {
- return get_store().create_new_collection(cid);
+ return crimson::os::with_store<&crimson::os::FuturizedStore::Shard::create_new_collection>(
+ get_store(store_index), cid);
} else {
- return get_store().open_collection(cid);
+ return crimson::os::with_store<&crimson::os::FuturizedStore::Shard::open_collection>(
+ get_store(store_index), cid);
}
};
return seastar::when_all(
std::move(get_pool_info_for_pg),
std::move(get_collection)
- ).then([pgid, create_map, this](auto &&ret) {
+ ).then([pgid, create_map, store_index, this](auto &&ret) {
auto [pool, name, ec_profile] = std::move(std::get<0>(ret).get());
auto coll = std::move(std::get<1>(ret).get());
return seastar::make_ready_future<Ref<PG>>(
new PG{
- pgid,
- pg_shard_t{local_state.whoami, pgid.shard},
- std::move(coll),
- std::move(pool),
- std::move(name),
- create_map,
- *this,
- ec_profile});
+ pgid,
+ pg_shard_t{local_state.whoami, pgid.shard},
+ std::move(store_index),
+ std::move(coll),
+ std::move(pool),
+ std::move(name),
+ create_map,
+ *this,
+ ec_profile});
});
}
seastar::future<Ref<PG>> ShardServices::handle_pg_create_info(
+ unsigned int store_index,
std::unique_ptr<PGCreateInfo> info) {
return seastar::do_with(
std::move(info),
- [this](auto &info)
+ [store_index, this](auto &info)
-> seastar::future<Ref<PG>> {
return get_map(info->epoch).then(
- [&info, this](cached_map_t startmap)
+ [&info, store_index, this](cached_map_t startmap)
-> seastar::future<std::tuple<Ref<PG>, cached_map_t>> {
LOG_PREFIX(ShardServices::handle_pg_create_info);
const spg_t &pgid = info->pgid;
}
}
return make_pg(
- startmap, pgid, true
+ startmap, pgid, store_index, true
).then([startmap=std::move(startmap)](auto pg) mutable {
return seastar::make_ready_future<
std::tuple<Ref<PG>, OSDMapService::cached_map_t>
ShardServices::get_or_create_pg(
PGMap::PGCreationBlockingEvent::TriggerI&& trigger,
spg_t pgid,
+ unsigned int store_index,
std::unique_ptr<PGCreateInfo> info)
{
if (info) {
if (!existed) {
local_state.pg_map.set_creating(pgid);
(void)handle_pg_create_info(
- std::move(info));
+ store_index,
+ std::move(info));
}
return std::move(fut);
} else {
return std::move(fut);
}
-seastar::future<Ref<PG>> ShardServices::load_pg(spg_t pgid)
-
+seastar::future<Ref<PG>> ShardServices::load_pg(spg_t pgid, unsigned int store_index)
{
LOG_PREFIX(OSDSingletonState::load_pg);
DEBUG("{}", pgid);
- return seastar::do_with(PGMeta(get_store(), pgid), [](auto& pg_meta) {
+ return seastar::do_with(PGMeta(get_store(store_index), pgid), [](auto& pg_meta) {
return pg_meta.get_epoch();
}).then([this](epoch_t e) {
return get_map(e);
- }).then([pgid, this](auto&& create_map) {
- return make_pg(std::move(create_map), pgid, false);
- }).then([this](Ref<PG> pg) {
- return pg->read_state(&get_store()).then([pg] {
+ }).then([pgid, store_index, this](auto&& create_map) {
+ return make_pg(std::move(create_map), pgid, store_index, false);
+ }).then([store_index, this](Ref<PG> pg) {
+ return pg->read_state(get_store(store_index)).then([pg] {
return seastar::make_ready_future<Ref<PG>>(std::move(pg));
});
}).handle_exception([FNAME, pgid](auto ep) {
}
seastar::future<> ShardServices::dispatch_context_transaction(
- crimson::os::CollectionRef col, PeeringCtx &ctx) {
+ crimson::os::CollectionRef col, PeeringCtx &ctx, unsigned int store_index) {
LOG_PREFIX(OSDSingletonState::dispatch_context_transaction);
if (ctx.transaction.empty()) {
DEBUG("empty transaction");
- co_await get_store().flush(col);
+ co_await crimson::os::with_store_do_transaction(
+ get_store(store_index),
+ col, ceph::os::Transaction{});
Context* on_commit(
ceph::os::Transaction::collect_all_contexts(ctx.transaction));
if (on_commit) {
}
DEBUG("do_transaction ...");
- co_await get_store().do_transaction(
+ co_await crimson::os::with_store_do_transaction(
+ get_store(store_index),
col,
ctx.transaction.claim_and_reset());
co_return;
}
seastar::future<> ShardServices::dispatch_context(
+ unsigned int store_index,
crimson::os::CollectionRef col,
PeeringCtx &&pctx)
{
return seastar::do_with(
std::move(pctx),
- [this, col](auto &ctx) {
+ [this, col, store_index](auto &ctx) {
ceph_assert(col || ctx.transaction.empty());
return seastar::when_all_succeed(
dispatch_context_messages(
BufferedRecoveryMessages{ctx}),
- col ? dispatch_context_transaction(col, ctx) : seastar::now()
+ col ? dispatch_context_transaction(col, ctx, store_index) : seastar::now()
).then_unpack([] {
return seastar::now();
});
#define assert_core() ceph_assert(seastar::this_shard_id() == core);
const int whoami;
- crimson::os::FuturizedStore::Shard &store;
+ std::vector<crimson::os::FuturizedStore::StoreShardRef> stores;
crimson::common::CephContext cct;
OSDState &osd_state;
PerShardState local_state;
seastar::sharded<OSDSingletonState> &osd_singleton_state;
PGShardMapping& pg_to_shard_mapping;
+ seastar::sharded<ShardServices>* s_container = nullptr;
+ unsigned int store_shard_nums = 0;
template <typename F, typename... Args>
auto with_singleton(F &&f, Args&&... args) {
ShardServices(
seastar::sharded<OSDSingletonState> &osd_singleton_state,
PGShardMapping& pg_to_shard_mapping,
+ unsigned int store_shard_nums,
PSSArgs&&... args)
: local_state(std::forward<PSSArgs>(args)...),
osd_singleton_state(osd_singleton_state),
- pg_to_shard_mapping(pg_to_shard_mapping) {}
+ pg_to_shard_mapping(pg_to_shard_mapping),
+ store_shard_nums(store_shard_nums) {}
FORWARD_TO_OSD_SINGLETON(send_to_osd)
- crimson::os::FuturizedStore::Shard &get_store() {
- return local_state.store;
+ void set_container(seastar::sharded<ShardServices>& ss) { s_container = &ss; }
+
+ seastar::future<> get_remote_store() {
+ if (local_state.stores.empty()) {
+ return s_container->invoke_on(
+ seastar::this_shard_id() % store_shard_nums,
+ [] (auto& remote_service) {
+ assert(remote_service.local_state.stores.size() == 1);
+ auto ret = remote_service.local_state.stores[0].get_foreign();
+ return std::move(ret);
+ }).then([this](auto&& remote_store) {
+ local_state.stores.emplace_back(make_local_shared_foreign(std::move(remote_store)));
+ return seastar::now();
+ });
+ } else {
+ return seastar::now();
+ }
+ }
+
+ crimson::os::FuturizedStore::StoreShardRef get_store(unsigned int store_index) {
+ assert(store_index < local_state.stores.size());
+ return local_state.stores[store_index];
}
struct shard_stats_t {
return {get_reactor_utilization()};
}
- auto create_split_pg_mapping(spg_t pgid, core_id_t core) {
- return pg_to_shard_mapping.get_or_create_pg_mapping(pgid, core);
+ auto create_split_pg_mapping(spg_t pgid, core_id_t core, unsigned int store_index) {
+ return pg_to_shard_mapping.get_or_create_pg_mapping(pgid, core, store_index);
}
auto remove_pg(spg_t pgid) {
seastar::future<Ref<PG>> make_pg(
cached_map_t create_map,
spg_t pgid,
+ unsigned int store_index,
bool do_create);
seastar::future<Ref<PG>> handle_pg_create_info(
+ unsigned int store_index,
std::unique_ptr<PGCreateInfo> info);
using get_or_create_pg_ertr = PGMap::wait_for_pg_ertr;
get_or_create_pg_ret get_or_create_pg(
PGMap::PGCreationBlockingEvent::TriggerI&&,
spg_t pgid,
+ unsigned int store_index,
std::unique_ptr<PGCreateInfo> info);
using wait_for_pg_ertr = PGMap::wait_for_pg_ertr;
PGMap::PGCreationBlockingEvent::TriggerI&& trigger,
spg_t pgid);
- seastar::future<Ref<PG>> load_pg(spg_t pgid);
+ seastar::future<Ref<PG>> load_pg(spg_t pgid, unsigned int store_index);
/// Dispatch and reset ctx transaction
seastar::future<> dispatch_context_transaction(
- crimson::os::CollectionRef col, PeeringCtx &ctx);
+ crimson::os::CollectionRef col, PeeringCtx &ctx, unsigned int store_index);
/// Dispatch and reset ctx messages
seastar::future<> dispatch_context_messages(
/// Dispatch ctx and dispose of context
seastar::future<> dispatch_context(
+ unsigned int store_index,
crimson::os::CollectionRef col,
PeeringCtx &&ctx);
/// Dispatch ctx and dispose of ctx, transaction must be empty
seastar::future<> dispatch_context(
+ unsigned int store_index,
PeeringCtx &&ctx) {
- return dispatch_context({}, std::move(ctx));
+ return dispatch_context(store_index, {}, std::move(ctx));
}
PerShardPipeline &get_client_request_pipeline() {