]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/osd: Introduce ObjectContextLoader
authorMatan Breizman <mbreizma@redhat.com>
Mon, 31 Oct 2022 09:12:38 +0000 (09:12 +0000)
committerMatan Breizman <mbreizma@redhat.com>
Tue, 22 Nov 2022 09:35:34 +0000 (09:35 +0000)
* obc load logic is moved to a seperate class for better reusability.

* with_existing_* logic is removed. (not being used)

Signed-off-by: Matan Breizman <mbreizma@redhat.com>
src/crimson/osd/CMakeLists.txt
src/crimson/osd/object_context_loader.cc [new file with mode: 0644]
src/crimson/osd/object_context_loader.h [new file with mode: 0644]
src/crimson/osd/pg.cc
src/crimson/osd/pg.h
src/crimson/osd/replicated_recovery_backend.cc

index 52cc33e3f954b948b5975b3a2ca5082b2e1b7642..f899b064ad63693b9b4eea9335626cad5480da5c 100644 (file)
@@ -12,6 +12,7 @@ add_executable(crimson-osd
   shard_services.cc
   pg_shard_manager.cc
   object_context.cc
+  object_context_loader.cc
   ops_executer.cc
   osd_operation.cc
   osd_operations/client_request.cc
diff --git a/src/crimson/osd/object_context_loader.cc b/src/crimson/osd/object_context_loader.cc
new file mode 100644 (file)
index 0000000..246c20f
--- /dev/null
@@ -0,0 +1,190 @@
+#include "crimson/osd/object_context_loader.h"
+
+namespace {
+  seastar::logger& logger() {
+    return crimson::get_logger(ceph_subsys_osd);
+  }
+}
+
+namespace crimson::osd {
+
+using crimson::common::local_conf;
+
+  template<RWState::State State>
+  ObjectContextLoader::load_obc_iertr::future<>
+  ObjectContextLoader::with_head_obc(ObjectContextRef obc,
+                                     bool existed,
+                                     with_obc_func_t&& func)
+  {
+    logger().debug("{} {}", __func__, obc->get_oid());
+    assert(obc->is_head());
+    obc->append_to(obc_set_accessing);
+    return obc->with_lock<State, IOInterruptCondition>(
+      [existed=existed, obc=obc, func=std::move(func), this] {
+      return get_or_load_obc<State>(obc, existed)
+      .safe_then_interruptible(
+        [func = std::move(func)](auto obc) {
+        return std::move(func)(std::move(obc));
+      });
+    }).finally([this, obc=std::move(obc)] {
+      logger().debug("with_head_obc: released {}", obc->get_oid());
+      obc->remove_from(obc_set_accessing);
+    });
+  }
+
+  template<RWState::State State>
+  ObjectContextLoader::load_obc_iertr::future<>
+  ObjectContextLoader::with_clone_obc(hobject_t oid,
+                                      with_obc_func_t&& func)
+  {
+    assert(!oid.is_head());
+    return with_head_obc<RWState::RWREAD>(oid.get_head(),
+      [oid, func=std::move(func), this](auto head)
+      -> load_obc_iertr::future<> {
+      if (!head->obs.exists) {
+        logger().error("with_clone_obc: {} head doesn't exist",
+                       head->obs.oi.soid);
+        return load_obc_iertr::future<>{
+          crimson::ct_error::enoent::make()
+        };
+      }
+      auto coid = resolve_oid(head->get_ro_ss(), oid);
+      if (!coid) {
+        logger().error("with_clone_obc: {} clone not found", coid);
+        return load_obc_iertr::future<>{
+          crimson::ct_error::enoent::make()
+        };
+      }
+      auto [clone, existed] = shard_services.get_cached_obc(*coid);
+      return clone->template with_lock<State, IOInterruptCondition>(
+        [existed=existed, head=std::move(head), clone=std::move(clone),
+         func=std::move(func), this]() -> load_obc_iertr::future<> {
+        auto loaded = get_or_load_obc<State>(clone, existed);
+        clone->head = head;
+        return loaded.safe_then_interruptible(
+          [func = std::move(func)](auto clone) {
+          return std::move(func)(std::move(clone));
+        });
+      });
+    });
+  }
+
+  template<RWState::State State>
+  ObjectContextLoader::load_obc_iertr::future<>
+  ObjectContextLoader::with_head_obc(hobject_t oid,
+                                     with_obc_func_t&& func)
+  {
+    auto [obc, existed] =
+      shard_services.get_cached_obc(std::move(oid));
+    return with_head_obc<State>(std::move(obc),
+                                existed,
+                                std::move(func));
+  }
+
+  ObjectContextLoader::load_obc_iertr::future<ObjectContextRef>
+  ObjectContextLoader::load_obc(ObjectContextRef obc)
+  {
+    return backend->load_metadata(obc->get_oid())
+    .safe_then_interruptible(
+      [obc=std::move(obc)](auto md)
+      -> load_obc_ertr::future<ObjectContextRef> {
+      const hobject_t& oid = md->os.oi.soid;
+      logger().debug(
+        "load_obc: loaded obs {} for {}", md->os.oi, oid);
+      if (oid.is_head()) {
+        if (!md->ssc) {
+          logger().error(
+            "load_obc: oid {} missing snapsetcontext", oid);
+          return crimson::ct_error::object_corrupted::make();
+        }
+        obc->set_head_state(std::move(md->os),
+                            std::move(md->ssc));
+      } else {
+        obc->set_clone_state(std::move(md->os));
+      }
+      logger().debug(
+        "load_obc: returning obc {} for {}",
+        obc->obs.oi, obc->obs.oi.soid);
+      return load_obc_ertr::make_ready_future<ObjectContextRef>(obc);
+    });
+  }
+
+  template<RWState::State State>
+  ObjectContextLoader::load_obc_iertr::future<ObjectContextRef>
+  ObjectContextLoader::get_or_load_obc(ObjectContextRef obc,
+                                       bool existed)
+  {
+    auto loaded =
+      load_obc_iertr::make_ready_future<ObjectContextRef>(obc);
+    if (existed) {
+      logger().debug("{}: found {} in cache",
+                     __func__, obc->get_oid());
+    } else {
+      logger().debug("{}: cache miss on {}",
+                     __func__, obc->get_oid());
+      loaded =
+        obc->template with_promoted_lock<State, IOInterruptCondition>(
+        [obc, this] {
+        return load_obc(obc);
+      });
+    }
+    return loaded;
+  }
+
+  ObjectContextLoader::load_obc_iertr::future<>
+  ObjectContextLoader::reload_obc(ObjectContext& obc) const
+  {
+    assert(obc.is_head());
+    return backend->load_metadata(obc.get_oid())
+    .safe_then_interruptible<false>(
+      [&obc](auto md)-> load_obc_ertr::future<> {
+      logger().debug(
+        "{}: reloaded obs {} for {}",
+        __func__,
+        md->os.oi,
+        obc.get_oid());
+      if (!md->ssc) {
+        logger().error(
+          "{}: oid {} missing snapsetcontext",
+          __func__,
+          obc.get_oid());
+        return crimson::ct_error::object_corrupted::make();
+      }
+      obc.set_head_state(std::move(md->os), std::move(md->ssc));
+      return load_obc_ertr::now();
+    });
+  }
+
+  // explicitly instantiate the used instantiations
+  template ObjectContextLoader::load_obc_iertr::future<>
+  ObjectContextLoader::with_head_obc<RWState::RWNONE>(hobject_t,
+                                                      with_obc_func_t&&);
+
+  template ObjectContextLoader::load_obc_iertr::future<>
+  ObjectContextLoader::with_head_obc<RWState::RWREAD>(hobject_t,
+                                                      with_obc_func_t&&);
+
+  template ObjectContextLoader::load_obc_iertr::future<>
+  ObjectContextLoader::with_head_obc<RWState::RWWRITE>(hobject_t,
+                                                       with_obc_func_t&&);
+
+  template ObjectContextLoader::load_obc_iertr::future<>
+  ObjectContextLoader::with_head_obc<RWState::RWEXCL>(hobject_t,
+                                                      with_obc_func_t&&);
+
+  template ObjectContextLoader::load_obc_iertr::future<>
+  ObjectContextLoader::with_clone_obc<RWState::RWNONE>(hobject_t,
+                                                       with_obc_func_t&&);
+
+  template ObjectContextLoader::load_obc_iertr::future<>
+  ObjectContextLoader::with_clone_obc<RWState::RWREAD>(hobject_t,
+                                                       with_obc_func_t&&);
+
+  template ObjectContextLoader::load_obc_iertr::future<>
+  ObjectContextLoader::with_clone_obc<RWState::RWWRITE>(hobject_t,
+                                                        with_obc_func_t&&);
+
+  template ObjectContextLoader::load_obc_iertr::future<>
+  ObjectContextLoader::with_clone_obc<RWState::RWEXCL>(hobject_t,
+                                                       with_obc_func_t&&);
+}
diff --git a/src/crimson/osd/object_context_loader.h b/src/crimson/osd/object_context_loader.h
new file mode 100644 (file)
index 0000000..7771da3
--- /dev/null
@@ -0,0 +1,62 @@
+#pragma once
+
+#include <seastar/core/future.hh>
+#include "crimson/common/errorator.h"
+#include "crimson/osd/object_context.h"
+#include "crimson/osd/shard_services.h"
+#include "crimson/osd/pg_backend.h"
+
+namespace crimson::osd {
+class ObjectContextLoader {
+public:
+  using obc_accessing_list_t = boost::intrusive::list<
+    ObjectContext,
+    ObjectContext::obc_accessing_option_t>;
+
+  ObjectContextLoader(
+    ShardServices& _shard_services,
+    PGBackend* _backend)
+    : shard_services{_shard_services},
+      backend{_backend}
+    {}
+
+  using load_obc_ertr = crimson::errorator<
+    crimson::ct_error::enoent,
+    crimson::ct_error::object_corrupted>;
+  using load_obc_iertr =
+    ::crimson::interruptible::interruptible_errorator<
+      ::crimson::osd::IOInterruptCondition,
+      load_obc_ertr>;
+
+  using with_obc_func_t =
+    std::function<load_obc_iertr::future<> (ObjectContextRef)>;
+
+  template<RWState::State State>
+  load_obc_iertr::future<> with_head_obc(hobject_t oid,
+                                         with_obc_func_t&& func);
+
+  template<RWState::State State>
+  load_obc_iertr::future<> with_clone_obc(hobject_t oid,
+                                          with_obc_func_t&& func);
+
+  template<RWState::State State>
+  load_obc_iertr::future<> with_head_obc(ObjectContextRef obc,
+                                         bool existed,
+                                         with_obc_func_t&& func);
+
+  template<RWState::State State>
+  load_obc_iertr::future<ObjectContextRef>
+  get_or_load_obc(ObjectContextRef obc,
+                  bool existed);
+
+  load_obc_iertr::future<ObjectContextRef>
+  load_obc(ObjectContextRef obc);
+
+  load_obc_iertr::future<> reload_obc(ObjectContext& obc) const;
+
+private:
+  ShardServices &shard_services;
+  PGBackend* backend;
+  obc_accessing_list_t obc_set_accessing;
+};
+}
index 6a386bcacdc7c8b1ea7ca005a7f240ca9842cd87..b8420ef3f4e01ee9a2c3cc4bf59d9a332927bb9a 100644 (file)
@@ -133,6 +133,9 @@ PG::PG(
       osdmap,
       this,
       this),
+    obc_loader{
+      shard_services,
+      backend.get()},
     wait_for_active_blocker(this)
 {
   peering_state.set_backend_predicates(
@@ -632,7 +635,7 @@ PG::do_osd_ops_execute(
 {
   assert(ox);
   auto rollbacker = ox->create_rollbacker([this] (auto& obc) {
-    return reload_obc(obc).handle_error_interruptible(
+    return obc_loader.reload_obc(obc).handle_error_interruptible(
       load_obc_ertr::assert_all{"can't live with object state messed up"});
   });
   auto failure_func_ptr = seastar::make_lw_shared(std::move(failure_func));
@@ -1030,163 +1033,6 @@ std::optional<hobject_t> PG::resolve_oid(
   }
 }
 
-template<RWState::State State>
-PG::load_obc_iertr::future<>
-PG::with_head_obc(ObjectContextRef obc, bool existed, with_obc_func_t&& func)
-{
-  logger().debug("{} {}", __func__, obc->get_oid());
-  assert(obc->is_head());
-  obc->append_to(obc_set_accessing);
-  return obc->with_lock<State, IOInterruptCondition>(
-    [existed=existed, obc=obc, func=std::move(func), this] {
-    return get_or_load_obc<State>(obc, existed).safe_then_interruptible(
-      [func = std::move(func)](auto obc) {
-      return std::move(func)(std::move(obc));
-    });
-  }).finally([this, pgref=boost::intrusive_ptr<PG>{this}, obc=std::move(obc)] {
-    logger().debug("with_head_obc: released {}", obc->get_oid());
-    obc->remove_from(obc_set_accessing);
-  });
-}
-
-template<RWState::State State>
-PG::load_obc_iertr::future<>
-PG::with_head_obc(hobject_t oid, with_obc_func_t&& func)
-{
-  auto [obc, existed] =
-    shard_services.get_cached_obc(std::move(oid));
-  return with_head_obc<State>(std::move(obc), existed, std::move(func));
-}
-
-template<RWState::State State>
-PG::interruptible_future<>
-PG::with_existing_head_obc(ObjectContextRef obc, with_obc_func_t&& func)
-{
-  constexpr bool existed = true;
-  return with_head_obc<State>(
-    std::move(obc), existed, std::move(func)
-  ).handle_error_interruptible(load_obc_ertr::assert_all{"can't happen"});
-}
-
-template<RWState::State State>
-PG::load_obc_iertr::future<>
-PG::with_clone_obc(hobject_t oid, with_obc_func_t&& func)
-{
-  assert(!oid.is_head());
-  return with_head_obc<RWState::RWREAD>(oid.get_head(),
-    [oid, func=std::move(func), this](auto head) -> load_obc_iertr::future<> {
-    if (!head->obs.exists) {
-      logger().error("with_clone_obc: {} head doesn't exist", head->obs.oi.soid);
-      return load_obc_iertr::future<>{crimson::ct_error::object_corrupted::make()};
-    }
-    auto coid = resolve_oid(head->get_ro_ss(), oid);
-    if (!coid) {
-      logger().error("with_clone_obc: {} clone not found", oid);
-      return load_obc_iertr::future<>{crimson::ct_error::enoent::make()};
-    }
-    auto [clone, existed] = shard_services.get_cached_obc(*coid);
-    return clone->template with_lock<State, IOInterruptCondition>(
-      [existed=existed, head=std::move(head), clone=std::move(clone),
-       func=std::move(func), this]() -> load_obc_iertr::future<> {
-      auto loaded = get_or_load_obc<State>(clone, existed);
-      clone->head = head;
-      return loaded.safe_then_interruptible([func = std::move(func)](auto clone) {
-        return std::move(func)(std::move(clone));
-      });
-    });
-  });
-}
-
-// explicitly instantiate the used instantiations
-template PG::load_obc_iertr::future<>
-PG::with_head_obc<RWState::RWNONE>(hobject_t, with_obc_func_t&&);
-
-template<RWState::State State>
-PG::interruptible_future<>
-PG::with_existing_clone_obc(ObjectContextRef clone, with_obc_func_t&& func)
-{
-  assert(clone);
-  assert(clone->get_head_obc());
-  assert(!clone->get_oid().is_head());
-  return with_existing_head_obc<RWState::RWREAD>(clone->get_head_obc(),
-    [clone=std::move(clone), func=std::move(func)] ([[maybe_unused]] auto head) {
-    assert(head == clone->get_head_obc());
-    return clone->template with_lock<State>(
-      [clone=std::move(clone), func=std::move(func)] {
-      return std::move(func)(std::move(clone));
-    });
-  });
-}
-
-PG::load_obc_iertr::future<crimson::osd::ObjectContextRef>
-PG::load_obc(ObjectContextRef obc)
-{
-  return backend->load_metadata(obc->get_oid()).safe_then_interruptible(
-    [obc=std::move(obc)](auto md)
-    -> load_obc_ertr::future<crimson::osd::ObjectContextRef> {
-    const hobject_t& oid = md->os.oi.soid;
-    logger().debug(
-      "load_obc: loaded obs {} for {}", md->os.oi, oid);
-    if (oid.is_head()) {
-      if (!md->ssc) {
-        logger().error(
-          "load_obc: oid {} missing snapsetcontext", oid);
-        return crimson::ct_error::object_corrupted::make();
-      }
-      obc->set_head_state(std::move(md->os), std::move(md->ssc));
-    } else {
-      obc->set_clone_state(std::move(md->os));
-    }
-    logger().debug(
-      "load_obc: returning obc {} for {}",
-      obc->obs.oi, obc->obs.oi.soid);
-    return load_obc_ertr::make_ready_future<
-      crimson::osd::ObjectContextRef>(obc);
-  });
-}
-
-template<RWState::State State>
-PG::load_obc_iertr::future<crimson::osd::ObjectContextRef>
-PG::get_or_load_obc(
-    crimson::osd::ObjectContextRef obc,
-    bool existed)
-{
-  auto loaded = load_obc_iertr::make_ready_future<ObjectContextRef>(obc);
-  if (existed) {
-    logger().debug("{}: found {} in cache", __func__, obc->get_oid());
-  } else {
-    logger().debug("{}: cache miss on {}", __func__, obc->get_oid());
-    loaded = obc->template with_promoted_lock<State, IOInterruptCondition>(
-      [obc, this] {
-      return load_obc(obc);
-    });
-  }
-  return loaded;
-}
-
-PG::load_obc_iertr::future<>
-PG::reload_obc(crimson::osd::ObjectContext& obc) const
-{
-  assert(obc.is_head());
-  return backend->load_metadata(obc.get_oid()).safe_then_interruptible<false>([&obc](auto md)
-    -> load_obc_ertr::future<> {
-    logger().debug(
-      "{}: reloaded obs {} for {}",
-      __func__,
-      md->os.oi,
-      obc.get_oid());
-    if (!md->ssc) {
-      logger().error(
-        "{}: oid {} missing snapsetcontext",
-        __func__,
-        obc.get_oid());
-      return crimson::ct_error::object_corrupted::make();
-    }
-    obc.set_head_state(std::move(md->os), std::move(md->ssc));
-    return load_obc_ertr::now();
-  });
-}
-
 PG::load_obc_iertr::future<>
 PG::with_locked_obc(const hobject_t &hobj,
                     const OpInfo &op_info,
@@ -1199,47 +1045,27 @@ PG::with_locked_obc(const hobject_t &hobj,
   switch (get_lock_type(op_info)) {
   case RWState::RWREAD:
     if (oid.is_head()) {
-      return with_head_obc<RWState::RWREAD>(oid, std::move(f));
+      return obc_loader.with_head_obc<RWState::RWREAD>(oid, std::move(f));
     } else {
-      return with_clone_obc<RWState::RWREAD>(oid, std::move(f));
+      return obc_loader.with_clone_obc<RWState::RWREAD>(oid, std::move(f));
     }
   case RWState::RWWRITE:
     if (oid.is_head()) {
-      return with_head_obc<RWState::RWWRITE>(oid, std::move(f));
+      return obc_loader.with_head_obc<RWState::RWWRITE>(oid, std::move(f));
     } else {
-      return with_clone_obc<RWState::RWWRITE>(oid, std::move(f));
+      return obc_loader.with_clone_obc<RWState::RWWRITE>(oid, std::move(f));
     }
   case RWState::RWEXCL:
     if (oid.is_head()) {
-      return with_head_obc<RWState::RWEXCL>(oid, std::move(f));
+      return obc_loader.with_head_obc<RWState::RWEXCL>(oid, std::move(f));
     } else {
-      return with_clone_obc<RWState::RWEXCL>(oid, std::move(f));
+      return obc_loader.with_clone_obc<RWState::RWEXCL>(oid, std::move(f));
     }
   default:
     ceph_abort();
   };
 }
 
-template <RWState::State State>
-PG::interruptible_future<>
-PG::with_locked_obc(ObjectContextRef obc, with_obc_func_t &&f)
-{
-  // TODO: a question from rebase: do we really need such checks when
-  // the interruptible stuff is being used?
-  if (__builtin_expect(stopping, false)) {
-    throw crimson::common::system_shutdown_exception();
-  }
-  if (obc->is_head()) {
-    return with_existing_head_obc<State>(obc, std::move(f));
-  } else {
-    return with_existing_clone_obc<State>(obc, std::move(f));
-  }
-}
-
-// explicitly instantiate the used instantiations
-template PG::interruptible_future<>
-PG::with_locked_obc<RWState::RWEXCL>(ObjectContextRef, with_obc_func_t&&);
-
 PG::interruptible_future<> PG::handle_rep_op(Ref<MOSDRepOp> req)
 {
   if (__builtin_expect(stopping, false)) {
index 2bd230c285b42dda34a1eea1dd98f751a8b09070..000844aa271ad939941e4f9b1f01e13374c71313 100644 (file)
@@ -36,6 +36,7 @@
 #include "crimson/osd/pg_recovery.h"
 #include "crimson/osd/pg_recovery_listener.h"
 #include "crimson/osd/recovery_backend.h"
+#include "crimson/osd/object_context_loader.h"
 
 class MQuery;
 class OSDMap;
@@ -519,17 +520,6 @@ public:
   using interruptor = ::crimson::interruptible::interruptor<
     ::crimson::osd::IOInterruptCondition>;
 
-  template<RWState::State State>
-  load_obc_iertr::future<crimson::osd::ObjectContextRef>
-  get_or_load_obc(
-    crimson::osd::ObjectContextRef head_obc, bool existed);
-
-  load_obc_iertr::future<crimson::osd::ObjectContextRef>
-  load_obc(ObjectContextRef obc);
-
-  load_obc_iertr::future<>
-  reload_obc(crimson::osd::ObjectContext& obc) const;
-
 public:
   using with_obc_func_t =
     std::function<load_obc_iertr::future<> (ObjectContextRef)>;
@@ -539,13 +529,6 @@ public:
     ObjectContext::obc_accessing_option_t>;
   obc_accessing_list_t obc_set_accessing;
 
-  template<RWState::State State>
-  load_obc_iertr::future<> with_head_obc(hobject_t oid, with_obc_func_t&& func);
-
-  template<RWState::State State>
-  interruptible_future<> with_locked_obc(
-    ObjectContextRef obc,
-    with_obc_func_t&& f);
   load_obc_iertr::future<> with_locked_obc(
     const hobject_t &hobj,
     const OpInfo &op_info,
@@ -569,27 +552,6 @@ public:
     eversion_t &version);
 
 private:
-  template<RWState::State State>
-  load_obc_iertr::future<> with_head_obc(
-    ObjectContextRef obc,
-    bool existed,
-    with_obc_func_t&& func);
-  template<RWState::State State>
-  interruptible_future<> with_existing_head_obc(
-    ObjectContextRef head,
-    with_obc_func_t&& func);
-
-  template<RWState::State State>
-  load_obc_iertr::future<> with_clone_obc(hobject_t oid, with_obc_func_t&& func);
-  template<RWState::State State>
-  interruptible_future<> with_existing_clone_obc(
-    ObjectContextRef clone, with_obc_func_t&& func);
-
-  load_obc_iertr::future<ObjectContextRef> get_locked_obc(
-    Operation *op,
-    const hobject_t &oid,
-    RWState::State type);
-
   void fill_op_params_bump_pg_version(
     osd_op_params_t& osd_op_p,
     const bool user_modify);
@@ -661,6 +623,8 @@ private:
   eversion_t projected_last_update;
 
 public:
+  ObjectContextLoader obc_loader;
+
   // PeeringListener
   void publish_stats_to_osd() final;
   void clear_publish_stats() final;
index 95a649d4182e54acc60a244847387864e52c3c66..6297ca87ba4ad7ec0c0a3d739e816243bc9cb341 100644 (file)
@@ -33,7 +33,7 @@ ReplicatedRecoveryBackend::recover_object(
   // start tracking the recovery of soid
   return maybe_pull_missing_obj(soid, need).then_interruptible([this, soid, need] {
     logger().debug("recover_object: loading obc: {}", soid);
-    return pg.with_head_obc<RWState::RWREAD>(soid,
+    return pg.obc_loader.with_head_obc<RWState::RWREAD>(soid,
       [this, soid, need](auto obc) {
       logger().debug("recover_object: loaded obc: {}", obc->obs.oi.soid);
       auto& recovery_waiter = get_recovering(soid);
@@ -678,7 +678,7 @@ ReplicatedRecoveryBackend::_handle_pull_response(
   auto prepare_waiter = interruptor::make_interruptible(
       seastar::make_ready_future<>());
   if (pi.recovery_progress.first) {
-    prepare_waiter = pg.with_head_obc<RWState::RWNONE>(
+    prepare_waiter = pg.obc_loader.with_head_obc<RWState::RWNONE>(
       pi.recovery_info.soid, [&pi, &recovery_waiter, &pop](auto obc) {
         pi.obc = obc;
         recovery_waiter.obc = obc;