return get_or_load_obc<State>(obc, existed)
.safe_then_interruptible(
[func = std::move(func)](auto obc) {
- return std::move(func)(std::move(obc));
+ return std::move(func)(obc, obc);
});
}).finally([FNAME, this, obc=std::move(obc)] {
DEBUGDPP("released object {}", dpp, obc->get_oid());
assert(!oid.is_head());
return with_obc<RWState::RWREAD>(
oid.get_head(),
- [FNAME, oid, func=std::move(func), this](auto head) mutable
+ [FNAME, oid, func=std::move(func), this](auto head, auto) mutable
-> load_obc_iertr::future<> {
if (!head->obs.exists) {
ERRORDPP("head doesn't exist for object {}", dpp, head->obs.oi.soid);
auto [clone, existed] = obc_registry.get_cached_obc(*coid);
return clone->template with_lock<State, IOInterruptCondition>(
[existed=existed, clone=std::move(clone),
- func=std::move(func), head=std::move(head), this]()
+ func=std::move(func), head=std::move(head), this]() mutable
-> load_obc_iertr::future<> {
auto loaded = get_or_load_obc<State>(clone, existed);
return loaded.safe_then_interruptible(
- [func = std::move(func)](auto clone) {
- return std::move(func)(std::move(clone));
+ [func = std::move(func), head=std::move(head)](auto clone) mutable {
+ return std::move(func)(std::move(head), std::move(clone));
});
});
}
ObjectContextLoader::load_obc_iertr::future<>
ObjectContextLoader::with_clone_obc_direct(
hobject_t oid,
- with_both_obc_func_t&& func)
+ with_obc_func_t&& func)
{
LOG_PREFIX(ObjectContextLoader::with_clone_obc_direct);
assert(!oid.is_head());
return with_obc<RWState::RWREAD>(
oid.get_head(),
- [FNAME, oid, func=std::move(func), this](auto head) mutable
+ [FNAME, oid, func=std::move(func), this](auto head, auto) mutable
-> load_obc_iertr::future<> {
if (!head->obs.exists) {
ERRORDPP("head doesn't exist for object {}", dpp, head->obs.oi.soid);
template ObjectContextLoader::load_obc_iertr::future<>
ObjectContextLoader::with_clone_obc_direct<RWState::RWWRITE>(
hobject_t,
- with_both_obc_func_t&&);
+ with_obc_func_t&&);
}
load_obc_ertr>;
using with_obc_func_t =
- std::function<load_obc_iertr::future<> (ObjectContextRef)>;
-
- using with_both_obc_func_t =
std::function<load_obc_iertr::future<> (ObjectContextRef, ObjectContextRef)>;
// Use this variant by default
template<RWState::State State>
load_obc_iertr::future<> with_clone_obc_direct(
hobject_t oid,
- with_both_obc_func_t&& func);
+ with_obc_func_t&& func);
load_obc_iertr::future<> reload_obc(ObjectContext& obc) const;
ClientRequest::process_op(instance_handle_t &ihref, Ref<PG> &pg)
{
return ihref.enter_stage<interruptor>(
- client_pp(*pg).recover_missing,
- *this
- ).then_interruptible(
- [this, pg]() mutable {
- LOG_PREFIX(ClientRequest::process_op);
- if (pg->is_primary()) {
- return do_recover_missing(pg, m->get_hobj());
- } else {
- DEBUGI("process_op: Skipping do_recover_missing"
- "on non primary pg");
- return interruptor::now();
- }
+ client_pp(*pg).recover_missing, *this
+ ).then_interruptible([pg, this]() mutable {
+ return recover_missings(pg, m->get_hobj(), snaps_need_to_recover());
}).then_interruptible([this, pg, &ihref]() mutable {
return pg->already_complete(m->get_reqid()).then_interruptible(
[this, pg, &ihref](auto completed) mutable
op_info.set_from_op(&*m, *pg->get_osdmap());
return pg->with_locked_obc(
m->get_hobj(), op_info,
- [this, pg, &ihref](auto obc) mutable {
+ [this, pg, &ihref](auto head, auto obc) mutable {
LOG_PREFIX(ClientRequest::process_op);
DEBUGI("{}: got obc {}", *this, obc->obs);
return ihref.enter_stage<interruptor>(
}
auto get_instance_handle() { return instance_handle; }
+ std::vector<snapid_t> snaps_need_to_recover() {
+ std::vector<snapid_t> ret;
+ for (auto &op : m->ops) {
+ if (op.op.op == CEPH_OSD_OP_ROLLBACK) {
+ ret.emplace_back((snapid_t)op.op.snap.snapid);
+ }
+ }
+ return ret;
+ }
+
using ordering_hook_t = boost::intrusive::list_member_hook<>;
ordering_hook_t ordering_hook;
class Orderer {
}
}
+SET_SUBSYS(osd);
+
namespace crimson::osd {
+InterruptibleOperation::template interruptible_future<>
+CommonClientRequest::recover_missings(
+ Ref<PG> &pg,
+ const hobject_t& soid,
+ std::vector<snapid_t> &&snaps)
+{
+ using interruptor = InterruptibleOperation::interruptor;
+ LOG_PREFIX(CommonClientRequest::recover_missings);
+ auto fut = interruptor::now();
+ if (!pg->is_primary()) {
+ DEBUGI("process_op: Skipping do_recover_missing on non primary pg");
+ return fut;
+ }
+ if (!soid.is_head()) {
+ fut = do_recover_missing(pg, soid.get_head());
+ }
+ return seastar::do_with(
+ std::move(snaps),
+ [pg, soid, fut=std::move(fut)](auto &snaps) mutable {
+ return fut.then_interruptible([&snaps, pg, soid]() mutable {
+ return pg->obc_loader.with_obc<RWState::RWREAD>(
+ soid.get_head(),
+ [&snaps, pg, soid](auto head, auto) mutable {
+ auto oid = resolve_oid(head->get_head_ss(), soid);
+ assert(oid);
+ return do_recover_missing(pg, *oid
+ ).then_interruptible([&snaps, pg, soid, head]() mutable {
+ return InterruptibleOperation::interruptor::do_for_each(
+ snaps,
+ [pg, soid, head](auto &snap) mutable {
+ auto coid = head->obs.oi.soid;
+ coid.snap = snap;
+ auto oid = resolve_oid(head->get_head_ss(), coid);
+ assert(oid);
+ return do_recover_missing(pg, *oid);
+ });
+ });
+ });
+ }).handle_error_interruptible(
+ crimson::ct_error::assert_all("unexpected error")
+ );
+ });
+}
+
typename InterruptibleOperation::template interruptible_future<>
CommonClientRequest::do_recover_missing(
Ref<PG>& pg, const hobject_t& soid)
namespace crimson::osd {
struct CommonClientRequest {
+
+ static InterruptibleOperation::template interruptible_future<>
+ recover_missings(
+ Ref<PG> &pg,
+ const hobject_t& soid,
+ std::vector<snapid_t> &&snaps);
+
static InterruptibleOperation::template interruptible_future<>
do_recover_missing(Ref<PG>& pg, const hobject_t& soid);
} wait_for_active;
struct RecoverMissing : OrderedExclusivePhaseT<RecoverMissing> {
static constexpr auto type_name = "CommonPGPipeline::recover_missing";
- } recover_missing;
+ } recover_missing, recover_missing2;
struct GetOBC : OrderedExclusivePhaseT<GetOBC> {
static constexpr auto type_name = "CommonPGPipeline::get_obc";
} get_obc;
std::as_const(osd_ops), pg->get_pgid().pgid, *pg->get_osdmap());
assert(ret == 0);
return pg->with_locked_obc(get_target_oid(), op_info,
- [&osd_ops, this](auto obc) {
+ [&osd_ops, this](auto, auto obc) {
return enter_stage<interruptor>(client_pp().process
).then_interruptible(
[obc=std::move(obc), &osd_ops, this] {
throw crimson::common::system_shutdown_exception();
}
const hobject_t oid = get_oid(hobj);
- auto wrapper = [f=std::move(f), this](auto obc) {
+ auto wrapper = [f=std::move(f), this](auto head, auto obc) {
check_blocklisted_obc_watchers(obc);
- return f(obc);
+ return f(head, obc);
};
switch (get_lock_type(op_info)) {
case RWState::RWREAD:
public:
using with_obc_func_t =
- std::function<load_obc_iertr::future<> (ObjectContextRef)>;
+ std::function<load_obc_iertr::future<> (ObjectContextRef, ObjectContextRef)>;
load_obc_iertr::future<> with_locked_obc(
const hobject_t &hobj,
return obc_loader.with_clone_obc_only<RWState::RWWRITE>(
head, target_coid,
[this, &os, &txn, &delta_stats, &osd_op_params]
- (auto resolved_obc) {
+ (auto, auto resolved_obc) {
if (resolved_obc->obs.oi.soid.is_head()) {
// no-op: The resolved oid returned the head object
logger().debug("PGBackend::rollback: loaded head_obc: {}"
return maybe_pull_missing_obj(soid, need).then_interruptible([this, soid, need] {
logger().debug("recover_object: loading obc: {}", soid);
return pg.obc_loader.with_obc<RWState::RWREAD>(soid,
- [this, soid, need](auto obc) {
+ [this, soid, need](auto, auto obc) {
logger().debug("recover_object: loaded obc: {}", obc->obs.oi.soid);
auto& recovery_waiter = get_recovering(soid);
recovery_waiter.obc = obc;
if (pull_info.recovery_progress.first) {
prepare_waiter = pg.obc_loader.with_obc<RWState::RWNONE>(
pull_info.recovery_info.soid,
- [&pull_info, &recovery_waiter, &push_op](auto obc) {
+ [&pull_info, &recovery_waiter, &push_op](auto, auto obc) {
pull_info.obc = obc;
recovery_waiter.obc = obc;
obc->obs.oi.decode_no_oid(push_op.attrset.at(OI_ATTR), push_op.soid);