#include <boost/range/numeric.hpp>
#include <fmt/format.h>
#include <fmt/ostream.h>
+
+#include <seastar/util/defer.hh>
+
#include "include/utime_fmt.h"
#include "common/hobject.h"
});
}
+PG::run_executer_fut PG::run_executer(
+ seastar::lw_shared_ptr<OpsExecuter> ox,
+ ObjectContextRef obc,
+ const OpInfo &op_info,
+ std::vector<OSDOp>& ops)
+{
+ LOG_PREFIX(PG::run_executer);
+ auto rollbacker = ox->create_rollbacker(
+ [stored_obc=duplicate_obc(obc)](auto &obc) mutable {
+ obc->update_from(*stored_obc);
+ });
+ auto rollback_on_error = seastar::defer([&rollbacker] {
+ rollbacker.rollback_obc_if_modified();
+ });
+
+ for (auto &op: ops) {
+ DEBUGDPP("object {} handle op {}", *this, ox->get_target(), op);
+ co_await ox->execute_op(op);
+ }
+ DEBUGDPP("object {} all operations successful", *this, ox->get_target());
+
+ // check for full
+ if ((ox->delta_stats.num_bytes > 0 ||
+ ox->delta_stats.num_objects > 0) &&
+ get_pgpool().info.has_flag(pg_pool_t::FLAG_FULL)) {
+ const auto& m = ox->get_message();
+ if (m.get_reqid().name.is_mds() || // FIXME: ignore MDS for now
+ m.has_flag(CEPH_OSD_FLAG_FULL_FORCE)) {
+ INFODPP("full, but proceeding due to FULL_FORCE, or MDS", *this);
+ } else if (m.has_flag(CEPH_OSD_FLAG_FULL_TRY)) {
+ // they tried, they failed.
+ INFODPP("full, replying to FULL_TRY op", *this);
+ if (get_pgpool().info.has_flag(pg_pool_t::FLAG_FULL_QUOTA)) {
+ co_await run_executer_fut(
+ crimson::ct_error::edquot::make());
+ } else {
+ co_await run_executer_fut(
+ crimson::ct_error::enospc::make());
+ }
+ } else {
+ // drop request
+ INFODPP("full, dropping request (bad client)", *this);
+ co_await run_executer_fut(
+ crimson::ct_error::eagain::make());
+ }
+ }
+ rollback_on_error.cancel();
+}
+
+PG::submit_executer_fut PG::submit_executer(
+ seastar::lw_shared_ptr<OpsExecuter> ox,
+ const std::vector<OSDOp>& ops)
+{
+ LOG_PREFIX(PG::submit_executer);
+ // transaction must commit at this point
+ return std::move(
+ *ox
+ ).flush_changes_n_do_ops_effects(
+ ops,
+ snap_mapper,
+ osdriver,
+ [FNAME, this](auto&& txn,
+ auto&& obc,
+ auto&& osd_op_p,
+ auto&& log_entries) {
+ DEBUGDPP("object {} submitting txn", *this, obc->get_oid());
+ mutate_object(obc, txn, osd_op_p);
+ return submit_transaction(
+ std::move(obc),
+ std::move(txn),
+ std::move(osd_op_p),
+ std::move(log_entries));
+ });
+}
+
+
PG::do_osd_ops_iertr::future<PG::pg_rep_op_fut_t<MURef<MOSDOpReply>>>
PG::do_osd_ops(
Ref<MOSDOp> m,
}
} background_process_lock;
+ using run_executer_ertr = crimson::compound_errorator_t<
+ OpsExecuter::osd_op_errorator,
+ crimson::errorator<
+ crimson::ct_error::edquot,
+ crimson::ct_error::eagain,
+ crimson::ct_error::enospc
+ >
+ >;
+ using run_executer_iertr = crimson::interruptible::interruptible_errorator<
+ ::crimson::osd::IOInterruptCondition,
+ run_executer_ertr>;
+ using run_executer_fut = run_executer_iertr::future<>;
+ run_executer_fut run_executer(
+ seastar::lw_shared_ptr<OpsExecuter> ox,
+ ObjectContextRef obc,
+ const OpInfo &op_info,
+ std::vector<OSDOp>& ops);
+
+ using submit_executer_ret = std::tuple<
+ interruptible_future<>,
+ interruptible_future<>>;
+ using submit_executer_fut = interruptible_future<
+ submit_executer_ret>;
+ submit_executer_fut submit_executer(
+ seastar::lw_shared_ptr<OpsExecuter> ox,
+ const std::vector<OSDOp>& ops);
+
using do_osd_ops_ertr = crimson::errorator<
crimson::ct_error::eagain>;
using do_osd_ops_iertr =