]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
crimson/osd: replace direct store calls with with_store calls to support cases that require invoking a remote store.
authorChunmei Liu <chunmei.liu@ibm.com>
Thu, 27 Nov 2025 09:03:12 +0000 (09:03 +0000)
committerChunmei Liu <chunmei.liu@ibm.com>
Tue, 27 Jan 2026 23:42:51 +0000 (23:42 +0000)
Signed-off-by: Chunmei Liu <chunmei.liu@ibm.com>
22 files changed:
src/crimson/admin/osd_admin.cc
src/crimson/common/smp_helpers.h
src/crimson/osd/ec_backend.cc
src/crimson/osd/ec_backend.h
src/crimson/osd/osd_meta.cc
src/crimson/osd/osd_meta.h
src/crimson/osd/osd_operations/scrub_events.cc
src/crimson/osd/pg.cc
src/crimson/osd/pg.h
src/crimson/osd/pg_backend.cc
src/crimson/osd/pg_backend.h
src/crimson/osd/pg_meta.cc
src/crimson/osd/pg_meta.h
src/crimson/osd/recovery_backend.cc
src/crimson/osd/recovery_backend.h
src/crimson/osd/replicated_backend.cc
src/crimson/osd/replicated_recovery_backend.cc
src/crimson/osd/replicated_recovery_backend.h
src/osd/PGLog.cc
src/osd/PGLog.h
src/osd/SnapMapper.cc
src/osd/SnapMapper.h

index 8e73180edb07bdead58a252f9aaf4f9b4cc0db5e..b14336051e21c053dc7826dd82c34f69d9fc8424 100644 (file)
@@ -442,7 +442,9 @@ public:
       logger().info("error during data error injection: {}", e.what());
       co_return tell_result_t(-EINVAL, e.what());
     }
-    co_await shard_services.get_store().inject_data_error(obj);
+    co_await crimson::os::with_store<&crimson::os::FuturizedStore::Shard::inject_data_error>(
+      shard_services.get_store(DEFAULT_STORE_INDEX),
+      obj);
     logger().info("successfully injected data error for obj={}", obj);
     ceph::bufferlist bl;
     bl.append("ok"sv);
@@ -484,7 +486,9 @@ public:
       logger().info("error during metadata error injection: {}", e.what());
       co_return tell_result_t(-EINVAL, e.what());
     }
-    co_await shard_services.get_store().inject_mdata_error(obj);
+    co_await crimson::os::with_store<&crimson::os::FuturizedStore::Shard::inject_mdata_error>(
+      shard_services.get_store(DEFAULT_STORE_INDEX),
+      obj);
     logger().info("successfully injected metadata error for obj={}", obj);
     ceph::bufferlist bl;
     bl.append("ok"sv);
index 4acaac5c9de932c5d90890fabb8d43994f84eb3d..1a8b2d9b2ced8313fbba4c10e494d44f59e33380 100644 (file)
@@ -22,6 +22,7 @@ namespace crimson {
 using core_id_t = seastar::shard_id;
 static constexpr core_id_t NULL_CORE = std::numeric_limits<core_id_t>::max();
 static constexpr unsigned int NULL_STORE_INDEX = std::numeric_limits<unsigned int>::max();
+static constexpr unsigned int DEFAULT_STORE_INDEX = 0;
 /**
  * submit_to
  *
index 85bcb51e786a5e76108ce08424ea91d708318c1d..5ffb0faab18ded18ea13cba0275a3d11814c73c7 100644 (file)
@@ -7,10 +7,11 @@ namespace crimson::osd {
 ECBackend::ECBackend(shard_id_t shard,
                      ECBackend::CollectionRef coll,
                      crimson::osd::ShardServices& shard_services,
+                     unsigned int store_index,
                      const ec_profile_t&,
                      uint64_t,
                     DoutPrefixProvider &dpp)
-  : PGBackend{shard, coll, shard_services, dpp}
+  : PGBackend{shard, coll, shard_services, store_index, dpp}
 {
   // todo
 }
index 398d878cfc7a9171c0e64502b3a96b4efac5ebad..4b1d47f5c363c0a36fccd7d6c14f885c01540923 100644 (file)
@@ -17,6 +17,7 @@ public:
   ECBackend(shard_id_t shard,
            CollectionRef coll,
            crimson::osd::ShardServices& shard_services,
+           unsigned int store_index,
            const ec_profile_t& ec_profile,
            uint64_t stripe_width,
            DoutPrefixProvider &dpp);
index 018ecba00af78c94f225ad754fa3793f851e3765..e064d7bfab1c71d3e031116cc664e313bdf33040 100644 (file)
@@ -42,7 +42,8 @@ void OSDMeta::remove_inc_map(ceph::os::Transaction& t, epoch_t e)
 
 seastar::future<bufferlist> OSDMeta::load_map(epoch_t e)
 {
-  return store.read(coll,
+  return crimson::os::with_store<&crimson::os::FuturizedStore::Shard::read>(
+                    store, coll,
                     osdmap_oid(e), 0, 0,
                     CEPH_OSD_OP_FLAG_FADVISE_WILLNEED).handle_error(
     read_errorator::assert_all_func([e](const auto&) {
@@ -53,7 +54,8 @@ seastar::future<bufferlist> OSDMeta::load_map(epoch_t e)
 
 read_errorator::future<ceph::bufferlist> OSDMeta::load_inc_map(epoch_t e)
 {
-  return store.read(coll,
+  return crimson::os::with_store<&crimson::os::FuturizedStore::Shard::read>(
+                    store, coll,
                     inc_osdmap_oid(e), 0, 0,
                     CEPH_OSD_OP_FLAG_FADVISE_WILLNEED);
 }
@@ -68,8 +70,9 @@ void OSDMeta::store_superblock(ceph::os::Transaction& t,
 
 OSDMeta::load_superblock_ret OSDMeta::load_superblock()
 {
-  return store.read(
-    coll, superblock_oid(), 0, 0
+  return crimson::os::with_store<&crimson::os::FuturizedStore::Shard::read>(
+    store,
+    coll, superblock_oid(), 0, 0, 0
   ).safe_then([] (bufferlist&& bl) {
     auto p = bl.cbegin();
     OSDSuperblock superblock;
@@ -82,8 +85,9 @@ seastar::future<std::tuple<pg_pool_t,
                           std::string,
                           OSDMeta::ec_profile_t>>
 OSDMeta::load_final_pool_info(int64_t pool) {
-  return store.read(coll, final_pool_info_oid(pool),
-                     0, 0).safe_then([] (bufferlist&& bl) {
+  return crimson::os::with_store<&crimson::os::FuturizedStore::Shard::read>(
+                     store, coll, final_pool_info_oid(pool),
+                     0, 0, 0).safe_then([] (bufferlist&& bl) {
     auto p = bl.cbegin();
     pg_pool_t pi;
     string name;
index 60e19e010f25eaf60471ae47a691b290e7589c03..822054a434ddd76a34ef147b3d9d706ff5e3bd66 100644 (file)
@@ -27,12 +27,12 @@ using read_errorator = crimson::os::FuturizedStore::Shard::read_errorator;
 class OSDMeta {
   template<typename T> using Ref = boost::intrusive_ptr<T>;
 
-  crimson::os::FuturizedStore::Shard& store;
+  crimson::os::FuturizedStore::StoreShardRef store;
   Ref<crimson::os::FuturizedCollection> coll;
 
 public:
   OSDMeta(Ref<crimson::os::FuturizedCollection> coll,
-          crimson::os::FuturizedStore::Shard& store)
+          crimson::os::FuturizedStore::StoreShardRef store)
     : store{store}, coll{coll}
   {}
 
index 5fb079c5e82eeb7337575c69d4fa60f730540b27..14627fef2b8ae14276ba2ba7fe72997c3b35359b 100644 (file)
@@ -186,15 +186,19 @@ ScrubScan::ifut<> ScrubScan::scan_object(
   DEBUGDPP("obj: {}", pg, obj);
   auto &entry = ret.objects[obj.hobj];
   return interruptor::make_interruptible(
-    pg.shard_services.get_store().stat(
+    crimson::os::with_store<&crimson::os::FuturizedStore::Shard::stat>(
+      pg.shard_services.get_store(pg.get_store_index()),
       pg.get_collection_ref(),
-      obj)
+      obj,
+      0)
   ).then_interruptible([FNAME, &pg, &obj, &entry](struct stat obj_stat) {
     DEBUGDPP("obj: {}, stat complete, size {}", pg, obj, obj_stat.st_size);
     entry.size = obj_stat.st_size;
-    return pg.shard_services.get_store().get_attrs(
+    return crimson::os::with_store<&crimson::os::FuturizedStore::Shard::get_attrs>(
+      pg.shard_services.get_store(pg.get_store_index()),
       pg.get_collection_ref(),
-      obj);
+      obj,
+      0);
   }).safe_then_interruptible([FNAME, &pg, &obj, &entry](auto &&attrs) {
     DEBUGDPP("obj: {}, got {} attrs", pg, obj, attrs.size());
     for (auto &i : attrs) {
@@ -244,11 +248,13 @@ ScrubScan::ifut<> ScrubScan::deep_scan_object(
                 pg, *this, obj, progress);
       const auto stride = local_conf().get_val<Option::size_t>(
         "osd_deep_scrub_stride");
-      return pg.shard_services.get_store().read(
+      return crimson::os::with_store<&crimson::os::FuturizedStore::Shard::read>(
+        pg.shard_services.get_store(pg.get_store_index()),
         pg.get_collection_ref(),
         obj,
         *(progress.offset),
-        stride
+        stride,
+        0
       ).safe_then([this, FNAME, stride, &obj, &progress, &entry, &pg](auto bl) {
         size_t offset = *progress.offset;
         DEBUGDPP("op: {}, obj: {}, progress: {} got offset {}",
@@ -279,9 +285,11 @@ ScrubScan::ifut<> ScrubScan::deep_scan_object(
    {
       DEBUGDPP("op: {}, obj: {}, progress: {} scanning omap header",
                 pg, *this, obj, progress);
-      return pg.shard_services.get_store().omap_get_header(
+      return crimson::os::with_store<&crimson::os::FuturizedStore::Shard::omap_get_header>(
+        pg.shard_services.get_store(pg.get_store_index()),
         pg.get_collection_ref(),
-        obj
+        obj,
+        0
       ).safe_then([&progress](auto bl) {
         progress.omap_hash << bl;
       }).handle_error(
@@ -319,11 +327,13 @@ ScrubScan::ifut<> ScrubScan::deep_scan_object(
     {
       DEBUGDPP("op: {}, obj: {}, progress: {} scanning omap keys",
                 pg, *this, obj, progress);
-      return pg.shard_services.get_store().omap_iterate(
+      return crimson::os::with_store<&crimson::os::FuturizedStore::Shard::omap_iterate>(
+        pg.shard_services.get_store(pg.get_store_index()),
         pg.get_collection_ref(),
         obj,
         start_from,
-        callback
+        callback,
+        0
       ).safe_then([FNAME, this, &obj, &progress, &entry, &pg](auto result) {
         assert(result == ObjectStore::omap_iter_ret_t::NEXT);
         DEBUGDPP("op: {}, obj: {}, progress: {} omap done",
index d327aefaa925436af0e796c00dd99d1e27196998..8e6c18d0bb6bc8d6f753a368349469d6f21ef1c0 100644 (file)
@@ -95,6 +95,7 @@ public:
 PG::PG(
   spg_t pgid,
   pg_shard_t pg_shard,
+  unsigned int store_index,
   crimson::os::CollectionRef coll_ref,
   pg_pool_t&& pool,
   std::string&& name,
@@ -104,6 +105,7 @@ PG::PG(
   : pgid{pgid},
     pg_whoami{pg_shard},
     coll_ref{coll_ref},
+    store_index{store_index},
     pgmeta_oid{pgid.make_pgmeta_oid()},
     osdmap_gate("PG::osdmap_gate"),
     shard_services{shard_services},
@@ -143,7 +145,7 @@ PG::PG(
       *backend.get(),
       *this},
     osdriver(
-      &shard_services.get_store(),
+      shard_services.get_store(store_index),
       coll_ref,
       pgid.make_snapmapper_oid()),
     snap_mapper(
@@ -184,11 +186,12 @@ void PG::check_blocklisted_watchers()
 
 bool PG::try_flush_or_schedule_async() {
   logger().debug("PG::try_flush_or_schedule_async: flush ...");
-  (void)shard_services.get_store().flush(
-    coll_ref
+  (void)crimson::os::with_store_do_transaction(
+    shard_services.get_store(store_index),
+    coll_ref, ceph::os::Transaction{}
   ).then(
     [this, epoch=get_osdmap_epoch()]() {
-      return shard_services.start_operation<LocalPeeringEvent>(
+    return shard_services.start_operation<LocalPeeringEvent>(
        this,
        pg_whoami,
        pgid,
@@ -284,7 +287,7 @@ PG::interruptible_future<> PG::find_unfound(epoch_t epoch_started)
         PeeringState::UnfoundRecovery());
     }
   }
-  return get_shard_services().dispatch_context(get_collection_ref(), std::move(rctx));
+  return get_shard_services().dispatch_context(store_index, get_collection_ref(), std::move(rctx));
 }
 
 void PG::recheck_readable()
@@ -482,11 +485,13 @@ PG::do_delete_work(ceph::os::Transaction &t, ghobject_t _next)
 {
   logger().info("removing pg {}", pgid);
   auto fut = interruptor::make_interruptible(
-    shard_services.get_store().list_objects(
+    crimson::os::with_store<&crimson::os::FuturizedStore::Shard::list_objects>(
+      shard_services.get_store(store_index),
       coll_ref,
       _next,
       ghobject_t::get_max(),
-      local_conf()->osd_target_transaction_size));
+      local_conf()->osd_target_transaction_size,
+      0));
 
   auto [objs_to_rm, next] = fut.get();
   if (objs_to_rm.empty()) {
@@ -494,8 +499,10 @@ PG::do_delete_work(ceph::os::Transaction &t, ghobject_t _next)
     t.remove(coll_ref->get_cid(), pgid.make_snapmapper_oid());
     t.remove(coll_ref->get_cid(), pgmeta_oid);
     t.remove_collection(coll_ref->get_cid());
-    (void) shard_services.get_store().do_transaction(
-      coll_ref, t.claim_and_reset()).then([this] {
+    (void) crimson::os::with_store_do_transaction(
+      shard_services.get_store(store_index),
+      coll_ref,
+      t.claim_and_reset()).then([this] {
       return shard_services.remove_pg(pgid);
     });
     return {next, false};
@@ -539,11 +546,13 @@ seastar::future<> PG::clear_temp_objects()
   ceph::os::Transaction t;
   auto max_size = local_conf()->osd_target_transaction_size;
   while(true) {
-    auto [objs, next] = co_await shard_services.get_store().list_objects(
-      coll_ref, _next, ghobject_t::get_max(), max_size);
+    auto [objs, next] = co_await crimson::os::with_store<&crimson::os::FuturizedStore::Shard::list_objects>(
+      shard_services.get_store(store_index),
+      coll_ref, _next, ghobject_t::get_max(), max_size, 0);
     if (objs.empty()) {
       if (!t.empty()) {
-        co_await shard_services.get_store().do_transaction(
+        co_await crimson::os::with_store_do_transaction(
+          shard_services.get_store(store_index),
           coll_ref, std::move(t));
       }
       break;
@@ -555,7 +564,8 @@ seastar::future<> PG::clear_temp_objects()
     }
     _next = next;
     if (t.get_num_ops() >= max_size) {
-      co_await shard_services.get_store().do_transaction(
+      co_await crimson::os::with_store_do_transaction(
+        shard_services.get_store(store_index),
         coll_ref, t.claim_and_reset());
     }
   }
@@ -786,26 +796,27 @@ seastar::future<> PG::init(
     role, newup, new_up_primary, newacting,
     new_acting_primary, history, pi, t);
   assert(coll_ref);
-  return shard_services.get_store().exists(
-    get_collection_ref(), pgid.make_snapmapper_oid()
+  return crimson::os::with_store<&crimson::os::FuturizedStore::Shard::exists>(
+    shard_services.get_store(store_index),
+    get_collection_ref(), pgid.make_snapmapper_oid(), 0
   ).safe_then([&t, this](bool existed) {
-      if (!existed) {
-        t.touch(coll_ref->get_cid(), pgid.make_snapmapper_oid());
-      }
-    },
-    ::crimson::ct_error::assert_all{fmt::format(
-      "{} {} unexpected eio", *this, __func__).c_str()}
-  );
+    if (!existed) {
+      t.touch(coll_ref->get_cid(), pgid.make_snapmapper_oid());
+    }
+  },
+  ::crimson::ct_error::assert_all{fmt::format(
+    "{} {} unexpected eio", *this, __func__).c_str()}
+);
 }
 
-seastar::future<> PG::read_state(crimson::os::FuturizedStore::Shard* store)
+seastar::future<> PG::read_state(crimson::os::FuturizedStore::StoreShardRef store)
 {
   if (__builtin_expect(stopping, false)) {
     return seastar::make_exception_future<>(
        crimson::common::system_shutdown_exception());
   }
 
-  return seastar::do_with(PGMeta(*store, pgid), [] (auto& pg_meta) {
+  return seastar::do_with(PGMeta(store, pgid), [] (auto& pg_meta) {
     return pg_meta.load();
   }).then([this, store](auto&& ret) {
     auto [pg_info, past_intervals] = std::move(ret);
@@ -814,7 +825,7 @@ seastar::future<> PG::read_state(crimson::os::FuturizedStore::Shard* store)
        std::move(past_intervals),
        [this, store] (PGLog &pglog) {
          return pglog.read_log_and_missing_crimson(
-           *store,
+           store,
            coll_ref,
            peering_state.get_info(),
            pgmeta_oid);
@@ -844,7 +855,8 @@ seastar::future<> PG::read_state(crimson::os::FuturizedStore::Shard* store)
     return seastar::now();
   }).then([this, store]() {
     logger().debug("{} setting collection options", __func__);
-    return store->set_collection_opts(
+    return crimson::os::with_store<&crimson::os::FuturizedStore::Shard::set_collection_opts>(
+          store,
           coll_ref,
           get_pgpool().info.opts);
   });
@@ -897,7 +909,8 @@ void PG::handle_initialize(PeeringCtx &rctx)
 
 void PG::init_collection_pool_opts()
 {
-  std::ignore = shard_services.get_store().set_collection_opts(coll_ref, get_pgpool().info.opts);
+  std::ignore = crimson::os::with_store<&crimson::os::FuturizedStore::Shard::set_collection_opts>(
+    shard_services.get_store(store_index), coll_ref, get_pgpool().info.opts);
 }
 
 void PG::on_pool_change()
@@ -1139,7 +1152,8 @@ PG::interruptible_future<eversion_t> PG::submit_error_log(
   }
 
   co_await interruptor::make_interruptible(
-    shard_services.get_store().do_transaction(
+    crimson::os::with_store_do_transaction(
+      shard_services.get_store(store_index),
       get_collection_ref(), std::move(t)
     ));
 
@@ -1332,7 +1346,9 @@ PG::handle_rep_op_fut PG::handle_rep_op(Ref<MOSDRepOp> req)
   DEBUGDPP("{} do_transaction", *this, *req);
 
   auto commit_fut = interruptor::make_interruptible(
-    shard_services.get_store().do_transaction(coll_ref, std::move(txn))
+    crimson::os::with_store_do_transaction(
+      shard_services.get_store(store_index),
+      coll_ref, std::move(txn))
   );
 
   const auto &lcod = peering_state.get_info().last_complete;
@@ -1448,8 +1464,10 @@ PG::interruptible_future<> PG::do_update_log_missing(
   peering_state.append_log_entries_update_missing(
     m->entries, t, op_trim_to, op_pg_committed_to);
 
-  return interruptor::make_interruptible(shard_services.get_store().do_transaction(
-    coll_ref, std::move(t))).then_interruptible(
+  return interruptor::make_interruptible(
+    crimson::os::with_store_do_transaction(
+      shard_services.get_store(store_index),
+      coll_ref, std::move(t))).then_interruptible(
     [m, conn, lcod=peering_state.get_info().last_complete, this] {
     if (!peering_state.pg_has_reset_since(m->get_epoch())) {
       peering_state.update_last_complete_ondisk(lcod);
index c840648975d13ffde2c6bda8f59c5c872b94f9e2..9cad2aa656c7b7967ec4d604f4d53b264b900ee9 100644 (file)
@@ -88,6 +88,7 @@ class PG : public boost::intrusive_ref_counter<
   spg_t pgid;
   pg_shard_t pg_whoami;
   crimson::os::CollectionRef coll_ref;
+  unsigned int store_index;
   ghobject_t pgmeta_oid;
 
   seastar::timer<seastar::lowres_clock> check_readable_timer;
@@ -101,6 +102,7 @@ public:
 
   PG(spg_t pgid,
      pg_shard_t pg_shard,
+     unsigned int store_index,
      crimson::os::CollectionRef coll_ref,
      pg_pool_t&& pool,
      std::string&& name,
@@ -118,6 +120,9 @@ public:
     return pgid;
   }
 
+  const unsigned int get_store_index() {
+    return store_index;
+  }
   PGBackend& get_backend() {
     return *backend;
   }
@@ -198,6 +203,7 @@ public:
     std::swap(o, orderer);
     return seastar::when_all(
       shard_services.dispatch_context(
+        store_index,
        get_collection_ref(),
        std::move(rctx)),
       shard_services.run_orderer(std::move(o))
@@ -335,6 +341,7 @@ public:
     PGPeeringEventRef on_commit) final {
     LOG_PREFIX(PG::schedule_event_on_commit);
     SUBDEBUGDPP(osd, "on_commit {}", *this, on_commit->get_desc());
+
     t.register_on_commit(
       make_lambda_context(
        [this, on_commit=std::move(on_commit)](int) {
@@ -594,7 +601,7 @@ public:
     const PastIntervals& pim,
     ceph::os::Transaction &t);
 
-  seastar::future<> read_state(crimson::os::FuturizedStore::Shard* store);
+  seastar::future<> read_state(crimson::os::FuturizedStore::StoreShardRef store);
 
   void do_peering_event(PGPeeringEvent& evt, PeeringCtx &rctx);
 
@@ -629,7 +636,8 @@ public:
       seed,
       target);
     init_pg_ondisk(t, child, pool);
-    return shard_services.get_store().do_transaction(
+    return crimson::os::with_store_do_transaction(
+      shard_services.get_store(store_index),
       coll_ref, std::move(t));
   }
 
index ecdf7efb6fa6b2d7dfba239f1137cb98c0825275..b35d4bd2ffeed5cec095253a8ab07b7a7b666176 100644 (file)
@@ -61,7 +61,7 @@ PGBackend::create(pg_t pgid,
                                               coll, shard_services,
                                               dpp);
   case pg_pool_t::TYPE_ERASURE:
-    return std::make_unique<ECBackend>(pg_shard.shard, coll, shard_services,
+    return std::make_unique<ECBackend>( pg_shard.shard, coll, shard_services, pg.get_store_index(),
                                        std::move(ec_profile),
                                        pool.stripe_width,
                                       dpp);
@@ -74,21 +74,24 @@ PGBackend::create(pg_t pgid,
 PGBackend::PGBackend(shard_id_t shard,
                      CollectionRef coll,
                      crimson::osd::ShardServices &shard_services,
+                     unsigned int store_index,
                     DoutPrefixProvider &dpp)
   : shard{shard},
     coll{coll},
     shard_services{shard_services},
     dpp{dpp},
-    store{&shard_services.get_store()}
+    store{shard_services.get_store(store_index)}
 {}
 
 PGBackend::load_metadata_iertr::future
   <PGBackend::loaded_object_md_t::ref>
 PGBackend::load_metadata(const hobject_t& oid)
 {
-  return interruptor::make_interruptible(store->get_attrs(
+  return interruptor::make_interruptible(
+    crimson::os::with_store<&crimson::os::FuturizedStore::Shard::get_attrs>(
+    store,
     coll,
-    ghobject_t{oid, ghobject_t::NO_GEN, shard})).safe_then_interruptible(
+    ghobject_t{oid, ghobject_t::NO_GEN, shard}, 0)).safe_then_interruptible(
       [oid](auto &&attrs) -> load_metadata_ertr::future<loaded_object_md_t::ref>{
         loaded_object_md_t::ref ret(new loaded_object_md_t());
         if (auto oiiter = attrs.find(OI_ATTR); oiiter != attrs.end()) {
@@ -255,13 +258,19 @@ PGBackend::sparse_read(const ObjectState& os, OSDOp& osd_op,
   }
   logger().trace("sparse_read: {} {}~{}",
                  os.oi.soid, (uint64_t)op.extent.offset, (uint64_t)op.extent.length);
-  return interruptor::make_interruptible(store->fiemap(coll, ghobject_t{os.oi.soid},
-    offset, adjusted_length)).safe_then_interruptible(
+
+  return interruptor::make_interruptible(
+    crimson::os::with_store<&crimson::os::FuturizedStore::Shard::fiemap>(
+    store, coll, ghobject_t{os.oi.soid},
+    static_cast<uint64_t>(offset),
+    static_cast<uint64_t>(adjusted_length),static_cast<uint32_t>(0))).safe_then_interruptible(
     [&delta_stats, &os, &osd_op, this](auto&& m) {
     return seastar::do_with(interval_set<uint64_t>{std::move(m)},
                            [&delta_stats, &os, &osd_op, this](auto&& extents) {
-      return interruptor::make_interruptible(store->readv(coll, ghobject_t{os.oi.soid},
-                          extents, osd_op.op.flags)).safe_then_interruptible_tuple(
+      return interruptor::make_interruptible(
+        crimson::os::with_store<&crimson::os::FuturizedStore::Shard::readv>(
+          store, coll, ghobject_t{os.oi.soid},
+          std::ref(extents), osd_op.op.flags)).safe_then_interruptible_tuple(
         [&delta_stats, &os, &osd_op, &extents](auto&& bl) -> read_errorator::future<> {
         if (_read_verify_data(os.oi, bl)) {
           osd_op.op.extent.length = bl.length();
@@ -1048,7 +1057,8 @@ PGBackend::list_objects(
   auto gstart = start.is_min() ? ghobject_t{} : ghobject_t{start, 0, shard};
   auto gend = end.is_max() ? ghobject_t::get_max() : ghobject_t{end, 0, shard};
   auto [gobjects, next] = co_await interruptor::make_interruptible(
-    store->list_objects(coll, gstart, gend, limit));
+    crimson::os::with_store<&crimson::os::FuturizedStore::Shard::list_objects>(
+      store, coll, gstart, gend, limit, 0));
 
   std::vector<hobject_t> objects;
   boost::copy(
@@ -1081,26 +1091,29 @@ PGBackend::setxattr_ierrorator::future<> PGBackend::setxattr(
       osd_op.op.xattr.value_len > local_conf()->osd_max_attr_size) {
     return crimson::ct_error::file_too_large::make();
   }
+  return crimson::os::with_store<
+    &crimson::os::FuturizedStore::Shard::get_max_attr_name_length
+  >(store).then([this, &os, &osd_op, &txn, &delta_stats](unsigned store_max_name_len) {
+    const auto max_name_len = std::min<uint64_t>(
+      store_max_name_len, local_conf()->osd_max_attr_name_len);
+    if (osd_op.op.xattr.name_len > max_name_len) {
+      return setxattr_ierrorator::future<>(crimson::ct_error::enametoolong::make());
+    }
 
-  const auto max_name_len = std::min<uint64_t>(
-    store->get_max_attr_name_length(), local_conf()->osd_max_attr_name_len);
-  if (osd_op.op.xattr.name_len > max_name_len) {
-    return crimson::ct_error::enametoolong::make();
-  }
+    maybe_create_new_object(os, txn, delta_stats);
 
-  maybe_create_new_object(os, txn, delta_stats);
-
-  std::string name{"_"};
-  ceph::bufferlist val;
-  {
-    auto bp = osd_op.indata.cbegin();
-    bp.copy(osd_op.op.xattr.name_len, name);
-    bp.copy(osd_op.op.xattr.value_len, val);
-  }
-  logger().debug("setxattr on obj={} for attr={}", os.oi.soid, name);
-  txn.setattr(coll->get_cid(), ghobject_t{os.oi.soid}, name, val);
-  delta_stats.num_wr++;
-  return seastar::now();
+    std::string name{"_"};
+    ceph::bufferlist val;
+    {
+      auto bp = osd_op.indata.cbegin();
+      bp.copy(osd_op.op.xattr.name_len, name);
+      bp.copy(osd_op.op.xattr.value_len, val);
+    }
+    logger().debug("setxattr on obj={} for attr={}", os.oi.soid, name);
+    txn.setattr(coll->get_cid(), ghobject_t{os.oi.soid}, name, val);
+    delta_stats.num_wr++;
+    return setxattr_ierrorator::future<>(seastar::now());
+  });
 }
 
 PGBackend::get_attr_ierrorator::future<> PGBackend::getxattr(
@@ -1132,7 +1145,8 @@ PGBackend::getxattr(
   const hobject_t& soid,
   std::string_view key) const
 {
-  return store->get_attr(coll, ghobject_t{soid}, key);
+  return crimson::os::with_store<&crimson::os::FuturizedStore::Shard::get_attr>(
+    store, coll, ghobject_t{soid}, key, 0);
 }
 
 PGBackend::get_attr_ierrorator::future<ceph::bufferlist>
@@ -1141,7 +1155,8 @@ PGBackend::getxattr(
   std::string&& key) const
 {
   return seastar::do_with(key, [this, &soid](auto &key) {
-    return store->get_attr(coll, ghobject_t{soid}, key);
+    return crimson::os::with_store<&crimson::os::FuturizedStore::Shard::get_attr>(
+      store, coll, ghobject_t{soid}, key, 0);
   });
 }
 
@@ -1150,7 +1165,8 @@ PGBackend::get_attr_ierrorator::future<> PGBackend::get_xattrs(
   OSDOp& osd_op,
   object_stat_sum_t& delta_stats) const
 {
-  return store->get_attrs(coll, ghobject_t{os.oi.soid}).safe_then(
+  return crimson::os::with_store<&crimson::os::FuturizedStore::Shard::get_attrs>(
+    store, coll, ghobject_t{os.oi.soid}, 0).safe_then(
     [&delta_stats, &osd_op](auto&& attrs) {
     std::vector<std::pair<std::string, bufferlist>> user_xattrs;
     ceph::bufferlist bl;
@@ -1303,13 +1319,14 @@ static
 get_omap_iertr::future<
   crimson::os::FuturizedStore::Shard::omap_values_t>
 maybe_get_omap_vals_by_keys(
-  crimson::os::FuturizedStore::Shard* store,
+  crimson::os::FuturizedStore::StoreShardRef store,
   const crimson::os::CollectionRef& coll,
   const object_info_t& oi,
   const std::set<std::string>& keys_to_get)
 {
   if (oi.is_omap()) {
-    return store->omap_get_values(coll, ghobject_t{oi.soid}, keys_to_get);
+    return crimson::os::with_store<&crimson::os::FuturizedStore::Shard::omap_get_values>(
+      store, coll, ghobject_t{oi.soid}, keys_to_get, 0);
   } else {
     return crimson::ct_error::enodata::make();
   }
@@ -1322,14 +1339,15 @@ using omap_iterate_cb_t = crimson::os::FuturizedStore::Shard::omap_iterate_cb_t;
 static
 get_omap_iterate_ertr::future<ObjectStore::omap_iter_ret_t>
 maybe_do_omap_iterate(
-  crimson::os::FuturizedStore::Shard* store,
+  crimson::os::FuturizedStore::StoreShardRef store,
   const crimson::os::CollectionRef& coll,
   const object_info_t& oi,
   ObjectStore::omap_iter_seek_t start_from,
   omap_iterate_cb_t callback)
 {
   if (oi.is_omap()) {
-    return store->omap_iterate(coll, ghobject_t{oi.soid}, start_from, callback);
+    return crimson::os::with_store<&crimson::os::FuturizedStore::Shard::omap_iterate>(
+      store, coll, ghobject_t{oi.soid}, start_from, callback, 0);
   } else {
     return crimson::ct_error::enodata::make();
   }
@@ -1341,7 +1359,8 @@ PGBackend::omap_get_header(
   const ghobject_t& oid,
   uint32_t op_flags) const
 {
-  return store->omap_get_header(c, oid, op_flags)
+  return crimson::os::with_store<&crimson::os::FuturizedStore::Shard::omap_get_header>(
+    store, c, oid, op_flags)
     .handle_error(
       crimson::ct_error::enodata::handle([] {
        return seastar::make_ready_future<bufferlist>();
@@ -1493,7 +1512,8 @@ PGBackend::omap_cmp(
     for (auto &i: assertions) {
       to_get.insert(i.first);
     }
-    return store->omap_get_values(coll, ghobject_t{os.oi.soid}, to_get)
+    return crimson::os::with_store<&crimson::os::FuturizedStore::Shard::omap_get_values>(
+      store, coll, ghobject_t{os.oi.soid}, to_get, 0)
       .safe_then([=, &osd_op] (auto&& out) -> omap_cmp_iertr::future<> {
       osd_op.rval = 0;
       return  do_omap_val_cmp(out, assertions);
@@ -1725,7 +1745,8 @@ PGBackend::stat(
   CollectionRef c,
   const ghobject_t& oid) const
 {
-  return store->stat(c, oid);
+  return crimson::os::with_store<&crimson::os::FuturizedStore::Shard::stat>(
+    store, c, oid, 0);
 }
 
 PGBackend::read_errorator::future<std::map<uint64_t, uint64_t>>
@@ -1736,7 +1757,8 @@ PGBackend::fiemap(
   uint64_t len,
   uint32_t op_flags)
 {
-  return store->fiemap(c, oid, off, len);
+  return crimson::os::with_store<&crimson::os::FuturizedStore::Shard::fiemap>(
+    store, c, oid, off, len, 0);
 }
 
 PGBackend::write_iertr::future<> PGBackend::tmapput(
index 52b416e36ba69f7e89914833919aaa819a91b84a..1b833b507f51d5f2965b9bdf0f33599c55093b85 100644 (file)
@@ -65,6 +65,7 @@ public:
   using rep_op_fut_t = interruptible_future<rep_op_ret_t>;
   PGBackend(shard_id_t shard, CollectionRef coll,
             crimson::osd::ShardServices &shard_services,
+            unsigned int store_index,
             DoutPrefixProvider &dpp);
   virtual ~PGBackend() = default;
   static std::unique_ptr<PGBackend> create(pg_t pgid,
@@ -439,7 +440,7 @@ protected:
   CollectionRef coll;
   crimson::osd::ShardServices &shard_services;
   DoutPrefixProvider &dpp; ///< provides log prefix context
-  crimson::os::FuturizedStore::Shard* store;
+  crimson::os::FuturizedStore::StoreShardRef store;
   virtual seastar::future<> request_committed(
     const osd_reqid_t& reqid,
     const eversion_t& at_version) = 0;
index 0455a27d69e4b92135d0f0a76b49aa1f1ed9608b..ffaf5aede6a50c227c9f0f1debf0bf1639e4ac6d 100644 (file)
@@ -14,7 +14,7 @@ using std::string_view;
 // easily skip them
 using crimson::os::FuturizedStore;
 
-PGMeta::PGMeta(FuturizedStore::Shard& store, spg_t pgid)
+PGMeta::PGMeta(FuturizedStore::StoreShardRef store, spg_t pgid)
   : store{store},
     pgid{pgid}
 {}
@@ -37,11 +37,15 @@ namespace {
 
 seastar::future<epoch_t> PGMeta::get_epoch()
 {
-  return store.open_collection(coll_t{pgid}).then([this](auto ch) {
-    return store.omap_get_values(ch,
+  return crimson::os::with_store<&crimson::os::FuturizedStore::Shard::open_collection>(
+    store, coll_t{pgid}).then([this](auto ch) {
+    return crimson::os::with_store<&crimson::os::FuturizedStore::Shard::omap_get_values>(
+                                 store, ch,
                                  pgid.make_pgmeta_oid(),
-                                 {string{infover_key},
-                                  string{epoch_key}}).safe_then(
+                                 std::set<std::string>{
+                                  string{infover_key},
+                                  string{epoch_key}},
+                                 0).safe_then(
     [](auto&& values) {
       {
         // sanity check
@@ -65,13 +69,17 @@ seastar::future<epoch_t> PGMeta::get_epoch()
 
 seastar::future<std::tuple<pg_info_t, PastIntervals>> PGMeta::load()
 {
-  return store.open_collection(coll_t{pgid}).then([this](auto ch) {
-    return store.omap_get_values(ch,
+  return crimson::os::with_store<&crimson::os::FuturizedStore::Shard::open_collection>(
+    store, coll_t{pgid}).then([this](auto ch) {
+    return crimson::os::with_store<&crimson::os::FuturizedStore::Shard::omap_get_values>(
+                                 store, ch,
                                  pgid.make_pgmeta_oid(),
-                                 {string{infover_key},
+                                 std::set<std::string>{
+                                  string{infover_key},
                                   string{info_key},
                                   string{biginfo_key},
-                                  string{fastinfo_key}});
+                                  string{fastinfo_key}},
+                                  0);
   }).safe_then([](auto&& values) {
     {
       // sanity check
index 1e361fc8f1310c76c7d9ba41c07122eada9e6f08..83bf43c8ca577f29f831784bf572580ee06ad728 100644 (file)
 /// PG related metadata
 class PGMeta
 {
-  crimson::os::FuturizedStore::Shard& store;
+  crimson::os::FuturizedStore::StoreShardRef store;
   const spg_t pgid;
 public:
-  PGMeta(crimson::os::FuturizedStore::Shard& store, spg_t pgid);
+  PGMeta(crimson::os::FuturizedStore::StoreShardRef store, spg_t pgid);
   seastar::future<epoch_t> get_epoch();
   seastar::future<std::tuple<pg_info_t, PastIntervals>> load();
 };
index 16863d8077a1e5b4c733f4ca6a636129ac884c1f..6e7ec6729f2e4e8e0e3f6a1a9e2f19ea9d5f014a 100644 (file)
@@ -158,7 +158,8 @@ RecoveryBackend::handle_backfill_progress(
     m.op == MOSDPGBackfill::OP_BACKFILL_PROGRESS,
     t);
   DEBUGDPP("submitting transaction", pg);
-  return shard_services.get_store().do_transaction(
+  return crimson::os::with_store_do_transaction(
+    shard_services.get_store(pg.get_store_index()),
     pg.get_collection_ref(), std::move(t)).or_terminate();
 }
 
@@ -217,7 +218,8 @@ RecoveryBackend::handle_backfill_remove(
   }
   DEBUGDPP("submitting transaction", pg);
   co_await interruptor::make_interruptible(
-    shard_services.get_store().do_transaction(
+    crimson::os::with_store_do_transaction(
+      shard_services.get_store(pg.get_store_index()),
       pg.get_collection_ref(), std::move(t)).or_terminate());
 }
 
index 5acf71a5b1b2bae04f7cc3dcddb05793696760e6..373fa8ddcb720e9aec3d6cd13d7cbf01cc0bc55c 100644 (file)
@@ -36,10 +36,11 @@ public:
   RecoveryBackend(crimson::osd::PG& pg,
                  crimson::osd::ShardServices& shard_services,
                  crimson::os::CollectionRef coll,
+      unsigned int store_index,
                  PGBackend* backend)
     : pg{pg},
       shard_services{shard_services},
-      store{&shard_services.get_store()},
+      store(shard_services.get_store(store_index)),
       coll{coll},
       backend{backend} {}
   virtual ~RecoveryBackend() {}
@@ -127,7 +128,7 @@ public:
 protected:
   crimson::osd::PG& pg;
   crimson::osd::ShardServices& shard_services;
-  crimson::os::FuturizedStore::Shard* store;
+  crimson::os::FuturizedStore::StoreShardRef store;
   crimson::os::CollectionRef coll;
   PGBackend* backend;
 
index af5df2d4ab6376ed5620f96bd50506304a272386..ef062d14be7cda4ca0a4f330842d830d2e1a1e5f 100644 (file)
@@ -19,11 +19,11 @@ namespace crimson::osd {
 
 ReplicatedBackend::ReplicatedBackend(pg_t pgid,
                                      pg_shard_t whoami,
-                                    crimson::osd::PG& pg,
+                                     crimson::osd::PG& pg,
                                      ReplicatedBackend::CollectionRef coll,
                                      crimson::osd::ShardServices& shard_services,
                                     DoutPrefixProvider &dpp)
-  : PGBackend{whoami.shard, coll, shard_services, dpp},
+  : PGBackend{whoami.shard, coll, shard_services, pg.get_store_index(), dpp},
     pgid{pgid},
     whoami{whoami},
     pg(pg),
@@ -44,7 +44,8 @@ ReplicatedBackend::_read(const hobject_t& hoid,
                          const uint64_t len,
                          const uint32_t flags)
 {
-  return store->read(coll, ghobject_t{hoid}, off, len, flags);
+  return crimson::os::with_store<&crimson::os::FuturizedStore::Shard::read>(
+    store, coll, ghobject_t{hoid}, off, len, flags);
 }
 
 MURef<MOSDRepOp> ReplicatedBackend::new_repop_msg(
@@ -176,7 +177,9 @@ ReplicatedBackend::submit_transaction(
     false);
 
   auto all_completed = interruptor::make_interruptible(
-      shard_services.get_store().do_transaction(coll, std::move(txn))
+    crimson::os::with_store_do_transaction(
+      shard_services.get_store(pg.get_store_index()),
+      coll, std::move(txn))
    ).then_interruptible([FNAME, this,
                        peers=pending_txn->second.weak_from_this()] {
     if (!peers) {
index 5503063cbc9d810d06fb6235755d73034f34a1e9..cb9ee87665fd5629bde3ba2949cd841eb8b8167a 100644 (file)
@@ -245,7 +245,9 @@ ReplicatedRecoveryBackend::on_local_recover_persist(
       soid, _recovery_info, is_delete, t
     ).then_interruptible([FNAME, this, &t] {
       DEBUGDPP("submitting transaction", pg);
-      return shard_services.get_store().do_transaction(coll, std::move(t));
+      return crimson::os::with_store_do_transaction(
+        shard_services.get_store(pg.get_store_index()),
+        coll, std::move(t));
     }).then_interruptible(
       [this, epoch_frozen, last_complete = pg.get_info().last_complete] {
       pg.get_recovery_handler()->_committed_pushed_object(epoch_frozen, last_complete);
@@ -272,8 +274,10 @@ ReplicatedRecoveryBackend::local_recover_delete(
         }).then_interruptible(
          [FNAME, this, &txn]() mutable {
          DEBUGDPP("submitting transaction", pg);
-         return shard_services.get_store().do_transaction(coll,
-                                                          std::move(txn));
+         return crimson::os::with_store_do_transaction(
+      shard_services.get_store(pg.get_store_index()),
+      coll,
+      std::move(txn));
        });
       });
     }
@@ -615,7 +619,8 @@ ReplicatedRecoveryBackend::read_metadata_for_push_op(
          return seastar::make_ready_future<bufferlist>();
        })),
       interruptor::make_interruptible(
-        store->get_attrs(coll, ghobject_t(oid), CEPH_OSD_OP_FLAG_FADVISE_DONTNEED)
+        crimson::os::with_store<&crimson::os::FuturizedStore::Shard::get_attrs>(
+          store, coll, ghobject_t(oid), CEPH_OSD_OP_FLAG_FADVISE_DONTNEED)
       ).handle_error_interruptible<false>(
        crimson::os::FuturizedStore::Shard::get_attrs_ertr::all_same_way(
          [FNAME, this, oid] (const std::error_code& e) {
@@ -679,10 +684,11 @@ ReplicatedRecoveryBackend::read_object_for_push_op(
     // 3. read the truncated extents
     // TODO: check if the returned extents are pruned
     return interruptor::make_interruptible(
-      store->readv(
+      crimson::os::with_store<&crimson::os::FuturizedStore::Shard::readv>(
+        store,
         coll,
         ghobject_t{oid},
-        push_op->data_included,
+        std::ref(push_op->data_included),
         CEPH_OSD_OP_FLAG_FADVISE_DONTNEED));
   }).safe_then_interruptible([push_op, range_end=copy_subset.range_end()](auto &&bl) {
     push_op->data.claim_append(std::move(bl));
@@ -750,8 +756,9 @@ ReplicatedRecoveryBackend::read_omap_for_push_op(
   };
 
   co_await interruptor::make_interruptible(
-    shard_services.get_store().omap_iterate(
-      coll, ghobject_t{oid}, start_from, callback
+    crimson::os::with_store<&crimson::os::FuturizedStore::Shard::omap_iterate>(
+      shard_services.get_store(pg.get_store_index()),
+      coll, ghobject_t{oid}, start_from, callback, 0
     ).safe_then([&new_progress](auto ret) {
       if (ret == ObjectStore::omap_iter_ret_t::NEXT) {
         new_progress.omap_complete = true;
@@ -914,14 +921,18 @@ ReplicatedRecoveryBackend::_handle_pull_response(
     );
     DEBUGDPP("submitting transaction, complete", pg);
     co_await interruptor::make_interruptible(
-      shard_services.get_store().do_transaction(coll, std::move(t)));
+      crimson::os::with_store_do_transaction(
+        shard_services.get_store(pg.get_store_index()),
+        coll, std::move(t)));
   } else {
     response->soid = push_op.soid;
     response->recovery_info = pull_info.recovery_info;
     response->recovery_progress = pull_info.recovery_progress;
     DEBUGDPP("submitting transaction, incomplete", pg);
     co_await interruptor::make_interruptible(
-      shard_services.get_store().do_transaction(coll, std::move(t)));
+      crimson::os::with_store_do_transaction(
+        shard_services.get_store(pg.get_store_index()),
+        coll, std::move(t)));
   }
 
   co_return complete;
@@ -1039,14 +1050,17 @@ ReplicatedRecoveryBackend::handle_push(
       false, t);
 
     co_await interruptor::make_interruptible(
-      shard_services.get_store().do_transaction(coll, std::move(t)));
+      crimson::os::with_store_do_transaction(
+        shard_services.get_store(pg.get_store_index()),
+        coll, std::move(t)));
     replica_push_targets.erase(ptiter);
 
     pg.get_recovery_handler()->_committed_pushed_object(
       epoch_frozen, pg.get_info().last_complete);
   } else {
     co_await interruptor::make_interruptible(
-      shard_services.get_store().do_transaction(coll, std::move(t)));
+      crimson::os::with_store_do_transaction(
+        shard_services.get_store(pg.get_store_index()), coll, std::move(t)));
   }
 
   auto reply = crimson::make_message<MOSDPGPushReply>();
@@ -1222,7 +1236,8 @@ ReplicatedRecoveryBackend::prep_push_target(
 
   // clone overlap content in local object if using a new object
   auto st = co_await interruptor::make_interruptible(
-    store->stat(coll, ghobject_t(recovery_info.soid)));
+    crimson::os::with_store<&crimson::os::FuturizedStore::Shard::stat>(
+      store, coll, ghobject_t(recovery_info.soid), 0));
 
   // TODO: pg num bytes counting
   uint64_t local_size = std::min(recovery_info.size, (uint64_t)st.st_size);
index 30e378b2347b6c1e40a9258b331aa72e76d55034..1a149948cb0ca17ca8ae0e9edb5397e5ea97ed3d 100644 (file)
@@ -23,7 +23,7 @@ public:
                            crimson::osd::ShardServices& shard_services,
                            crimson::os::CollectionRef coll,
                            PGBackend* backend)
-    : RecoveryBackend(pg, shard_services, coll, backend)
+    : RecoveryBackend(pg, shard_services, coll, pg.get_store_index(), backend)
   {}
   interruptible_future<> handle_recovery_op(
     Ref<MOSDFastDispatchOp> m,
index 7524228c0e70182028dedb468e097a0ff263c715..281364952fd79eb11d4e6d71ade8fe5d766b911f 100644 (file)
@@ -1093,7 +1093,7 @@ void PGLog::rebuild_missing_set_with_deletes(
 
 namespace {
   struct FuturizedShardStoreLogReader {
-    crimson::os::FuturizedStore::Shard &store;
+    crimson::os::FuturizedStore::StoreShardRef store;
     const pg_info_t &info;
     PGLog::IndexedLog &log;
     std::set<std::string>* log_keys_debug = NULL;
@@ -1175,8 +1175,9 @@ namespace {
         return ObjectStore::omap_iter_ret_t::NEXT;
       };
 
-      co_await store.omap_iterate(
-        ch, pgmeta_oid, start_from, callback
+      co_await crimson::os::with_store<&crimson::os::FuturizedStore::Shard::omap_iterate>(
+        store,
+        ch, pgmeta_oid, start_from, callback, 0
       ).safe_then([] (auto ret) {
         ceph_assert (ret == ObjectStore::omap_iter_ret_t::NEXT);
       }).handle_error(
@@ -1200,7 +1201,7 @@ namespace {
 }
 
 seastar::future<> PGLog::read_log_and_missing_crimson(
-  crimson::os::FuturizedStore::Shard &store,
+  crimson::os::FuturizedStore::StoreShardRef store,
   crimson::os::CollectionRef ch,
   const pg_info_t &info,
   IndexedLog &log,
index 1161d0901a9e3a3baca664062b30bdcf4e51dfcb..705ef03150f7cfd2f4c08c7c01924481348b94f7 100644 (file)
@@ -1850,7 +1850,7 @@ public:
 
 #ifdef WITH_CRIMSON
   seastar::future<> read_log_and_missing_crimson(
-    crimson::os::FuturizedStore::Shard &store,
+    crimson::os::FuturizedStore::StoreShardRef store,
     crimson::os::CollectionRef ch,
     const pg_info_t &info,
     ghobject_t pgmeta_oid
@@ -1862,7 +1862,7 @@ public:
   }
 
   static seastar::future<> read_log_and_missing_crimson(
-    crimson::os::FuturizedStore::Shard &store,
+    crimson::os::FuturizedStore::StoreShardRef store,
     crimson::os::CollectionRef ch,
     const pg_info_t &info,
     IndexedLog &log,
index 1668c9ff196550a94dc8e5932ef27677a9a0ea1c..a865a142eae8d9e07eb272dfeb39e4df1d847764 100644 (file)
@@ -115,8 +115,10 @@ int OSDriver::get_keys(
   LOG_PREFIX("OSDriver::get_keys");
   DEBUG("");
   using crimson::os::FuturizedStore;
-  return interruptor::green_get(os->omap_get_values(
-    ch, hoid, keys
+  return interruptor::green_get(
+    crimson::os::with_store<&crimson::os::FuturizedStore::Shard::omap_get_values>(
+    os,
+    ch, hoid, keys, 0
   ).safe_then([out] (FuturizedStore::Shard::omap_values_t&& vals) {
     // just the difference in comparator (`std::less<>` in omap_values_t`)
     reinterpret_cast<FuturizedStore::Shard::omap_values_t&>(*out) = std::move(vals);
@@ -157,7 +159,8 @@ int OSDriver::get_next(
     }
   };
   return interruptor::green_get(
-    os->omap_iterate(ch, hoid, start_from, callback
+    crimson::os::with_store<&crimson::os::FuturizedStore::Shard::omap_iterate>(
+      os, ch, hoid, start_from, callback, 0
     ).safe_then([FNAME, key] (auto ret) {
       if (ret == ObjectStore::omap_iter_ret_t::NEXT) {
         DEBUG("key {} no more values", key);
@@ -180,8 +183,10 @@ int OSDriver::get_next_or_current(
   DEBUG("key {}", key);
   using crimson::os::FuturizedStore;
   // let's try to get current first
-  return interruptor::green_get(os->omap_get_values(
-    ch, hoid, FuturizedStore::Shard::omap_keys_t{key}
+  return interruptor::green_get(crimson::os::with_store<
+    &crimson::os::FuturizedStore::Shard::omap_get_values>(
+    os,
+    ch, hoid, FuturizedStore::Shard::omap_keys_t{key}, 0
   ).safe_then([FNAME, &key, next_or_current] (FuturizedStore::Shard::omap_values_t&& vals) {
     DEBUG("returning {}", key);
     ceph_assert(vals.size() == 1);
index 83704cd9e8a16c8a9d969d20a850e09489f119cc..3adbb95dab811f7dc19b316fad64469dfe13e7ba 100644 (file)
@@ -39,12 +39,17 @@ class OSDriver : public MapCacher::StoreDriver<std::string, ceph::buffer::list>
 #ifdef WITH_CRIMSON
   using ObjectStoreT = crimson::os::FuturizedStore::Shard;
   using CollectionHandleT = ObjectStoreT::CollectionRef;
+
+  using ObjectStoreTLRef = seastar::shared_ptr<ObjectStoreT>;
+  using ObjectStoreTFRef = seastar::foreign_ptr<ObjectStoreTLRef>;
+  using ObjectStoreTRef = ::crimson::local_shared_foreign_ptr<ObjectStoreTLRef>;
 #else
   using ObjectStoreT = ObjectStore;
   using CollectionHandleT = ObjectStoreT::CollectionHandle;
+  using ObjectStoreTRef = ObjectStoreT*;
 #endif
 
-  ObjectStoreT *os;
+  ObjectStoreTRef os;
   CollectionHandleT ch;
   ghobject_t hoid;
 
@@ -80,10 +85,10 @@ public:
   }
 
 #ifndef WITH_CRIMSON
-  OSDriver(ObjectStoreT *os, const coll_t& cid, const ghobject_t &hoid) :
+  OSDriver(ObjectStoreTRef os, const coll_t& cid, const ghobject_t &hoid) :
     OSDriver(os, os->open_collection(cid), hoid) {}
 #endif
-  OSDriver(ObjectStoreT *os, CollectionHandleT ch, const ghobject_t &hoid) :
+  OSDriver(ObjectStoreTRef os, CollectionHandleT ch, const ghobject_t &hoid) :
     os(os),
     ch(ch),
     hoid(hoid) {}