From 8f3ac965c310d80270e53644c56f3bca30511240 Mon Sep 17 00:00:00 2001 From: Samuel Just Date: Thu, 26 Sep 2024 22:49:59 +0000 Subject: [PATCH] crimson: remove now unused PG::do_osd_ops* and log_reply Signed-off-by: Samuel Just --- src/crimson/osd/pg.cc | 308 ------------------------------------------ src/crimson/osd/pg.h | 35 +---- 2 files changed, 1 insertion(+), 342 deletions(-) diff --git a/src/crimson/osd/pg.cc b/src/crimson/osd/pg.cc index bb5c1e9000b..9cdd19d0133 100644 --- a/src/crimson/osd/pg.cc +++ b/src/crimson/osd/pg.cc @@ -978,181 +978,6 @@ ObjectContextRef duplicate_obc(const ObjectContextRef &obc) { return object_context; } -template -PG::do_osd_ops_iertr::future> -PG::do_osd_ops_execute( - seastar::lw_shared_ptr ox, - ObjectContextRef obc, - const OpInfo &op_info, - Ref m, - std::vector& ops, - SuccessFunc&& success_func, - FailureFunc&& failure_func) -{ - assert(ox); - auto rollbacker = ox->create_rollbacker( - [object_context=duplicate_obc(obc)] (auto& obc) mutable { - obc->update_from(*object_context); - }); - auto failure_func_ptr = seastar::make_lw_shared(std::move(failure_func)); - return interruptor::do_for_each(ops, [ox](OSDOp& osd_op) { - logger().debug( - "do_osd_ops_execute: object {} - handling op {}", - ox->get_target(), - ceph_osd_op_name(osd_op.op.op)); - return ox->execute_op(osd_op); - }).safe_then_interruptible([this, ox, &ops] { - /* flush_changes_n_do_ops_effects now returns - * - * interruptible_future< - * tuple, interruptible_future<>>> - * - * Previously, this lambda relied on the second element of that tuple to - * include OpsExecutor::osd_op_errorator in order to propogate the - * following three errors to the next callback. This is actually quite - * awkward as the second future is the completion future, which really - * cannot fail (for it to do so would require an interval change to - * correct). - * - * Rather than reworking this now, I'll leave it as is and refactor it - * later. - */ - using complete_iertr = crimson::interruptible::interruptible_errorator< - ::crimson::osd::IOInterruptCondition, - OpsExecuter::osd_op_errorator>; - using ret_t = std::tuple< - interruptible_future<>, - complete_iertr::future<>>; - - logger().debug( - "do_osd_ops_execute: object {} all operations successful", - ox->get_target()); - // check for full - if ((ox->delta_stats.num_bytes > 0 || - ox->delta_stats.num_objects > 0) && - get_pgpool().info.has_flag(pg_pool_t::FLAG_FULL)) { - const auto& m = ox->get_message(); - if (m.get_reqid().name.is_mds() || // FIXME: ignore MDS for now - m.has_flag(CEPH_OSD_FLAG_FULL_FORCE)) { - logger().info(" full, but proceeding due to FULL_FORCE or MDS"); - } else if (m.has_flag(CEPH_OSD_FLAG_FULL_TRY)) { - // they tried, they failed. - logger().info(" full, replying to FULL_TRY op"); - if (get_pgpool().info.has_flag(pg_pool_t::FLAG_FULL_QUOTA)) - return interruptor::make_ready_future( - interruptor::now(), - complete_iertr::future<>( - crimson::ct_error::edquot::make())); - else - return interruptor::make_ready_future( - interruptor::now(), - complete_iertr::future<>( - crimson::ct_error::enospc::make())); - } else { - // drop request - logger().info(" full, dropping request (bad client)"); - return interruptor::make_ready_future( - interruptor::now(), - complete_iertr::future<>( - crimson::ct_error::eagain::make())); - } - } - return std::move(*ox).flush_changes_n_do_ops_effects( - ops, - snap_mapper, - osdriver, - [this] (auto&& txn, - auto&& obc, - auto&& osd_op_p, - auto&& log_entries) { - logger().debug( - "do_osd_ops_execute: object {} submitting txn", - obc->get_oid()); - mutate_object(obc, txn, osd_op_p); - return submit_transaction( - std::move(obc), - std::move(txn), - std::move(osd_op_p), - std::move(log_entries)); - }).then_interruptible([](auto &&futs) { - auto &&[submitted, completed] = std::move(futs); - return interruptor::make_ready_future( - std::move(submitted), - std::move(completed)); - }); - }).safe_then_unpack_interruptible( - [success_func=std::move(success_func), rollbacker, this, failure_func_ptr, obc] - (auto submitted_fut, auto _all_completed_fut) mutable { - - auto all_completed_fut = _all_completed_fut.safe_then_interruptible_tuple( - std::move(success_func), - crimson::ct_error::object_corrupted::handle( - [rollbacker, this, obc] (const std::error_code& e) mutable { - // this is a path for EIO. it's special because we want to fix the obejct - // and try again. that is, the layer above `PG::do_osd_ops` is supposed to - // restart the execution. - rollbacker.rollback_obc_if_modified(e); - return repair_object(obc->obs.oi.soid, - obc->obs.oi.version - ).then_interruptible([] { - return do_osd_ops_iertr::future{crimson::ct_error::eagain::make()}; - }); - }), OpsExecuter::osd_op_errorator::all_same_way( - [rollbacker, failure_func_ptr] - (const std::error_code& e) mutable { - // handle non-fatal errors only - ceph_assert(e.value() == EDQUOT || - e.value() == ENOSPC || - e.value() == EAGAIN); - rollbacker.rollback_obc_if_modified(e); - return (*failure_func_ptr)(e); - })); - - return PG::do_osd_ops_iertr::make_ready_future>( - std::move(submitted_fut), - std::move(all_completed_fut) - ); - }, OpsExecuter::osd_op_errorator::all_same_way( - [this, op_info, m, obc, - rollbacker, failure_func_ptr] - (const std::error_code& e) mutable { - ceph_tid_t rep_tid = shard_services.get_tid(); - rollbacker.rollback_obc_if_modified(e); - // record error log - auto maybe_submit_error_log = - interruptor::make_ready_future>(std::nullopt); - // call submit_error_log only for non-internal clients - if constexpr (!std::is_same_v) { - if(op_info.may_write()) { - maybe_submit_error_log = - submit_error_log( - m, op_info, obc, e, rep_tid - ).then_interruptible([](auto &&e) { - return std::make_optional(std::move(e)); - }); - } - } - return maybe_submit_error_log.then_interruptible( - [this, failure_func_ptr, e, rep_tid] (auto version) { - auto all_completed = - [this, failure_func_ptr, e, rep_tid, version] { - if (version.has_value()) { - return complete_error_log(rep_tid, version.value() - ).then_interruptible([failure_func_ptr, e] { - return (*failure_func_ptr)(e); - }); - } else { - return (*failure_func_ptr)(e); - } - }; - return PG::do_osd_ops_iertr::make_ready_future>( - std::move(seastar::now()), - std::move(all_completed()) - ); - }); - })); -} - PG::interruptible_future<> PG::complete_error_log(const ceph_tid_t& rep_tid, const eversion_t& version) { @@ -1329,139 +1154,6 @@ PG::submit_executer_fut PG::submit_executer( }); } - -PG::do_osd_ops_iertr::future>> -PG::do_osd_ops( - Ref m, - crimson::net::ConnectionXcoreRef conn, - ObjectContextRef obc, - const OpInfo &op_info, - const SnapContext& snapc) -{ - if (__builtin_expect(stopping, false)) { - throw crimson::common::system_shutdown_exception(); - } - return do_osd_ops_execute>( - seastar::make_lw_shared( - Ref{this}, obc, op_info, *m, conn, snapc), - obc, - op_info, - m, - m->ops, - // success_func - [this, m, obc, may_write = op_info.may_write(), - may_read = op_info.may_read(), rvec = op_info.allows_returnvec()] { - // TODO: should stop at the first op which returns a negative retval, - // cmpext uses it for returning the index of first unmatched byte - int result = m->ops.empty() ? 0 : m->ops.back().rval.code; - if (may_read && result >= 0) { - for (auto &osdop : m->ops) { - if (osdop.rval < 0 && !(osdop.op.flags & CEPH_OSD_OP_FLAG_FAILOK)) { - result = osdop.rval.code; - break; - } - } - } else if (result > 0 && may_write && !rvec) { - result = 0; - } else if (result < 0 && (m->ops.empty() ? - 0 : m->ops.back().op.flags & CEPH_OSD_OP_FLAG_FAILOK)) { - result = 0; - } - auto reply = crimson::make_message(m.get(), - result, - get_osdmap_epoch(), - 0, - false); - reply->add_flags(CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK); - logger().debug( - "do_osd_ops: {} - object {} sending reply", - *m, - m->get_hobj()); - if (obc->obs.exists) { - reply->set_reply_versions(peering_state.get_info().last_update, - obc->obs.oi.user_version); - } else { - reply->set_reply_versions(peering_state.get_info().last_update, - peering_state.get_info().last_user_version); - } - return do_osd_ops_iertr::make_ready_future>( - std::move(reply)); - }, - // failure_func - [m, this] - (const std::error_code& e) { - logger().error("do_osd_ops_execute::failure_func {} got error: {}", - *m, e); - return log_reply(m, e); - }); -} - -PG::do_osd_ops_iertr::future> -PG::log_reply( - Ref m, - const std::error_code& e) -{ - auto reply = crimson::make_message( - m.get(), -e.value(), get_osdmap_epoch(), 0, false); - if (m->ops.empty() ? 0 : - m->ops.back().op.flags & CEPH_OSD_OP_FLAG_FAILOK) { - reply->set_result(0); - } - // For all ops except for CMPEXT, the correct error value is encoded - // in e.value(). For CMPEXT, osdop.rval has the actual error value. - if (e.value() == ct_error::cmp_fail_error_value) { - assert(!m->ops.empty()); - for (auto &osdop : m->ops) { - if (osdop.rval < 0) { - reply->set_result(osdop.rval); - break; - } - } - } - reply->set_enoent_reply_versions( - peering_state.get_info().last_update, - peering_state.get_info().last_user_version); - reply->add_flags(CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK); - return do_osd_ops_iertr::make_ready_future>( - std::move(reply)); -} - -PG::do_osd_ops_iertr::future> -PG::do_osd_ops( - ObjectContextRef obc, - std::vector& ops, - const OpInfo &op_info, - const do_osd_ops_params_t &&msg_params) -{ - // This overload is generally used for internal client requests, - // use an empty SnapContext. - return seastar::do_with( - std::move(msg_params), - [=, this, &ops, &op_info](auto &msg_params) { - return do_osd_ops_execute( - seastar::make_lw_shared( - Ref{this}, - obc, - op_info, - msg_params, - msg_params.get_connection(), - SnapContext{} - ), - obc, - op_info, - Ref(), - ops, - // success_func - [] { - return do_osd_ops_iertr::now(); - }, - // failure_func - [] (const std::error_code& e) { - return do_osd_ops_iertr::now(); - }); - }); -} - PG::interruptible_future> PG::do_pg_ops(Ref m) { if (__builtin_expect(stopping, false)) { diff --git a/src/crimson/osd/pg.h b/src/crimson/osd/pg.h index c91f93171db..3a8ddad922a 100644 --- a/src/crimson/osd/pg.h +++ b/src/crimson/osd/pg.h @@ -672,41 +672,8 @@ private: seastar::lw_shared_ptr ox, const std::vector& ops); - using do_osd_ops_ertr = crimson::errorator< - crimson::ct_error::eagain>; - using do_osd_ops_iertr = - ::crimson::interruptible::interruptible_errorator< - ::crimson::osd::IOInterruptCondition, - ::crimson::errorator>; - template - using pg_rep_op_fut_t = - std::tuple, - do_osd_ops_iertr::future>; - do_osd_ops_iertr::future>> do_osd_ops( - Ref m, - crimson::net::ConnectionXcoreRef conn, - ObjectContextRef obc, - const OpInfo &op_info, - const SnapContext& snapc); - struct do_osd_ops_params_t; - do_osd_ops_iertr::future> log_reply( - Ref m, - const std::error_code& e); - do_osd_ops_iertr::future> do_osd_ops( - ObjectContextRef obc, - std::vector& ops, - const OpInfo &op_info, - const do_osd_ops_params_t &¶ms); - template - do_osd_ops_iertr::future> do_osd_ops_execute( - seastar::lw_shared_ptr ox, - ObjectContextRef obc, - const OpInfo &op_info, - Ref m, - std::vector& ops, - SuccessFunc&& success_func, - FailureFunc&& failure_func); + interruptible_future> do_pg_ops(Ref m); interruptible_future< std::tuple, interruptible_future<>>> -- 2.39.5