From: Radoslaw Zarzynski Date: Mon, 29 Mar 2021 17:03:19 +0000 (+0000) Subject: crimson/osd: split PG::do_osd_ops() to facilitate InternalClientRequest. X-Git-Tag: v17.1.0~1984^2~17 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=f96a7f0acf1cac1dacbb476ea7e61c75c512f718;p=ceph.git crimson/osd: split PG::do_osd_ops() to facilitate InternalClientRequest. This commit brings `PG::do_osd_ops_execute()` a subset of `PG::do_osd_ops()`; it handles the ops execution through `OpsExecuter` and the `submit_transaction()` but it stays indepedent from `MOSDOp` and `MOSDOpReply`. This trait facilitates the `InternalClientRequest`. Signed-off-by: Radoslaw Zarzynski --- diff --git a/src/crimson/osd/ops_executer.h b/src/crimson/osd/ops_executer.h index ced84b9e8e09f..8a2f92cdcaa1f 100644 --- a/src/crimson/osd/ops_executer.h +++ b/src/crimson/osd/ops_executer.h @@ -28,11 +28,10 @@ #include "crimson/osd/pg_interval_interrupt_condition.h" #include "crimson/osd/shard_services.h" -class PG; -class PGLSFilter; class OSDOp; namespace crimson::osd { +class PG; // OpsExecuter -- a class for executing ops targeting a certain object. class OpsExecuter { diff --git a/src/crimson/osd/pg.cc b/src/crimson/osd/pg.cc index 44ebcd3c8829d..23f8e55b6a478 100644 --- a/src/crimson/osd/pg.cc +++ b/src/crimson/osd/pg.cc @@ -698,43 +698,36 @@ PG::interruptible_future<> PG::repair_object( return std::move(fut); } -PG::do_osd_ops_iertr::future> -PG::do_osd_ops( +template +PG::do_osd_ops_iertr::future PG::do_osd_ops_execute( + OpsExecuter&& ox, + std::vector ops, Ref m, ObjectContextRef obc, - const OpInfo &op_info) + const OpInfo &op_info, + SuccessFunc&& success_func, + FailureFunc&& failure_func) { - if (__builtin_expect(stopping, false)) { - throw crimson::common::system_shutdown_exception(); - } - - using osd_op_ierrorator = OpsExecuter::osd_op_ierrorator; - using osd_op_errorator = OpsExecuter::osd_op_errorator; - const auto oid = m->get_snapid() == CEPH_SNAPDIR ? m->get_hobj().get_head() - : m->get_hobj(); - auto ox = std::make_unique( - obc, op_info, get_pool().info, get_backend(), *m); - return interruptor::do_for_each( - m->ops.begin(), m->ops.end(), [m, ox = ox.get()](OSDOp& osd_op) { + return interruptor::do_for_each(ops, [m, &ox](OSDOp& osd_op) { logger().debug( - "do_osd_ops: {} - object {} - handling op {}", + "do_osd_ops_execute: {} - object {} - handling op {}", *m, - ox->get_target(), + ox.get_target(), ceph_osd_op_name(osd_op.op.op)); - return ox->execute_op(osd_op); - }).safe_then_interruptible([this, m, ox = ox.get(), &op_info] { + return ox.execute_op(osd_op); + }).safe_then_interruptible([this, m, &ox, &op_info] { logger().debug( - "do_osd_ops: {} - object {} all operations successful", + "do_osd_ops_execute: {} - object {} all operations successful", *m, - ox->get_target()); - return std::move(*ox).flush_changes_n_do_ops_effects( + ox.get_target()); + return std::move(ox).flush_changes_n_do_ops_effects( Ref{this}, [this, m, &op_info] (auto&& txn, auto&& obc, auto&& osd_op_p, - bool user_modify) -> osd_op_ierrorator::future<> { + bool user_modify) { logger().debug( - "do_osd_ops: {} - object {} submitting txn", + "do_osd_ops_execute: {} - object {} submitting txn", *m, obc->get_oid()); fill_op_params_bump_pg_version(osd_op_p, std::move(m), user_modify); @@ -743,41 +736,76 @@ PG::do_osd_ops( std::move(obc), std::move(txn), std::move(osd_op_p)); - }); - }).safe_then_interruptible_tuple([this, m, obc, rvec = op_info.allows_returnvec()]() - -> PG::do_osd_ops_iertr::future> { - // TODO: should stop at the first op which returns a negative retval, - // cmpext uses it for returning the index of first unmatched byte - int result = m->ops.empty() ? 0 : m->ops.back().rval.code; - if (result > 0 && !rvec) { - result = 0; - } - auto reply = make_message(m.get(), - result, - get_osdmap_epoch(), - 0, - false); - reply->add_flags(CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK); - logger().debug( - "do_osd_ops: {} - object {} sending reply", - *m, - obc->obs.oi.soid); - return PG::do_osd_ops_ertr::make_ready_future>( - std::move(reply)); + }); + }).safe_then_interruptible_tuple([success_func=std::move(success_func)] { + return std::move(success_func)(); }, crimson::ct_error::object_corrupted::handle([m, obc, this] { - return repair_object(m, obc->obs.oi.soid, obc->obs.oi.version) - .then_interruptible([] { - return PG::do_osd_ops_ertr::future>( - crimson::ct_error::eagain::make()); + return repair_object(m, obc->obs.oi.soid, obc->obs.oi.version).then_interruptible([] { + return do_osd_ops_iertr::future{crimson::ct_error::eagain::make()}; }); - }), osd_op_errorator::all_same_way([ox = std::move(ox), - m, - obc, - this] (const std::error_code& e) { - return handle_failed_op(e, std::move(obc), *ox, *m); + }), OpsExecuter::osd_op_errorator::all_same_way( + [&ox, m, obc, failure_func=std::move(failure_func), this] (const std::error_code& e) mutable { + const bool need_reload_obc = ox.has_seen_write(); + logger().debug( + "do_osd_ops_execute: {} - object {} got error {}, {}; need_reload_obc {}", + m, + obc->obs.oi.soid, + e.value(), + e.message(), + need_reload_obc); + return ( + need_reload_obc ? reload_obc(*obc) + : interruptor::make_interruptible(load_obc_ertr::now()) + ).safe_then_interruptible([&e, failure_func=std::move(failure_func)] { + return std::move(failure_func)(e); + }, load_obc_ertr::assert_all{ "can't live with object state messed up" }); })); } +PG::do_osd_ops_iertr::future> +PG::do_osd_ops( + Ref m, + ObjectContextRef obc, + const OpInfo &op_info) +{ + if (__builtin_expect(stopping, false)) { + throw crimson::common::system_shutdown_exception(); + } + auto ox = std::make_unique( + obc, op_info, get_pool().info, get_backend(), *m); + return do_osd_ops_execute>( + std::move(*ox), m->ops, m, obc, op_info, + [this, m, obc, rvec = op_info.allows_returnvec()] { + // TODO: should stop at the first op which returns a negative retval, + // cmpext uses it for returning the index of first unmatched byte + int result = m->ops.empty() ? 0 : m->ops.back().rval.code; + if (result > 0 && !rvec) { + result = 0; + } + auto reply = make_message(m.get(), + result, + get_osdmap_epoch(), + 0, + false); + reply->add_flags(CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK); + logger().debug( + "do_osd_ops: {} - object {} sending reply", + *m, + obc->obs.oi.soid); + return do_osd_ops_iertr::make_ready_future>( + std::move(reply)); + }, + [m, this] (const std::error_code& e) { + auto reply = make_message( + m.get(), -e.value(), get_osdmap_epoch(), 0, false); + reply->set_enoent_reply_versions( + peering_state.get_info().last_update, + peering_state.get_info().last_user_version); + return do_osd_ops_iertr::make_ready_future>(std::move(reply)); + } + ).finally([ox_deleter=std::move(ox)] {}); +} + PG::interruptible_future> PG::do_pg_ops(Ref m) { if (__builtin_expect(stopping, false)) { diff --git a/src/crimson/osd/pg.h b/src/crimson/osd/pg.h index 9de11f7dda9e4..373f92562cc57 100644 --- a/src/crimson/osd/pg.h +++ b/src/crimson/osd/pg.h @@ -24,6 +24,7 @@ #include "crimson/os/futurized_collection.h" #include "crimson/osd/backfill_state.h" #include "crimson/osd/pg_interval_interrupt_condition.h" +#include "crimson/osd/ops_executer.h" #include "crimson/osd/osd_operations/client_request.h" #include "crimson/osd/osd_operations/peering_event.h" #include "crimson/osd/osd_operations/replicated_request.h" @@ -577,6 +578,15 @@ private: Ref m, ObjectContextRef obc, const OpInfo &op_info); + template + do_osd_ops_iertr::future do_osd_ops_execute( + OpsExecuter&& ox, + std::vector ops, + Ref m, + ObjectContextRef obc, + const OpInfo &op_info, + SuccessFunc&& success_func, + FailureFunc&& failure_func); interruptible_future> do_pg_ops(Ref m); interruptible_future<> submit_transaction(const OpInfo& op_info, ObjectContextRef&& obc,