return object_context;
}
-template <class Ret, class SuccessFunc, class FailureFunc>
-PG::do_osd_ops_iertr::future<PG::pg_rep_op_fut_t<Ret>>
-PG::do_osd_ops_execute(
- seastar::lw_shared_ptr<OpsExecuter> ox,
- ObjectContextRef obc,
- const OpInfo &op_info,
- Ref<MOSDOp> m,
- std::vector<OSDOp>& ops,
- SuccessFunc&& success_func,
- FailureFunc&& failure_func)
-{
- assert(ox);
- auto rollbacker = ox->create_rollbacker(
- [object_context=duplicate_obc(obc)] (auto& obc) mutable {
- obc->update_from(*object_context);
- });
- auto failure_func_ptr = seastar::make_lw_shared(std::move(failure_func));
- return interruptor::do_for_each(ops, [ox](OSDOp& osd_op) {
- logger().debug(
- "do_osd_ops_execute: object {} - handling op {}",
- ox->get_target(),
- ceph_osd_op_name(osd_op.op.op));
- return ox->execute_op(osd_op);
- }).safe_then_interruptible([this, ox, &ops] {
- /* flush_changes_n_do_ops_effects now returns
- *
- * interruptible_future<
- * tuple<interruptible_future<>, interruptible_future<>>>
- *
- * Previously, this lambda relied on the second element of that tuple to
- * include OpsExecutor::osd_op_errorator in order to propogate the
- * following three errors to the next callback. This is actually quite
- * awkward as the second future is the completion future, which really
- * cannot fail (for it to do so would require an interval change to
- * correct).
- *
- * Rather than reworking this now, I'll leave it as is and refactor it
- * later.
- */
- using complete_iertr = crimson::interruptible::interruptible_errorator<
- ::crimson::osd::IOInterruptCondition,
- OpsExecuter::osd_op_errorator>;
- using ret_t = std::tuple<
- interruptible_future<>,
- complete_iertr::future<>>;
-
- logger().debug(
- "do_osd_ops_execute: object {} all operations successful",
- ox->get_target());
- // check for full
- if ((ox->delta_stats.num_bytes > 0 ||
- ox->delta_stats.num_objects > 0) &&
- get_pgpool().info.has_flag(pg_pool_t::FLAG_FULL)) {
- const auto& m = ox->get_message();
- if (m.get_reqid().name.is_mds() || // FIXME: ignore MDS for now
- m.has_flag(CEPH_OSD_FLAG_FULL_FORCE)) {
- logger().info(" full, but proceeding due to FULL_FORCE or MDS");
- } else if (m.has_flag(CEPH_OSD_FLAG_FULL_TRY)) {
- // they tried, they failed.
- logger().info(" full, replying to FULL_TRY op");
- if (get_pgpool().info.has_flag(pg_pool_t::FLAG_FULL_QUOTA))
- return interruptor::make_ready_future<ret_t>(
- interruptor::now(),
- complete_iertr::future<>(
- crimson::ct_error::edquot::make()));
- else
- return interruptor::make_ready_future<ret_t>(
- interruptor::now(),
- complete_iertr::future<>(
- crimson::ct_error::enospc::make()));
- } else {
- // drop request
- logger().info(" full, dropping request (bad client)");
- return interruptor::make_ready_future<ret_t>(
- interruptor::now(),
- complete_iertr::future<>(
- crimson::ct_error::eagain::make()));
- }
- }
- return std::move(*ox).flush_changes_n_do_ops_effects(
- ops,
- snap_mapper,
- osdriver,
- [this] (auto&& txn,
- auto&& obc,
- auto&& osd_op_p,
- auto&& log_entries) {
- logger().debug(
- "do_osd_ops_execute: object {} submitting txn",
- obc->get_oid());
- mutate_object(obc, txn, osd_op_p);
- return submit_transaction(
- std::move(obc),
- std::move(txn),
- std::move(osd_op_p),
- std::move(log_entries));
- }).then_interruptible([](auto &&futs) {
- auto &&[submitted, completed] = std::move(futs);
- return interruptor::make_ready_future<ret_t>(
- std::move(submitted),
- std::move(completed));
- });
- }).safe_then_unpack_interruptible(
- [success_func=std::move(success_func), rollbacker, this, failure_func_ptr, obc]
- (auto submitted_fut, auto _all_completed_fut) mutable {
-
- auto all_completed_fut = _all_completed_fut.safe_then_interruptible_tuple(
- std::move(success_func),
- crimson::ct_error::object_corrupted::handle(
- [rollbacker, this, obc] (const std::error_code& e) mutable {
- // this is a path for EIO. it's special because we want to fix the obejct
- // and try again. that is, the layer above `PG::do_osd_ops` is supposed to
- // restart the execution.
- rollbacker.rollback_obc_if_modified(e);
- return repair_object(obc->obs.oi.soid,
- obc->obs.oi.version
- ).then_interruptible([] {
- return do_osd_ops_iertr::future<Ret>{crimson::ct_error::eagain::make()};
- });
- }), OpsExecuter::osd_op_errorator::all_same_way(
- [rollbacker, failure_func_ptr]
- (const std::error_code& e) mutable {
- // handle non-fatal errors only
- ceph_assert(e.value() == EDQUOT ||
- e.value() == ENOSPC ||
- e.value() == EAGAIN);
- rollbacker.rollback_obc_if_modified(e);
- return (*failure_func_ptr)(e);
- }));
-
- return PG::do_osd_ops_iertr::make_ready_future<pg_rep_op_fut_t<Ret>>(
- std::move(submitted_fut),
- std::move(all_completed_fut)
- );
- }, OpsExecuter::osd_op_errorator::all_same_way(
- [this, op_info, m, obc,
- rollbacker, failure_func_ptr]
- (const std::error_code& e) mutable {
- ceph_tid_t rep_tid = shard_services.get_tid();
- rollbacker.rollback_obc_if_modified(e);
- // record error log
- auto maybe_submit_error_log =
- interruptor::make_ready_future<std::optional<eversion_t>>(std::nullopt);
- // call submit_error_log only for non-internal clients
- if constexpr (!std::is_same_v<Ret, void>) {
- if(op_info.may_write()) {
- maybe_submit_error_log =
- submit_error_log(
- m, op_info, obc, e, rep_tid
- ).then_interruptible([](auto &&e) {
- return std::make_optional<eversion_t>(std::move(e));
- });
- }
- }
- return maybe_submit_error_log.then_interruptible(
- [this, failure_func_ptr, e, rep_tid] (auto version) {
- auto all_completed =
- [this, failure_func_ptr, e, rep_tid, version] {
- if (version.has_value()) {
- return complete_error_log(rep_tid, version.value()
- ).then_interruptible([failure_func_ptr, e] {
- return (*failure_func_ptr)(e);
- });
- } else {
- return (*failure_func_ptr)(e);
- }
- };
- return PG::do_osd_ops_iertr::make_ready_future<pg_rep_op_fut_t<Ret>>(
- std::move(seastar::now()),
- std::move(all_completed())
- );
- });
- }));
-}
-
PG::interruptible_future<> PG::complete_error_log(const ceph_tid_t& rep_tid,
const eversion_t& version)
{
});
}
-
-PG::do_osd_ops_iertr::future<PG::pg_rep_op_fut_t<MURef<MOSDOpReply>>>
-PG::do_osd_ops(
- Ref<MOSDOp> m,
- crimson::net::ConnectionXcoreRef conn,
- ObjectContextRef obc,
- const OpInfo &op_info,
- const SnapContext& snapc)
-{
- if (__builtin_expect(stopping, false)) {
- throw crimson::common::system_shutdown_exception();
- }
- return do_osd_ops_execute<MURef<MOSDOpReply>>(
- seastar::make_lw_shared<OpsExecuter>(
- Ref<PG>{this}, obc, op_info, *m, conn, snapc),
- obc,
- op_info,
- m,
- m->ops,
- // success_func
- [this, m, obc, may_write = op_info.may_write(),
- may_read = op_info.may_read(), rvec = op_info.allows_returnvec()] {
- // TODO: should stop at the first op which returns a negative retval,
- // cmpext uses it for returning the index of first unmatched byte
- int result = m->ops.empty() ? 0 : m->ops.back().rval.code;
- if (may_read && result >= 0) {
- for (auto &osdop : m->ops) {
- if (osdop.rval < 0 && !(osdop.op.flags & CEPH_OSD_OP_FLAG_FAILOK)) {
- result = osdop.rval.code;
- break;
- }
- }
- } else if (result > 0 && may_write && !rvec) {
- result = 0;
- } else if (result < 0 && (m->ops.empty() ?
- 0 : m->ops.back().op.flags & CEPH_OSD_OP_FLAG_FAILOK)) {
- result = 0;
- }
- auto reply = crimson::make_message<MOSDOpReply>(m.get(),
- result,
- get_osdmap_epoch(),
- 0,
- false);
- reply->add_flags(CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK);
- logger().debug(
- "do_osd_ops: {} - object {} sending reply",
- *m,
- m->get_hobj());
- if (obc->obs.exists) {
- reply->set_reply_versions(peering_state.get_info().last_update,
- obc->obs.oi.user_version);
- } else {
- reply->set_reply_versions(peering_state.get_info().last_update,
- peering_state.get_info().last_user_version);
- }
- return do_osd_ops_iertr::make_ready_future<MURef<MOSDOpReply>>(
- std::move(reply));
- },
- // failure_func
- [m, this]
- (const std::error_code& e) {
- logger().error("do_osd_ops_execute::failure_func {} got error: {}",
- *m, e);
- return log_reply(m, e);
- });
-}
-
-PG::do_osd_ops_iertr::future<MURef<MOSDOpReply>>
-PG::log_reply(
- Ref<MOSDOp> m,
- const std::error_code& e)
-{
- auto reply = crimson::make_message<MOSDOpReply>(
- m.get(), -e.value(), get_osdmap_epoch(), 0, false);
- if (m->ops.empty() ? 0 :
- m->ops.back().op.flags & CEPH_OSD_OP_FLAG_FAILOK) {
- reply->set_result(0);
- }
- // For all ops except for CMPEXT, the correct error value is encoded
- // in e.value(). For CMPEXT, osdop.rval has the actual error value.
- if (e.value() == ct_error::cmp_fail_error_value) {
- assert(!m->ops.empty());
- for (auto &osdop : m->ops) {
- if (osdop.rval < 0) {
- reply->set_result(osdop.rval);
- break;
- }
- }
- }
- reply->set_enoent_reply_versions(
- peering_state.get_info().last_update,
- peering_state.get_info().last_user_version);
- reply->add_flags(CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK);
- return do_osd_ops_iertr::make_ready_future<MURef<MOSDOpReply>>(
- std::move(reply));
-}
-
-PG::do_osd_ops_iertr::future<PG::pg_rep_op_fut_t<>>
-PG::do_osd_ops(
- ObjectContextRef obc,
- std::vector<OSDOp>& ops,
- const OpInfo &op_info,
- const do_osd_ops_params_t &&msg_params)
-{
- // This overload is generally used for internal client requests,
- // use an empty SnapContext.
- return seastar::do_with(
- std::move(msg_params),
- [=, this, &ops, &op_info](auto &msg_params) {
- return do_osd_ops_execute<void>(
- seastar::make_lw_shared<OpsExecuter>(
- Ref<PG>{this},
- obc,
- op_info,
- msg_params,
- msg_params.get_connection(),
- SnapContext{}
- ),
- obc,
- op_info,
- Ref<MOSDOp>(),
- ops,
- // success_func
- [] {
- return do_osd_ops_iertr::now();
- },
- // failure_func
- [] (const std::error_code& e) {
- return do_osd_ops_iertr::now();
- });
- });
-}
-
PG::interruptible_future<MURef<MOSDOpReply>> PG::do_pg_ops(Ref<MOSDOp> m)
{
if (__builtin_expect(stopping, false)) {