msg(std::in_place_type_t<ExecutableMessagePimpl<MsgT>>{}, &msg) {
}
+ template <class Func>
+ struct RollbackHelper {
+ interruptible_future<> rollback_obc_if_modified(const std::error_code& e);
+ OpsExecuter& ox;
+ Func func;
+ };
+
+ template <class Func>
+ RollbackHelper<Func> create_rollbacker(Func&& func) {
+ return {*this, std::forward<Func>(func)};
+ }
+
interruptible_errorated_future<osd_op_errorator>
execute_op(class OSDOp& osd_op);
}
}
+template <class Func>
+OpsExecuter::interruptible_future<>
+OpsExecuter::RollbackHelper<Func>::rollback_obc_if_modified(
+ const std::error_code& e)
+{
+ // Oops, an operation had failed. do_osd_ops() altogether with
+ // OpsExecuter already dropped the ObjectStore::Transaction if
+ // there was any. However, this is not enough to completely
+ // rollback as we gave OpsExecuter the very single copy of `obc`
+ // we maintain and we did it for both reading and writing.
+ // Now all modifications must be reverted.
+ //
+ // Let's just reload from the store. Evicting from the shared
+ // LRU would be tricky as next MOSDOp (the one at `get_obc`
+ // phase) could actually already finished the lookup. Fortunately,
+ // this is supposed to live on cold paths, so performance is not
+ // a concern -- simplicity wins.
+ //
+ // The conditional's purpose is to efficiently handle hot errors
+ // which may appear as a result of e.g. CEPH_OSD_OP_CMPXATTR or
+ // CEPH_OSD_OP_OMAP_CMP. These are read-like ops and clients
+ // typically append them before any write. If OpsExecuter hasn't
+ // seen any modifying operation, `obc` is supposed to be kept
+ // unchanged.
+ const auto need_rollback = ox.has_seen_write();
+ crimson::get_logger(ceph_subsys_osd).debug(
+ "{}: object {} got error {}, need_rollback={}",
+ __func__,
+ ox.obc->get_oid(),
+ e,
+ need_rollback);
+ return need_rollback ? func(*ox.obc) : interruptor::now();
+}
+
// PgOpsExecuter -- a class for executing ops targeting a certain PG.
class PgOpsExecuter {
template <typename T = void>
SuccessFunc&& success_func,
FailureFunc&& failure_func)
{
+ auto rollbacker = ox.create_rollbacker([this] (auto& obc) {
+ return reload_obc(obc).handle_error_interruptible(
+ load_obc_ertr::assert_all{"can't live with object state messed up"});
+ });
return interruptor::do_for_each(ops, [m, &ox](OSDOp& osd_op) {
logger().debug(
"do_osd_ops_execute: {} - object {} - handling op {}",
return do_osd_ops_iertr::future<Ret>{crimson::ct_error::eagain::make()};
});
}), OpsExecuter::osd_op_errorator::all_same_way(
- [&ox, m, obc, failure_func=std::move(failure_func), this] (const std::error_code& e) mutable {
- const bool need_reload_obc = ox.has_seen_write();
- logger().debug(
- "do_osd_ops_execute: {} - object {} got error {}, {}; need_reload_obc {}",
- m,
- obc->obs.oi.soid,
- e.value(),
- e.message(),
- need_reload_obc);
- return (
- need_reload_obc ? reload_obc(*obc)
- : interruptor::make_interruptible(load_obc_ertr::now())
- ).safe_then_interruptible([&e, failure_func=std::move(failure_func)] {
+ [rollbacker, failure_func=std::move(failure_func), m]
+ (const std::error_code& e) mutable {
+ return rollbacker.rollback_obc_if_modified(e).then_interruptible(
+ [&e, failure_func=std::move(failure_func)] {
return std::move(failure_func)(e);
- }, load_obc_ertr::assert_all{ "can't live with object state messed up" });
+ });
}));
}