From 8f1898188b32c8f08384433d9ba2e72ef57ee77a Mon Sep 17 00:00:00 2001
From: chunmei liu
Date: Tue, 15 Jul 2025 23:58:09 -0700
Subject: [PATCH] crimson/osd: replace direct store calls with with_store
 calls, to support calling into remote stores

Signed-off-by: chunmei liu
---
 src/crimson/admin/osd_admin.cc                |  8 +-
 src/crimson/osd/ec_backend.cc                 |  3 +-
 src/crimson/osd/ec_backend.h                  |  1 +
 src/crimson/osd/osd_meta.cc                   | 16 ++--
 src/crimson/osd/osd_meta.h                    |  4 +-
 .../osd/osd_operations/scrub_events.cc        | 30 ++++--
 src/crimson/osd/pg.cc                         | 80 +++++++++-------
 src/crimson/osd/pg.h                          | 12 ++-
 src/crimson/osd/pg_backend.cc                 | 92 +++++++++++--------
 src/crimson/osd/pg_backend.h                  |  3 +-
 src/crimson/osd/pg_meta.cc                    | 26 ++++--
 src/crimson/osd/pg_meta.h                     |  4 +-
 src/crimson/osd/recovery_backend.cc           |  6 +-
 src/crimson/osd/recovery_backend.h            |  5 +-
 src/crimson/osd/replicated_backend.cc         | 11 ++-
 .../osd/replicated_recovery_backend.cc        | 41 ++++++---
 src/crimson/osd/replicated_recovery_backend.h |  2 +-
 src/osd/PGLog.cc                              |  9 +-
 src/osd/PGLog.h                               |  4 +-
 src/osd/SnapMapper.cc                         | 17 +++-
 src/osd/SnapMapper.h                          | 11 ++-
 21 files changed, 247 insertions(+), 138 deletions(-)

diff --git a/src/crimson/admin/osd_admin.cc b/src/crimson/admin/osd_admin.cc
index 56fa45417e7..cc4bb230fcf 100644
--- a/src/crimson/admin/osd_admin.cc
+++ b/src/crimson/admin/osd_admin.cc
@@ -506,7 +506,9 @@ public:
       logger().info("error during data error injection: {}", e.what());
       co_return tell_result_t(-EINVAL, e.what());
     }
-    co_await shard_services.get_store().inject_data_error(obj);
+    co_await crimson::os::with_store<&crimson::os::FuturizedStore::Shard::inject_data_error>(
+      shard_services.get_store(0),
+      obj);
     logger().info("successfully injected data error for obj={}", obj);
     ceph::bufferlist bl;
     bl.append("ok"sv);
@@ -548,7 +550,9 @@ public:
       logger().info("error during metadata error injection: {}", e.what());
       co_return tell_result_t(-EINVAL, e.what());
     }
-    co_await shard_services.get_store().inject_mdata_error(obj);
+    co_await crimson::os::with_store<&crimson::os::FuturizedStore::Shard::inject_mdata_error>(
+      shard_services.get_store(0),
+      obj);
     logger().info("successfully injected metadata error for obj={}", obj);
     ceph::bufferlist bl;
     bl.append("ok"sv);
diff --git a/src/crimson/osd/ec_backend.cc b/src/crimson/osd/ec_backend.cc
index 85bcb51e786..5ffb0faab18 100644
--- a/src/crimson/osd/ec_backend.cc
+++ b/src/crimson/osd/ec_backend.cc
@@ -7,10 +7,11 @@ namespace crimson::osd {
 ECBackend::ECBackend(shard_id_t shard,
                      ECBackend::CollectionRef coll,
                      crimson::osd::ShardServices& shard_services,
+                     unsigned int store_index,
                      const ec_profile_t&,
                      uint64_t,
                      DoutPrefixProvider &dpp)
-  : PGBackend{shard, coll, shard_services, dpp}
+  : PGBackend{shard, coll, shard_services, store_index, dpp}
 {
   // todo
 }
diff --git a/src/crimson/osd/ec_backend.h b/src/crimson/osd/ec_backend.h
index b28f7581bec..ffa2692c49c 100644
--- a/src/crimson/osd/ec_backend.h
+++ b/src/crimson/osd/ec_backend.h
@@ -17,6 +17,7 @@ public:
   ECBackend(shard_id_t shard,
             CollectionRef coll,
             crimson::osd::ShardServices& shard_services,
+            unsigned int store_index,
             const ec_profile_t& ec_profile,
             uint64_t stripe_width,
             DoutPrefixProvider &dpp);
diff --git a/src/crimson/osd/osd_meta.cc b/src/crimson/osd/osd_meta.cc
index d959ac14029..be6eb4d1bf4 100644
--- a/src/crimson/osd/osd_meta.cc
+++ b/src/crimson/osd/osd_meta.cc
@@ -42,7 +42,8 @@ void OSDMeta::remove_inc_map(ceph::os::Transaction& t, epoch_t e)
 
 seastar::future<bufferlist> OSDMeta::load_map(epoch_t e)
 {
-  return store.read(coll,
+  return crimson::os::with_store<&crimson::os::FuturizedStore::Shard::read>(
+    store, coll,
                     osdmap_oid(e), 0, 0,
                     CEPH_OSD_OP_FLAG_FADVISE_WILLNEED).handle_error(
     read_errorator::assert_all_func([e](const auto&) {
@@ -53,7 +54,8 @@ seastar::future<bufferlist> OSDMeta::load_map(epoch_t e)
 
 read_errorator::future<ceph::bufferlist> OSDMeta::load_inc_map(epoch_t e)
 {
-  return store.read(coll,
+  return crimson::os::with_store<&crimson::os::FuturizedStore::Shard::read>(
+    store, coll,
                     inc_osdmap_oid(e), 0, 0,
                     CEPH_OSD_OP_FLAG_FADVISE_WILLNEED);
 }
@@ -68,8 +70,9 @@ void OSDMeta::store_superblock(ceph::os::Transaction& t,
 
 OSDMeta::load_superblock_ret OSDMeta::load_superblock()
 {
-  return store.read(
-    coll, superblock_oid(), 0, 0
+  return crimson::os::with_store<&crimson::os::FuturizedStore::Shard::read>(
+    store,
+    coll, superblock_oid(), 0, 0, 0
   ).safe_then([] (bufferlist&& bl) {
     auto p = bl.cbegin();
     OSDSuperblock superblock;
@@ -82,8 +85,9 @@ seastar::future<std::tuple<pg_pool_t, std::string, OSDMeta::ec_profile_t>>
 OSDMeta::load_final_pool_info(int64_t pool)
 {
-  return store.read(coll, final_pool_info_oid(pool),
-                    0, 0).safe_then([] (bufferlist&& bl) {
+  return crimson::os::with_store<&crimson::os::FuturizedStore::Shard::read>(
+    store, coll, final_pool_info_oid(pool),
+    0, 0, 0).safe_then([] (bufferlist&& bl) {
     auto p = bl.cbegin();
     pg_pool_t pi;
     string name;
diff --git a/src/crimson/osd/osd_meta.h b/src/crimson/osd/osd_meta.h
index 0c6738aed8f..2f95e13718c 100644
--- a/src/crimson/osd/osd_meta.h
+++ b/src/crimson/osd/osd_meta.h
@@ -27,12 +27,12 @@ using read_errorator = crimson::os::FuturizedStore::Shard::read_errorator;
 class OSDMeta {
   template<typename T> using Ref = boost::intrusive_ptr<T>;
 
-  crimson::os::FuturizedStore::Shard& store;
+  crimson::os::FuturizedStore::StoreShardRef store;
   Ref<crimson::os::FuturizedCollection> coll;
 
 public:
   OSDMeta(Ref<crimson::os::FuturizedCollection> coll,
-          crimson::os::FuturizedStore::Shard& store)
+          crimson::os::FuturizedStore::StoreShardRef store)
     : store{store}, coll{coll}
   {}
diff --git a/src/crimson/osd/osd_operations/scrub_events.cc b/src/crimson/osd/osd_operations/scrub_events.cc
index c2fd916ff71..724b2c2695a 100644
--- a/src/crimson/osd/osd_operations/scrub_events.cc
+++ b/src/crimson/osd/osd_operations/scrub_events.cc
@@ -186,15 +186,19 @@ ScrubScan::ifut<> ScrubScan::scan_object(
   DEBUGDPP("obj: {}", pg, obj);
   auto &entry = ret.objects[obj.hobj];
   return interruptor::make_interruptible(
-    pg.shard_services.get_store().stat(
+    crimson::os::with_store<&crimson::os::FuturizedStore::Shard::stat>(
+      pg.shard_services.get_store(pg.get_store_index()),
       pg.get_collection_ref(),
-      obj)
+      obj,
+      0)
   ).then_interruptible([FNAME, &pg, &obj, &entry](struct stat obj_stat) {
     DEBUGDPP("obj: {}, stat complete, size {}", pg, obj, obj_stat.st_size);
     entry.size = obj_stat.st_size;
-    return pg.shard_services.get_store().get_attrs(
+    return crimson::os::with_store<&crimson::os::FuturizedStore::Shard::get_attrs>(
+      pg.shard_services.get_store(pg.get_store_index()),
       pg.get_collection_ref(),
-      obj);
+      obj,
+      0);
   }).safe_then_interruptible([FNAME, &pg, &obj, &entry](auto &&attrs) {
     DEBUGDPP("obj: {}, got {} attrs", pg, obj, attrs.size());
     for (auto &i : attrs) {
@@ -244,11 +248,13 @@ ScrubScan::ifut<> ScrubScan::deep_scan_object(
            pg, *this, obj, progress);
   const auto stride = local_conf().get_val<Option::size_t>(
     "osd_deep_scrub_stride");
-  return pg.shard_services.get_store().read(
+  return crimson::os::with_store<&crimson::os::FuturizedStore::Shard::read>(
+    pg.shard_services.get_store(pg.get_store_index()),
     pg.get_collection_ref(),
     obj,
     *(progress.offset),
-    stride
+    stride,
+    0
   ).safe_then([this, FNAME, stride, &obj, &progress, &entry, &pg](auto bl) {
     size_t offset = *progress.offset;
     DEBUGDPP("op: {}, obj: {}, progress: {} got offset {}",
@@ -279,9 +285,11 @@ ScrubScan::ifut<> ScrubScan::deep_scan_object(
 {
   DEBUGDPP("op: {}, obj: {}, progress: {} scanning omap header",
            pg, *this, obj, progress);
-  return pg.shard_services.get_store().omap_get_header(
+  return crimson::os::with_store<&crimson::os::FuturizedStore::Shard::omap_get_header>(
+    pg.shard_services.get_store(pg.get_store_index()),
     pg.get_collection_ref(),
-    obj
+    obj,
+    0
   ).safe_then([&progress](auto bl) {
     progress.omap_hash << bl;
   }).handle_error(
@@ -319,11 +327,13 @@ ScrubScan::ifut<> ScrubScan::deep_scan_object(
 {
   DEBUGDPP("op: {}, obj: {}, progress: {} scanning omap keys",
            pg, *this, obj, progress);
-  return pg.shard_services.get_store().omap_iterate(
+  return crimson::os::with_store<&crimson::os::FuturizedStore::Shard::omap_iterate>(
+    pg.shard_services.get_store(pg.get_store_index()),
     pg.get_collection_ref(),
     obj,
     start_from,
-    callback
+    callback,
+    0
   ).safe_then([FNAME, this, &obj, &progress, &entry, &pg](auto result) {
     assert(result == ObjectStore::omap_iter_ret_t::NEXT);
     DEBUGDPP("op: {}, obj: {}, progress: {} omap done",
diff --git a/src/crimson/osd/pg.cc b/src/crimson/osd/pg.cc
index f85ee660abb..f4d4b35911f 100644
--- a/src/crimson/osd/pg.cc
+++ b/src/crimson/osd/pg.cc
@@ -95,6 +95,7 @@ public:
 PG::PG(
   spg_t pgid,
   pg_shard_t pg_shard,
+  unsigned int store_index,
   crimson::os::CollectionRef coll_ref,
   pg_pool_t&& pool,
   std::string&& name,
@@ -104,6 +105,7 @@ PG::PG(
   : pgid{pgid},
     pg_whoami{pg_shard},
     coll_ref{coll_ref},
+    store_index{store_index},
     pgmeta_oid{pgid.make_pgmeta_oid()},
     osdmap_gate("PG::osdmap_gate"),
     shard_services{shard_services},
@@ -143,7 +145,7 @@ PG::PG(
       *backend.get(),
       *this},
     osdriver(
-      &shard_services.get_store(),
+      shard_services.get_store(store_index),
       coll_ref,
       pgid.make_snapmapper_oid()),
     snap_mapper(
@@ -184,11 +186,12 @@ void PG::check_blocklisted_watchers()
 bool PG::try_flush_or_schedule_async()
 {
   logger().debug("PG::try_flush_or_schedule_async: flush ...");
-  (void)shard_services.get_store().flush(
-    coll_ref
+  (void)crimson::os::with_store_do_transaction(
+    shard_services.get_store(store_index),
+    coll_ref, ceph::os::Transaction{}
   ).then(
     [this, epoch=get_osdmap_epoch()]() {
       return shard_services.start_operation<LocalPeeringEvent>(
         this,
         pg_whoami,
         pgid,
@@ -284,7 +287,7 @@ PG::interruptible_future<> PG::find_unfound(epoch_t epoch_started)
         PeeringState::UnfoundRecovery());
     }
   }
-  return get_shard_services().dispatch_context(get_collection_ref(), std::move(rctx));
+  return get_shard_services().dispatch_context(store_index, get_collection_ref(), std::move(rctx));
 }
 
 void PG::recheck_readable()
@@ -482,11 +485,13 @@ PG::do_delete_work(ceph::os::Transaction &t, ghobject_t _next)
 {
   logger().info("removing pg {}", pgid);
   auto fut = interruptor::make_interruptible(
-    shard_services.get_store().list_objects(
+    crimson::os::with_store<&crimson::os::FuturizedStore::Shard::list_objects>(
+      shard_services.get_store(store_index),
       coll_ref,
       _next,
      ghobject_t::get_max(),
-      local_conf()->osd_target_transaction_size));
+      local_conf()->osd_target_transaction_size,
+      0));
 
   auto [objs_to_rm, next] = fut.get();
   if (objs_to_rm.empty()) {
@@ -494,8 +499,10 @@ PG::do_delete_work(ceph::os::Transaction &t, ghobject_t _next)
     t.remove(coll_ref->get_cid(), pgid.make_snapmapper_oid());
     t.remove(coll_ref->get_cid(), pgmeta_oid);
     t.remove_collection(coll_ref->get_cid());
-    (void) shard_services.get_store().do_transaction(
-      coll_ref, t.claim_and_reset()).then([this] {
+    (void) crimson::os::with_store_do_transaction(
+      shard_services.get_store(store_index),
+      coll_ref,
+      t.claim_and_reset()).then([this] {
       return shard_services.remove_pg(pgid);
     });
     return {next, false};
@@ -539,11 +546,13 @@ seastar::future<> PG::clear_temp_objects()
   ceph::os::Transaction t;
   auto max_size = local_conf()->osd_target_transaction_size;
   while(true) {
-    auto [objs, next] = co_await shard_services.get_store().list_objects(
-      coll_ref, _next, ghobject_t::get_max(), max_size);
+    auto [objs, next] = co_await crimson::os::with_store<&crimson::os::FuturizedStore::Shard::list_objects>(
+      shard_services.get_store(store_index),
+      coll_ref, _next, ghobject_t::get_max(), max_size, 0);
     if (objs.empty()) {
       if (!t.empty()) {
-        co_await shard_services.get_store().do_transaction(
+        co_await crimson::os::with_store_do_transaction(
+          shard_services.get_store(store_index),
           coll_ref, std::move(t));
       }
       break;
@@ -555,7 +564,8 @@ seastar::future<> PG::clear_temp_objects()
     }
     _next = next;
     if (t.get_num_ops() >= max_size) {
-      co_await shard_services.get_store().do_transaction(
+      co_await crimson::os::with_store_do_transaction(
+        shard_services.get_store(store_index),
         coll_ref, t.claim_and_reset());
     }
   }
@@ -786,26 +796,27 @@ seastar::future<> PG::init(
     role, newup, new_up_primary, newacting,
     new_acting_primary, history, pi, t);
   assert(coll_ref);
-  return shard_services.get_store().exists(
-    get_collection_ref(), pgid.make_snapmapper_oid()
+  return crimson::os::with_store<&crimson::os::FuturizedStore::Shard::exists>(
+    shard_services.get_store(store_index),
+    get_collection_ref(), pgid.make_snapmapper_oid(), 0
   ).safe_then([&t, this](bool existed) {
-    if (!existed) {
-      t.touch(coll_ref->get_cid(), pgid.make_snapmapper_oid());
-    }
-  },
-  ::crimson::ct_error::assert_all{fmt::format(
-    "{} {} unexpected eio", *this, __func__).c_str()}
-  );
+    if (!existed) {
+      t.touch(coll_ref->get_cid(), pgid.make_snapmapper_oid());
+    }
+  },
+  ::crimson::ct_error::assert_all{fmt::format(
+    "{} {} unexpected eio", *this, __func__).c_str()}
+);
 }
 
-seastar::future<> PG::read_state(crimson::os::FuturizedStore::Shard* store)
+seastar::future<> PG::read_state(crimson::os::FuturizedStore::StoreShardRef store)
 {
   if (__builtin_expect(stopping, false)) {
     return seastar::make_exception_future<>(
       crimson::common::system_shutdown_exception());
   }
-  return seastar::do_with(PGMeta(*store, pgid), [] (auto& pg_meta) {
+  return seastar::do_with(PGMeta(store, pgid), [] (auto& pg_meta) {
     return pg_meta.load();
   }).then([this, store](auto&& ret) {
     auto [pg_info, past_intervals] = std::move(ret);
@@ -814,7 +825,7 @@ seastar::future<> PG::read_state(crimson::os::FuturizedStore::Shard* store)
       std::move(past_intervals),
       [this, store] (PGLog &pglog) {
         return pglog.read_log_and_missing_crimson(
-          *store,
+          store,
           coll_ref,
           peering_state.get_info(),
           pgmeta_oid);
@@ -844,7 +855,8 @@ seastar::future<> PG::read_state(crimson::os::FuturizedStore::Shard* store)
     return seastar::now();
   }).then([this, store]() {
     logger().debug("{} setting collection options", __func__);
-    return store->set_collection_opts(
+    return crimson::os::with_store<&crimson::os::FuturizedStore::Shard::set_collection_opts>(
+      store,
       coll_ref,
       get_pgpool().info.opts);
   });
@@ -897,7 +909,8 @@ void PG::handle_initialize(PeeringCtx &rctx)
 
 void PG::init_collection_pool_opts()
 {
-  std::ignore = shard_services.get_store().set_collection_opts(coll_ref, get_pgpool().info.opts);
+  std::ignore = crimson::os::with_store<&crimson::os::FuturizedStore::Shard::set_collection_opts>(
+    shard_services.get_store(store_index), coll_ref, get_pgpool().info.opts);
 }
 
 void PG::on_pool_change()
@@ -1139,7 +1152,8 @@ PG::interruptible_future<eversion_t> PG::submit_error_log(
   }
 
   co_await interruptor::make_interruptible(
-    shard_services.get_store().do_transaction(
+    crimson::os::with_store_do_transaction(
+      shard_services.get_store(store_index),
       get_collection_ref(), std::move(t)
     ));
@@ -1321,7 +1335,9 @@ PG::handle_rep_op_fut PG::handle_rep_op(Ref<MOSDRepOp> req)
 
   DEBUGDPP("{} do_transaction", *this, *req);
   auto commit_fut = interruptor::make_interruptible(
-    shard_services.get_store().do_transaction(coll_ref, std::move(txn))
+    crimson::os::with_store_do_transaction(
+      shard_services.get_store(store_index),
+      coll_ref, std::move(txn))
   );
 
   const auto &lcod = peering_state.get_info().last_complete;
@@ -1437,8 +1453,10 @@ PG::interruptible_future<> PG::do_update_log_missing(
   peering_state.append_log_entries_update_missing(
     m->entries, t, op_trim_to, op_pg_committed_to);
 
-  return interruptor::make_interruptible(shard_services.get_store().do_transaction(
-    coll_ref, std::move(t))).then_interruptible(
+  return interruptor::make_interruptible(
+    crimson::os::with_store_do_transaction(
+      shard_services.get_store(store_index),
+      coll_ref, std::move(t))).then_interruptible(
     [m, conn, lcod=peering_state.get_info().last_complete, this] {
     if (!peering_state.pg_has_reset_since(m->get_epoch())) {
       peering_state.update_last_complete_ondisk(lcod);
diff --git a/src/crimson/osd/pg.h b/src/crimson/osd/pg.h
index 5a8a0707c95..d029be27eaa 100644
--- a/src/crimson/osd/pg.h
+++ b/src/crimson/osd/pg.h
@@ -88,6 +88,7 @@ class PG : public boost::intrusive_ref_counter<
   spg_t pgid;
   pg_shard_t pg_whoami;
   crimson::os::CollectionRef coll_ref;
+  unsigned int store_index;
   ghobject_t pgmeta_oid;
 
   seastar::timer<seastar::lowres_clock> check_readable_timer;
@@ -101,6 +102,7 @@ public:
   PG(spg_t pgid,
      pg_shard_t pg_shard,
+     unsigned int store_index,
      crimson::os::CollectionRef coll_ref,
      pg_pool_t&& pool,
      std::string&& name,
@@ -118,6 +120,9 @@ public:
     return pgid;
   }
 
+  const unsigned int get_store_index() {
+    return store_index;
+  }
   PGBackend& get_backend() {
     return *backend;
   }
@@ -198,6 +203,7 @@ public:
     std::swap(o, orderer);
     return seastar::when_all(
       shard_services.dispatch_context(
+        store_index,
         get_collection_ref(),
         std::move(rctx)),
       shard_services.run_orderer(std::move(o))
@@ -335,6 +341,7 @@ public:
     PGPeeringEventRef on_commit) final {
     LOG_PREFIX(PG::schedule_event_on_commit);
     SUBDEBUGDPP(osd, "on_commit {}", *this, on_commit->get_desc());
+
     t.register_on_commit(
       make_lambda_context(
         [this, on_commit=std::move(on_commit)](int) {
@@ -594,7 +601,7 @@ public:
     const PastIntervals& pim,
     ceph::os::Transaction &t);
 
-  seastar::future<> read_state(crimson::os::FuturizedStore::Shard* store);
+  seastar::future<> read_state(crimson::os::FuturizedStore::StoreShardRef store);
 
   void do_peering_event(PGPeeringEvent& evt, PeeringCtx &rctx);
 
@@ -625,7 +632,8 @@ public:
       seed,
       target);
     init_pg_ondisk(t, child, pool);
-    return shard_services.get_store().do_transaction(
+    return crimson::os::with_store_do_transaction(
+      shard_services.get_store(store_index),
       coll_ref, std::move(t));
   }
diff --git a/src/crimson/osd/pg_backend.cc b/src/crimson/osd/pg_backend.cc
index 7abf8372c6c..792b118410c 100644
--- a/src/crimson/osd/pg_backend.cc
+++ b/src/crimson/osd/pg_backend.cc
@@ -61,7 +61,7 @@ PGBackend::create(pg_t pgid,
                coll, shard_services, dpp);
   case pg_pool_t::TYPE_ERASURE:
-    return std::make_unique<ECBackend>(pg_shard.shard, coll, shard_services,
+    return std::make_unique<ECBackend>(pg_shard.shard, coll, shard_services, pg.get_store_index(),
                                        std::move(ec_profile),
                                        pool.stripe_width,
                                        dpp);
@@ -74,21 +74,24 @@ PGBackend::create(pg_t pgid,
 PGBackend::PGBackend(shard_id_t shard,
                      CollectionRef coll,
                      crimson::osd::ShardServices &shard_services,
+                     unsigned int store_index,
                      DoutPrefixProvider &dpp)
   : shard{shard},
     coll{coll},
     shard_services{shard_services},
     dpp{dpp},
-    store{&shard_services.get_store()}
+    store{shard_services.get_store(store_index)}
 {}
 
 PGBackend::load_metadata_iertr::future<PGBackend::loaded_object_md_t::ref>
 PGBackend::load_metadata(const hobject_t& oid)
 {
-  return interruptor::make_interruptible(store->get_attrs(
+  return interruptor::make_interruptible(
+    crimson::os::with_store<&crimson::os::FuturizedStore::Shard::get_attrs>(
+      store,
       coll,
-      ghobject_t{oid, ghobject_t::NO_GEN, shard})).safe_then_interruptible(
+      ghobject_t{oid, ghobject_t::NO_GEN, shard}, 0)).safe_then_interruptible(
     [oid](auto &&attrs) -> load_metadata_ertr::future<loaded_object_md_t::ref>{
       loaded_object_md_t::ref ret(new loaded_object_md_t());
       if (auto oiiter = attrs.find(OI_ATTR); oiiter != attrs.end()) {
@@ -255,13 +258,19 @@ PGBackend::sparse_read(const ObjectState& os, OSDOp& osd_op,
   }
   logger().trace("sparse_read: {} {}~{}",
                  os.oi.soid, (uint64_t)op.extent.offset, (uint64_t)op.extent.length);
-  return interruptor::make_interruptible(store->fiemap(coll, ghobject_t{os.oi.soid},
-    offset, adjusted_length)).safe_then_interruptible(
+
+  return interruptor::make_interruptible(
+    crimson::os::with_store<&crimson::os::FuturizedStore::Shard::fiemap>(
+      store, coll, ghobject_t{os.oi.soid},
+      static_cast<uint64_t>(offset),
+      static_cast<uint64_t>(adjusted_length), static_cast<uint32_t>(0))).safe_then_interruptible(
     [&delta_stats, &os, &osd_op, this](auto&& m) {
     return seastar::do_with(interval_set<uint64_t>{std::move(m)},
       [&delta_stats, &os, &osd_op, this](auto&& extents) {
-      return interruptor::make_interruptible(store->readv(coll, ghobject_t{os.oi.soid},
-        extents, osd_op.op.flags)).safe_then_interruptible_tuple(
+      return interruptor::make_interruptible(
+        crimson::os::with_store<&crimson::os::FuturizedStore::Shard::readv>(
+          store, coll, ghobject_t{os.oi.soid},
+          std::ref(extents), osd_op.op.flags)).safe_then_interruptible_tuple(
         [&delta_stats, &os, &osd_op, &extents](auto&& bl) -> read_errorator::future<> {
         if (_read_verify_data(os.oi, bl)) {
           osd_op.op.extent.length = bl.length();
@@ -1041,7 +1050,8 @@ PGBackend::list_objects(
   auto gstart = start.is_min() ? ghobject_t{} : ghobject_t{start, 0, shard};
   auto gend = end.is_max() ? ghobject_t::get_max() : ghobject_t{end, 0, shard};
   auto [gobjects, next] = co_await interruptor::make_interruptible(
-    store->list_objects(coll, gstart, gend, limit));
+    crimson::os::with_store<&crimson::os::FuturizedStore::Shard::list_objects>(
+      store, coll, gstart, gend, limit, 0));
 
   std::vector<hobject_t> objects;
   boost::copy(
@@ -1074,26 +1084,27 @@ PGBackend::setxattr_ierrorator::future<> PGBackend::setxattr(
       osd_op.op.xattr.value_len > local_conf()->osd_max_attr_size) {
     return crimson::ct_error::file_too_large::make();
   }
-
-  const auto max_name_len = std::min<uint64_t>(
-    store->get_max_attr_name_length(), local_conf()->osd_max_attr_name_len);
+  return crimson::os::with_store<
+    &crimson::os::FuturizedStore::Shard::get_max_attr_name_length
+  >(store).then([this, &os, &osd_op, &txn, &delta_stats](unsigned max_name_len) {
   if (osd_op.op.xattr.name_len > max_name_len) {
-    return crimson::ct_error::enametoolong::make();
+    return setxattr_ierrorator::future<>(crimson::ct_error::enametoolong::make());
   }
-  maybe_create_new_object(os, txn, delta_stats);
+  maybe_create_new_object(os, txn, delta_stats);
 
-  std::string name{"_"};
-  ceph::bufferlist val;
-  {
-    auto bp = osd_op.indata.cbegin();
-    bp.copy(osd_op.op.xattr.name_len, name);
-    bp.copy(osd_op.op.xattr.value_len, val);
-  }
-  logger().debug("setxattr on obj={} for attr={}", os.oi.soid, name);
-  txn.setattr(coll->get_cid(), ghobject_t{os.oi.soid}, name, val);
-  delta_stats.num_wr++;
-  return seastar::now();
+  std::string name{"_"};
+  ceph::bufferlist val;
+  {
+    auto bp = osd_op.indata.cbegin();
+    bp.copy(osd_op.op.xattr.name_len, name);
+    bp.copy(osd_op.op.xattr.value_len, val);
+  }
+  logger().debug("setxattr on obj={} for attr={}", os.oi.soid, name);
+  txn.setattr(coll->get_cid(), ghobject_t{os.oi.soid}, name, val);
+  delta_stats.num_wr++;
+  return setxattr_ierrorator::future<>(seastar::now());
+  });
 }
 
 PGBackend::get_attr_ierrorator::future<> PGBackend::getxattr(
@@ -1125,7 +1136,8 @@ PGBackend::getxattr(
   const hobject_t& soid,
   std::string_view key) const
 {
-  return store->get_attr(coll, ghobject_t{soid}, key);
+  return crimson::os::with_store<&crimson::os::FuturizedStore::Shard::get_attr>(
+    store, coll, ghobject_t{soid}, key, 0);
 }
 
 PGBackend::get_attr_ierrorator::future<ceph::bufferlist>
@@ -1134,7 +1146,8 @@ PGBackend::getxattr(
   const hobject_t& soid,
   std::string&& key) const
 {
   return seastar::do_with(key, [this, &soid](auto &key) {
-    return store->get_attr(coll, ghobject_t{soid}, key);
+    return crimson::os::with_store<&crimson::os::FuturizedStore::Shard::get_attr>(
+      store, coll, ghobject_t{soid}, key, 0);
   });
 }
 
@@ -1143,7 +1156,8 @@ PGBackend::get_attr_ierrorator::future<> PGBackend::get_xattrs(
   OSDOp& osd_op,
   object_stat_sum_t& delta_stats) const
 {
-  return store->get_attrs(coll, ghobject_t{os.oi.soid}).safe_then(
+  return crimson::os::with_store<&crimson::os::FuturizedStore::Shard::get_attrs>(
+    store, coll, ghobject_t{os.oi.soid}, 0).safe_then(
     [&delta_stats, &osd_op](auto&& attrs) {
     std::vector<std::pair<std::string, bufferlist>> user_xattrs;
     ceph::bufferlist bl;
@@ -1296,13 +1310,14 @@ static
 get_omap_iertr::future<
   crimson::os::FuturizedStore::Shard::omap_values_t>
 maybe_get_omap_vals_by_keys(
-  crimson::os::FuturizedStore::Shard* store,
+  crimson::os::FuturizedStore::StoreShardRef store,
   const crimson::os::CollectionRef& coll,
   const object_info_t& oi,
   const std::set<std::string>& keys_to_get)
 {
   if (oi.is_omap()) {
-    return store->omap_get_values(coll, ghobject_t{oi.soid}, keys_to_get);
+    return crimson::os::with_store<&crimson::os::FuturizedStore::Shard::omap_get_values>(
+      store, coll, ghobject_t{oi.soid}, keys_to_get, 0);
   } else {
     return crimson::ct_error::enodata::make();
   }
 }
@@ -1315,14 +1330,15 @@ using omap_iterate_cb_t = crimson::os::FuturizedStore::Shard::omap_iterate_cb_t;
 
 static get_omap_iterate_ertr::future<ObjectStore::omap_iter_ret_t>
 maybe_do_omap_iterate(
-  crimson::os::FuturizedStore::Shard* store,
+  crimson::os::FuturizedStore::StoreShardRef store,
   const crimson::os::CollectionRef& coll,
   const object_info_t& oi,
   ObjectStore::omap_iter_seek_t start_from,
   omap_iterate_cb_t callback)
 {
   if (oi.is_omap()) {
-    return store->omap_iterate(coll, ghobject_t{oi.soid}, start_from, callback);
+    return crimson::os::with_store<&crimson::os::FuturizedStore::Shard::omap_iterate>(
+      store, coll, ghobject_t{oi.soid}, start_from, callback, 0);
   } else {
     return crimson::ct_error::enodata::make();
   }
@@ -1334,7 +1350,8 @@ PGBackend::omap_get_header(
   const ghobject_t& oid,
   uint32_t op_flags) const
 {
-  return store->omap_get_header(c, oid, op_flags)
+  return crimson::os::with_store<&crimson::os::FuturizedStore::Shard::omap_get_header>(
+    store, c, oid, op_flags)
     .handle_error(
       crimson::ct_error::enodata::handle([] {
         return seastar::make_ready_future<bufferlist>();
@@ -1486,7 +1503,8 @@ PGBackend::omap_cmp(
   for (auto &i: assertions) {
     to_get.insert(i.first);
   }
-  return store->omap_get_values(coll, ghobject_t{os.oi.soid}, to_get)
+  return crimson::os::with_store<&crimson::os::FuturizedStore::Shard::omap_get_values>(
+    store, coll, ghobject_t{os.oi.soid}, to_get, 0)
     .safe_then([=, &osd_op] (auto&& out) -> omap_cmp_iertr::future<> {
       osd_op.rval = 0;
       return do_omap_val_cmp(out, assertions);
@@ -1718,7 +1736,8 @@ PGBackend::stat(
   CollectionRef c,
   const ghobject_t& oid) const
 {
-  return store->stat(c, oid);
+  return crimson::os::with_store<&crimson::os::FuturizedStore::Shard::stat>(
+    store, c, oid, 0);
 }
 
 PGBackend::read_errorator::future<std::map<uint64_t, uint64_t>>
 PGBackend::fiemap(
   CollectionRef c,
   const ghobject_t& oid,
   uint64_t off,
   uint64_t len,
   uint32_t op_flags)
 {
-  return store->fiemap(c, oid, off, len);
+  return crimson::os::with_store<&crimson::os::FuturizedStore::Shard::fiemap>(
+    store, c, oid, off, len, 0);
 }
 
 PGBackend::write_iertr::future<> PGBackend::tmapput(
diff --git a/src/crimson/osd/pg_backend.h b/src/crimson/osd/pg_backend.h
index 3402d31ff4a..dfc46348339 100644
--- a/src/crimson/osd/pg_backend.h
+++ b/src/crimson/osd/pg_backend.h
@@ -65,6 +65,7 @@ public:
   using rep_op_fut_t = interruptible_future<rep_op_ret_t>;
   PGBackend(shard_id_t shard, CollectionRef coll,
             crimson::osd::ShardServices &shard_services,
+            unsigned int store_index,
             DoutPrefixProvider &dpp);
   virtual ~PGBackend() = default;
   static std::unique_ptr<PGBackend> create(pg_t pgid,
@@ -439,7 +440,7 @@ protected:
   CollectionRef coll;
   crimson::osd::ShardServices &shard_services;
   DoutPrefixProvider &dpp; ///< provides log prefix context
-  crimson::os::FuturizedStore::Shard* store;
+  crimson::os::FuturizedStore::StoreShardRef store;
   virtual seastar::future<> request_committed(
     const osd_reqid_t& reqid,
     const eversion_t& at_version) = 0;
diff --git a/src/crimson/osd/pg_meta.cc b/src/crimson/osd/pg_meta.cc
index 288ee52a086..346a24bab9b 100644
--- a/src/crimson/osd/pg_meta.cc
+++ b/src/crimson/osd/pg_meta.cc
@@ -14,7 +14,7 @@ using std::string_view;
 // easily skip them
 using crimson::os::FuturizedStore;
 
-PGMeta::PGMeta(FuturizedStore::Shard& store, spg_t pgid)
+PGMeta::PGMeta(FuturizedStore::StoreShardRef store, spg_t pgid)
   : store{store},
     pgid{pgid}
 {}
@@ -37,11 +37,15 @@ namespace {
 
 seastar::future<epoch_t> PGMeta::get_epoch()
 {
-  return store.open_collection(coll_t{pgid}).then([this](auto ch) {
-    return store.omap_get_values(ch,
+  return crimson::os::with_store<&crimson::os::FuturizedStore::Shard::open_collection>(
+    store, coll_t{pgid}).then([this](auto ch) {
+    return crimson::os::with_store<&crimson::os::FuturizedStore::Shard::omap_get_values>(
+      store, ch,
       pgid.make_pgmeta_oid(),
-      {string{infover_key},
-       string{epoch_key}}).safe_then(
+      std::set<std::string>{
+        string{infover_key},
+        string{epoch_key}},
+      0).safe_then(
       [](auto&& values) {
         {
           // sanity check
@@ -65,13 +69,17 @@ seastar::future<std::tuple<pg_info_t, PastIntervals>> PGMeta::load()
 {
-  return store.open_collection(coll_t{pgid}).then([this](auto ch) {
-    return store.omap_get_values(ch,
+  return crimson::os::with_store<&crimson::os::FuturizedStore::Shard::open_collection>(
+    store, coll_t{pgid}).then([this](auto ch) {
+    return crimson::os::with_store<&crimson::os::FuturizedStore::Shard::omap_get_values>(
+      store, ch,
       pgid.make_pgmeta_oid(),
-      {string{infover_key},
+      std::set<std::string>{
+        string{infover_key},
        string{info_key},
        string{biginfo_key},
-       string{fastinfo_key}});
+        string{fastinfo_key}},
+      0);
   }).safe_then([](auto&& values) {
     {
       // sanity check
diff --git a/src/crimson/osd/pg_meta.h b/src/crimson/osd/pg_meta.h
index 21c2bb373b1..f82eabc9a9f 100644
--- a/src/crimson/osd/pg_meta.h
+++ b/src/crimson/osd/pg_meta.h
@@ -11,10 +11,10 @@
 /// PG related metadata
 class PGMeta
 {
-  crimson::os::FuturizedStore::Shard& store;
+  crimson::os::FuturizedStore::StoreShardRef store;
   const spg_t pgid;
 public:
-  PGMeta(crimson::os::FuturizedStore::Shard& store, spg_t pgid);
+  PGMeta(crimson::os::FuturizedStore::StoreShardRef store, spg_t pgid);
   seastar::future<epoch_t> get_epoch();
   seastar::future<std::tuple<pg_info_t, PastIntervals>> load();
 };
diff --git a/src/crimson/osd/recovery_backend.cc b/src/crimson/osd/recovery_backend.cc
index d27b231e64d..26da2a96026 100644
--- a/src/crimson/osd/recovery_backend.cc
+++ b/src/crimson/osd/recovery_backend.cc
@@ -161,7 +161,8 @@ RecoveryBackend::handle_backfill_progress(
     m.op == MOSDPGBackfill::OP_BACKFILL_PROGRESS,
     t);
   DEBUGDPP("submitting transaction", pg);
-  return shard_services.get_store().do_transaction(
+  return crimson::os::with_store_do_transaction(
+    shard_services.get_store(pg.get_store_index()),
     pg.get_collection_ref(), std::move(t)).or_terminate();
 }
 
@@ -220,7 +221,8 @@ RecoveryBackend::handle_backfill_remove(
   }
   DEBUGDPP("submitting transaction", pg);
   co_await interruptor::make_interruptible(
-    shard_services.get_store().do_transaction(
+    crimson::os::with_store_do_transaction(
+      shard_services.get_store(pg.get_store_index()),
       pg.get_collection_ref(), std::move(t)).or_terminate());
 }
diff --git a/src/crimson/osd/recovery_backend.h b/src/crimson/osd/recovery_backend.h
index 699b31a0e00..11a761d7862 100644
--- a/src/crimson/osd/recovery_backend.h
+++ b/src/crimson/osd/recovery_backend.h
@@ -36,10 +36,11 @@ public:
   RecoveryBackend(crimson::osd::PG& pg,
                   crimson::osd::ShardServices& shard_services,
                   crimson::os::CollectionRef coll,
+                  unsigned int store_index,
                   PGBackend* backend)
     : pg{pg},
       shard_services{shard_services},
-      store{&shard_services.get_store()},
+      store(shard_services.get_store(store_index)),
       coll{coll},
       backend{backend} {}
   virtual ~RecoveryBackend() {}
@@ -127,7 +128,7 @@ public:
 protected:
   crimson::osd::PG& pg;
   crimson::osd::ShardServices& shard_services;
-  crimson::os::FuturizedStore::Shard* store;
+  crimson::os::FuturizedStore::StoreShardRef store;
   crimson::os::CollectionRef coll;
   PGBackend* backend;
diff --git a/src/crimson/osd/replicated_backend.cc b/src/crimson/osd/replicated_backend.cc
index e0cf611f9be..cd1a095dafe 100644
--- a/src/crimson/osd/replicated_backend.cc
+++ b/src/crimson/osd/replicated_backend.cc
@@ -19,11 +19,11 @@ namespace crimson::osd {
 ReplicatedBackend::ReplicatedBackend(pg_t pgid,
                                      pg_shard_t whoami,
-                                     crimson::osd::PG& pg,
+                                     crimson::osd::PG& pg,
                                      ReplicatedBackend::CollectionRef coll,
                                      crimson::osd::ShardServices& shard_services,
                                      DoutPrefixProvider &dpp)
-  : PGBackend{whoami.shard, coll, shard_services, dpp},
+  : PGBackend{whoami.shard, coll, shard_services, pg.get_store_index(), dpp},
     pgid{pgid},
     whoami{whoami},
     pg(pg),
@@ -44,7 +44,8 @@ ReplicatedBackend::_read(const hobject_t& hoid,
   const uint64_t len,
   const uint32_t flags)
 {
-  return store->read(coll, ghobject_t{hoid}, off, len, flags);
+  return crimson::os::with_store<&crimson::os::FuturizedStore::Shard::read>(
+    store, coll, ghobject_t{hoid}, off, len, flags);
 }
 
 MURef<MOSDRepOp> ReplicatedBackend::new_repop_msg(
@@ -176,7 +177,9 @@ ReplicatedBackend::submit_transaction(
     false);
 
   auto all_completed = interruptor::make_interruptible(
-    shard_services.get_store().do_transaction(coll, std::move(txn))
+    crimson::os::with_store_do_transaction(
+      shard_services.get_store(pg.get_store_index()),
+      coll, std::move(txn))
   ).then_interruptible([FNAME, this,
     peers=pending_txn->second.weak_from_this()] {
     if (!peers) {
diff --git a/src/crimson/osd/replicated_recovery_backend.cc b/src/crimson/osd/replicated_recovery_backend.cc
index 226b66f1db2..3dea9ba904f 100644
--- a/src/crimson/osd/replicated_recovery_backend.cc
+++ b/src/crimson/osd/replicated_recovery_backend.cc
@@ -237,7 +237,9 @@ ReplicatedRecoveryBackend::on_local_recover_persist(
     soid, _recovery_info, is_delete, t
   ).then_interruptible([FNAME, this, &t] {
     DEBUGDPP("submitting transaction", pg);
-    return shard_services.get_store().do_transaction(coll, std::move(t));
+    return crimson::os::with_store_do_transaction(
+      shard_services.get_store(pg.get_store_index()),
+      coll, std::move(t));
   }).then_interruptible(
     [this, epoch_frozen, last_complete = pg.get_info().last_complete] {
     pg.get_recovery_handler()->_committed_pushed_object(epoch_frozen, last_complete);
@@ -264,8 +266,10 @@ ReplicatedRecoveryBackend::local_recover_delete(
   }).then_interruptible(
     [FNAME, this, &txn]() mutable {
     DEBUGDPP("submitting transaction", pg);
-    return shard_services.get_store().do_transaction(coll,
-      std::move(txn));
+    return crimson::os::with_store_do_transaction(
+      shard_services.get_store(pg.get_store_index()),
+      coll,
+      std::move(txn));
   });
   });
 }
@@ -607,7 +611,8 @@ ReplicatedRecoveryBackend::read_metadata_for_push_op(
       return seastar::make_ready_future();
     })),
     interruptor::make_interruptible(
-      store->get_attrs(coll, ghobject_t(oid), CEPH_OSD_OP_FLAG_FADVISE_DONTNEED)
+      crimson::os::with_store<&crimson::os::FuturizedStore::Shard::get_attrs>(
+        store, coll, ghobject_t(oid), CEPH_OSD_OP_FLAG_FADVISE_DONTNEED)
     ).handle_error_interruptible(
       crimson::os::FuturizedStore::Shard::get_attrs_ertr::all_same_way(
        [FNAME, this, oid] (const std::error_code& e) {
@@ -671,10 +676,11 @@ ReplicatedRecoveryBackend::read_object_for_push_op(
     // 3. read the truncated extents
     // TODO: check if the returned extents are pruned
     return interruptor::make_interruptible(
-      store->readv(
+      crimson::os::with_store<&crimson::os::FuturizedStore::Shard::readv>(
+        store,
         coll,
         ghobject_t{oid},
-        push_op->data_included,
+        std::ref(push_op->data_included),
         CEPH_OSD_OP_FLAG_FADVISE_DONTNEED));
   }).safe_then_interruptible([push_op, range_end=copy_subset.range_end()](auto &&bl) {
     push_op->data.claim_append(std::move(bl));
@@ -742,8 +748,9 @@ ReplicatedRecoveryBackend::read_omap_for_push_op(
   };
 
   co_await interruptor::make_interruptible(
-    shard_services.get_store().omap_iterate(
-      coll, ghobject_t{oid}, start_from, callback
+    crimson::os::with_store<&crimson::os::FuturizedStore::Shard::omap_iterate>(
+      shard_services.get_store(pg.get_store_index()),
+      coll, ghobject_t{oid}, start_from, callback, 0
     ).safe_then([&new_progress](auto ret) {
       if (ret == ObjectStore::omap_iter_ret_t::NEXT) {
         new_progress.omap_complete = true;
@@ -906,14 +913,18 @@ ReplicatedRecoveryBackend::_handle_pull_response(
     );
     DEBUGDPP("submitting transaction, complete", pg);
     co_await interruptor::make_interruptible(
-      shard_services.get_store().do_transaction(coll, std::move(t)));
+      crimson::os::with_store_do_transaction(
+        shard_services.get_store(pg.get_store_index()),
+        coll, std::move(t)));
   } else {
     response->soid = push_op.soid;
     response->recovery_info = pull_info.recovery_info;
     response->recovery_progress = pull_info.recovery_progress;
     DEBUGDPP("submitting transaction, incomplete", pg);
     co_await interruptor::make_interruptible(
-      shard_services.get_store().do_transaction(coll, std::move(t)));
+      crimson::os::with_store_do_transaction(
+        shard_services.get_store(pg.get_store_index()),
+        coll, std::move(t)));
   }
 
   co_return complete;
@@ -1031,14 +1042,17 @@ ReplicatedRecoveryBackend::handle_push(
       false,
       t);
     co_await interruptor::make_interruptible(
-      shard_services.get_store().do_transaction(coll, std::move(t)));
+      crimson::os::with_store_do_transaction(
+        shard_services.get_store(pg.get_store_index()),
+        coll, std::move(t)));
     replica_push_targets.erase(ptiter);
     pg.get_recovery_handler()->_committed_pushed_object(
       epoch_frozen, pg.get_info().last_complete);
   } else {
     co_await interruptor::make_interruptible(
-      shard_services.get_store().do_transaction(coll, std::move(t)));
+      crimson::os::with_store_do_transaction(
+        shard_services.get_store(pg.get_store_index()), coll, std::move(t)));
   }
 
   auto reply = crimson::make_message<MOSDPGPushReply>();
@@ -1214,7 +1228,8 @@ ReplicatedRecoveryBackend::prep_push_target(
 
   // clone overlap content in local object if using a new object
   auto st = co_await interruptor::make_interruptible(
-    store->stat(coll, ghobject_t(recovery_info.soid)));
+    crimson::os::with_store<&crimson::os::FuturizedStore::Shard::stat>(
+      store, coll, ghobject_t(recovery_info.soid), 0));
 
   // TODO: pg num bytes counting
   uint64_t local_size = std::min(recovery_info.size, (uint64_t)st.st_size);
diff --git a/src/crimson/osd/replicated_recovery_backend.h b/src/crimson/osd/replicated_recovery_backend.h
index 5d80cd9d44a..cf62a192e0c 100644
--- a/src/crimson/osd/replicated_recovery_backend.h
+++ b/src/crimson/osd/replicated_recovery_backend.h
@@ -23,7 +23,7 @@ public:
     crimson::osd::ShardServices& shard_services,
     crimson::os::CollectionRef coll,
     PGBackend* backend)
-    : RecoveryBackend(pg, shard_services, coll, backend)
+    : RecoveryBackend(pg, shard_services, coll, pg.get_store_index(), backend)
   {}
   interruptible_future<> handle_recovery_op(
     Ref<MOSDFastDispatchOp> m,
diff --git a/src/osd/PGLog.cc b/src/osd/PGLog.cc
index d0c06cab72a..6bbb719f297 100644
--- a/src/osd/PGLog.cc
+++ b/src/osd/PGLog.cc
@@ -1092,7 +1092,7 @@ void PGLog::rebuild_missing_set_with_deletes(
 
 namespace {
   struct FuturizedShardStoreLogReader {
-    crimson::os::FuturizedStore::Shard &store;
+    crimson::os::FuturizedStore::StoreShardRef store;
     const pg_info_t &info;
     PGLog::IndexedLog &log;
     std::set<std::string>* log_keys_debug = NULL;
@@ -1174,8 +1174,9 @@ namespace {
       return ObjectStore::omap_iter_ret_t::NEXT;
     };
 
-    co_await store.omap_iterate(
-      ch, pgmeta_oid, start_from, callback
+    co_await crimson::os::with_store<&crimson::os::FuturizedStore::Shard::omap_iterate>(
+      store,
+      ch, pgmeta_oid, start_from, callback, 0
     ).safe_then([] (auto ret) {
       ceph_assert (ret == ObjectStore::omap_iter_ret_t::NEXT);
     }).handle_error(
@@ -1199,7 +1200,7 @@ namespace {
 }
 
 seastar::future<> PGLog::read_log_and_missing_crimson(
-  crimson::os::FuturizedStore::Shard &store,
+  crimson::os::FuturizedStore::StoreShardRef store,
   crimson::os::CollectionRef ch,
   const pg_info_t &info,
   IndexedLog &log,
diff --git a/src/osd/PGLog.h b/src/osd/PGLog.h
index 9a878a9cf43..5eb54bfa551 100644
--- a/src/osd/PGLog.h
+++ b/src/osd/PGLog.h
@@ -1847,7 +1847,7 @@ public:
 
 #ifdef WITH_CRIMSON
   seastar::future<> read_log_and_missing_crimson(
-    crimson::os::FuturizedStore::Shard &store,
+    crimson::os::FuturizedStore::StoreShardRef store,
     crimson::os::CollectionRef ch,
     const pg_info_t &info,
     ghobject_t pgmeta_oid
@@ -1859,7 +1859,7 @@ public:
   }
 
   static seastar::future<> read_log_and_missing_crimson(
-    crimson::os::FuturizedStore::Shard &store,
+    crimson::os::FuturizedStore::StoreShardRef store,
     crimson::os::CollectionRef ch,
     const pg_info_t &info,
     IndexedLog &log,
diff --git a/src/osd/SnapMapper.cc b/src/osd/SnapMapper.cc
index 67b86239490..434d6899526 100644
--- a/src/osd/SnapMapper.cc
+++ b/src/osd/SnapMapper.cc
@@ -90,8 +90,11 @@ int OSDriver::get_keys(
 {
   CRIMSON_DEBUG("OSDriver::{}", __func__);
   using crimson::os::FuturizedStore;
-  return interruptor::green_get(os->omap_get_values(
-    ch, hoid, keys
+
+  return interruptor::green_get(
+    crimson::os::with_store<&crimson::os::FuturizedStore::Shard::omap_get_values>(
+      os,
+      ch, hoid, keys, 0
   ).safe_then([out] (FuturizedStore::Shard::omap_values_t&& vals) {
     // just the difference in comparator (`std::less<>` in omap_values_t`)
     reinterpret_cast<FuturizedStore::Shard::omap_values_t&>(*out) = std::move(vals);
@@ -131,7 +134,8 @@ int OSDriver::get_next(
     }
   };
   return interruptor::green_get(
-    os->omap_iterate(ch, hoid, start_from, callback
+    crimson::os::with_store<&crimson::os::FuturizedStore::Shard::omap_iterate>(
+      os, ch, hoid, start_from, callback, 0
     ).safe_then([key] (auto ret) {
       if (ret == ObjectStore::omap_iter_ret_t::NEXT) {
         CRIMSON_DEBUG("OSDriver::get_next key {} no more values", key);
@@ -153,8 +157,11 @@ int OSDriver::get_next_or_current(
   CRIMSON_DEBUG("OSDriver::{} key {}", __func__, key);
   using crimson::os::FuturizedStore;
   // let's try to get current first
-  return interruptor::green_get(os->omap_get_values(
-    ch, hoid, FuturizedStore::Shard::omap_keys_t{key}
+
+  return interruptor::green_get(crimson::os::with_store<
+    &crimson::os::FuturizedStore::Shard::omap_get_values>(
+    os,
+    ch, hoid, FuturizedStore::Shard::omap_keys_t{key}, 0
   ).safe_then([&key, next_or_current] (FuturizedStore::Shard::omap_values_t&& vals) {
     CRIMSON_DEBUG("OSDriver::get_next_or_current returning {}", key);
     ceph_assert(vals.size() == 1);
diff --git a/src/osd/SnapMapper.h b/src/osd/SnapMapper.h
index 377b2185b9d..3babc34d36c 100644
--- a/src/osd/SnapMapper.h
+++ b/src/osd/SnapMapper.h
@@ -38,12 +38,17 @@ class OSDriver : public MapCacher::StoreDriver<std::string, ceph::buffer::list>
 #ifdef WITH_CRIMSON
   using ObjectStoreT = crimson::os::FuturizedStore::Shard;
   using CollectionHandleT = ObjectStoreT::CollectionRef;
+
+  using ObjectStoreTLRef = seastar::shared_ptr<ObjectStoreT>;
+  using ObjectStoreTFRef = seastar::foreign_ptr<ObjectStoreTLRef>;
+  using ObjectStoreTRef = ::crimson::local_shared_foreign_ptr<ObjectStoreTLRef>;
 #else
   using ObjectStoreT = ObjectStore;
   using CollectionHandleT = ObjectStoreT::CollectionHandle;
+  using ObjectStoreTRef = ObjectStoreT*;
 #endif
 
-  ObjectStoreT *os;
+  ObjectStoreTRef os;
   CollectionHandleT ch;
   ghobject_t hoid;
@@ -79,10 +84,10 @@ public:
   }
 
 #ifndef WITH_CRIMSON
-  OSDriver(ObjectStoreT *os, const coll_t& cid, const ghobject_t &hoid) :
+  OSDriver(ObjectStoreTRef os, const coll_t& cid, const ghobject_t &hoid) :
     OSDriver(os, os->open_collection(cid), hoid) {}
 #endif
-  OSDriver(ObjectStoreT *os, CollectionHandleT ch, const ghobject_t &hoid) :
+  OSDriver(ObjectStoreTRef os, CollectionHandleT ch, const ghobject_t &hoid) :
     os(os), ch(ch), hoid(hoid) {}
-- 
2.39.5
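
Note on the mechanism (reviewer sketch, not part of the commit): the
crimson::os::with_store<...> helper this patch switches every call site to
takes the store operation itself as a member-function-pointer template
argument, so one wrapper can forward any FuturizedStore::Shard call through
a StoreShardRef that may refer to a store owned by another reactor. The
standalone C++ model below shows only that dispatch shape; DemoShard,
ShardRef, and the local-only with_store are illustrative stand-ins, not
crimson's actual types or implementation.

// Minimal sketch of the with_store<MemberFn> dispatch pattern.
// Assumptions: plain std::shared_ptr instead of StoreShardRef, and a
// synchronous local call instead of a seastar future.
#include <functional>
#include <iostream>
#include <memory>
#include <string>
#include <utility>

struct DemoShard {
  // Stands in for a FuturizedStore::Shard operation such as read().
  int read(const std::string& obj, unsigned flags) {
    std::cout << "read " << obj << " flags=" << flags << '\n';
    return 0;
  }
};

// Stand-in for StoreShardRef: a handle to a shard that, in crimson,
// may live on a different reactor core.
using ShardRef = std::shared_ptr<DemoShard>;

// The operation is a non-type template parameter, so a single helper
// covers every store call. A real implementation would first hop to the
// shard's home core (e.g. via seastar::smp::submit_to) when the
// reference is foreign; that remote case is exactly what motivates the
// store_index / get_store(store_index) plumbing added by this patch.
template <auto MemberFn, typename... Args>
auto with_store(const ShardRef& shard, Args&&... args) {
  // Local case only in this sketch: invoke directly on the shard.
  return std::invoke(MemberFn, *shard, std::forward<Args>(args)...);
}

int main() {
  auto store = std::make_shared<DemoShard>();
  // Mirrors the call shape used throughout the patch, e.g.:
  //   crimson::os::with_store<&FuturizedStore::Shard::read>(store, ...);
  return with_store<&DemoShard::read>(store, "rbd_data.0", 0u);
}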