#include <seastar/core/chunked_fifo.hh>
#include <seastar/core/future.hh>
#include <seastar/core/shared_future.hh>
+#include <seastar/core/shared_ptr.hh>
#include "common/dout.h"
#include "common/static_ptr.h"
class PG;
// OpsExecuter -- a class for executing ops targeting a certain object.
-class OpsExecuter {
+class OpsExecuter : public seastar::enable_lw_shared_from_this<OpsExecuter> {
using call_errorator = crimson::errorator<
crimson::stateful_ec,
crimson::ct_error::enoent,
}
template <class Func>
- struct RollbackHelper {
- interruptible_future<> rollback_obc_if_modified(const std::error_code& e);
- ObjectContextRef get_obc() const {
- return ox.obc;
- }
- OpsExecuter& ox;
- Func func;
- };
+ struct RollbackHelper;
template <class Func>
- RollbackHelper<Func> create_rollbacker(Func&& func) {
- return {*this, std::forward<Func>(func)};
- }
+ RollbackHelper<Func> create_rollbacker(Func&& func);
interruptible_errorated_future<osd_op_errorator>
execute_op(OSDOp& osd_op);
}
}
+template <class Func>
+struct OpsExecuter::RollbackHelper {
+ interruptible_future<> rollback_obc_if_modified(const std::error_code& e);
+ ObjectContextRef get_obc() const {
+ assert(ox);
+ return ox->obc;
+ }
+ seastar::lw_shared_ptr<OpsExecuter> ox;
+ Func func;
+};
+
+template <class Func>
+inline OpsExecuter::RollbackHelper<Func>
+OpsExecuter::create_rollbacker(Func&& func) {
+ return {shared_from_this(), std::forward<Func>(func)};
+}
+
+
template <class Func>
OpsExecuter::interruptible_future<>
OpsExecuter::RollbackHelper<Func>::rollback_obc_if_modified(
// typically append them before any write. If OpsExecuter hasn't
// seen any modifying operation, `obc` is supposed to be kept
// unchanged.
- const auto need_rollback = ox.has_seen_write();
+ assert(ox);
+ const auto need_rollback = ox->has_seen_write();
crimson::get_logger(ceph_subsys_osd).debug(
"{}: object {} got error {}, need_rollback={}",
__func__,
- ox.obc->get_oid(),
+ ox->obc->get_oid(),
e,
need_rollback);
- return need_rollback ? func(*ox.obc) : interruptor::now();
+ return need_rollback ? func(*ox->obc) : interruptor::now();
}
// PgOpsExecuter -- a class for executing ops targeting a certain PG.
template <class Ret, class SuccessFunc, class FailureFunc>
PG::do_osd_ops_iertr::future<PG::pg_rep_op_fut_t<Ret>>
PG::do_osd_ops_execute(
- OpsExecuter&& ox,
+ seastar::lw_shared_ptr<OpsExecuter> ox,
std::vector<OSDOp>& ops,
const OpInfo &op_info,
SuccessFunc&& success_func,
FailureFunc&& failure_func)
{
- auto rollbacker = ox.create_rollbacker([this] (auto& obc) {
+ assert(ox);
+ auto rollbacker = ox->create_rollbacker([this] (auto& obc) {
return reload_obc(obc).handle_error_interruptible(
load_obc_ertr::assert_all{"can't live with object state messed up"});
});
auto failure_func_ptr = seastar::make_lw_shared(std::move(failure_func));
- return interruptor::do_for_each(ops, [&ox](OSDOp& osd_op) {
+ return interruptor::do_for_each(ops, [ox](OSDOp& osd_op) {
logger().debug(
"do_osd_ops_execute: object {} - handling op {}",
- ox.get_target(),
+ ox->get_target(),
ceph_osd_op_name(osd_op.op.op));
- return ox.execute_op(osd_op);
- }).safe_then_interruptible([this, &ox, &op_info, &ops] {
+ return ox->execute_op(osd_op);
+ }).safe_then_interruptible([this, ox, &op_info, &ops] {
logger().debug(
"do_osd_ops_execute: object {} all operations successful",
- ox.get_target());
- return std::move(ox).flush_changes_n_do_ops_effects(
+ ox->get_target());
+ return std::move(*ox).flush_changes_n_do_ops_effects(
Ref<PG>{this},
[this, &op_info, &ops] (auto&& txn,
auto&& obc,
if (__builtin_expect(stopping, false)) {
throw crimson::common::system_shutdown_exception();
}
- auto ox = std::make_unique<OpsExecuter>(
- std::move(obc), op_info, get_pool().info, get_backend(), *m);
return do_osd_ops_execute<Ref<MOSDOpReply>>(
- std::move(*ox), m->ops, op_info,
+ seastar::make_lw_shared<OpsExecuter>(
+ std::move(obc), op_info, get_pool().info, get_backend(), *m),
+ m->ops,
+ op_info,
[this, m, rvec = op_info.allows_returnvec()] {
// TODO: should stop at the first op which returns a negative retval,
// cmpext uses it for returning the index of first unmatched byte
peering_state.get_info().last_update,
peering_state.get_info().last_user_version);
return do_osd_ops_iertr::make_ready_future<Ref<MOSDOpReply>>(std::move(reply));
- }
- ).safe_then_unpack_interruptible(
- [ox_deleter=std::move(ox)](auto submitted, auto all_completed) mutable {
- return do_osd_ops_iertr::make_ready_future<pg_rep_op_fut_t<Ref<MOSDOpReply>>>(
- std::move(submitted),
- all_completed.finally([ox_deleter=std::move(ox_deleter)] {}));
- });
+ });
}
PG::do_osd_ops_iertr::future<PG::pg_rep_op_fut_t<>>
do_osd_ops_success_func_t success_func,
do_osd_ops_failure_func_t failure_func)
{
- auto ox = std::make_unique<OpsExecuter>(
- std::move(obc), op_info, get_pool().info, get_backend(), msg_params);
return do_osd_ops_execute<void>(
- std::move(*ox),
+ seastar::make_lw_shared<OpsExecuter>(
+ std::move(obc), op_info, get_pool().info, get_backend(), msg_params),
ops,
std::as_const(op_info),
std::move(success_func),
- std::move(failure_func)
- ).safe_then_unpack_interruptible(
- [ox_deleter=std::move(ox)](auto submitted, auto all_completed) mutable {
- return do_osd_ops_iertr::make_ready_future<pg_rep_op_fut_t<>>(
- std::move(submitted),
- all_completed.finally([ox_deleter=std::move(ox_deleter)] {}));
- });
+ std::move(failure_func));
}
PG::interruptible_future<Ref<MOSDOpReply>> PG::do_pg_ops(Ref<MOSDOp> m)