}
}
+void OpsExecuter::fill_op_params_bump_pg_version()
+{
+ osd_op_params->req_id = msg->get_reqid();
+ osd_op_params->mtime = msg->get_mtime();
+ osd_op_params->at_version = pg->next_version();
+ osd_op_params->pg_trim_to = pg->get_pg_trim_to();
+ osd_op_params->min_last_complete_ondisk = pg->get_min_last_complete_ondisk();
+ osd_op_params->last_complete = pg->get_info().last_complete;
+ if (user_modify) {
+ osd_op_params->user_at_version = osd_op_params->at_version.version;
+ }
+}
+
+std::vector<pg_log_entry_t> OpsExecuter::prepare_transaction(
+ const std::vector<OSDOp>& ops)
+{
+ std::vector<pg_log_entry_t> log_entries;
+ log_entries.emplace_back(obc->obs.exists ?
+ pg_log_entry_t::MODIFY : pg_log_entry_t::DELETE,
+ obc->obs.oi.soid, osd_op_params->at_version, obc->obs.oi.version,
+ osd_op_params->user_modify ? osd_op_params->at_version.version : 0,
+ osd_op_params->req_id, osd_op_params->mtime,
+ op_info.allows_returnvec() && !ops.empty() ? ops.back().rval.code : 0);
+ if (op_info.allows_returnvec()) {
+ // also the per-op values are recorded in the pg log
+ log_entries.back().set_op_returns(ops);
+ logger().debug("{} op_returns: {}",
+ __func__, log_entries.back().op_returns);
+ }
+ log_entries.back().clean_regions = std::move(osd_op_params->clean_regions);
+ return log_entries;
+}
+
// Defined here because there is a circular dependency between OpsExecuter and PG
uint32_t OpsExecuter::get_pool_stripe_width() const {
return pg->get_pool().info.get_stripe_width();
using rep_op_fut_t =
interruptible_future<rep_op_fut_tuple>;
template <typename MutFunc>
- rep_op_fut_t flush_changes_n_do_ops_effects(MutFunc&& mut_func) &&;
+ rep_op_fut_t flush_changes_n_do_ops_effects(const std::vector<OSDOp>& ops,
+ MutFunc&& mut_func) &&;
+ std::vector<pg_log_entry_t> prepare_transaction(
+ const std::vector<OSDOp>& ops);
+ void fill_op_params_bump_pg_version();
const hobject_t &get_target() const {
return obc->obs.oi.soid;
template <typename MutFunc>
OpsExecuter::rep_op_fut_t
-OpsExecuter::flush_changes_n_do_ops_effects(MutFunc&& mut_func) &&
+OpsExecuter::flush_changes_n_do_ops_effects(
+ const std::vector<OSDOp>& ops,
+ MutFunc&& mut_func) &&
{
const bool want_mutate = !txn.empty();
// osd_op_params are instantiated by every wr-like operation.
seastar::now(),
interruptor::make_interruptible(osd_op_errorator::now()));
if (want_mutate) {
- osd_op_params->req_id = msg->get_reqid();
- osd_op_params->mtime = msg->get_mtime();
+ fill_op_params_bump_pg_version();
+ auto log_entries = prepare_transaction(ops);
auto [submitted, all_completed] = std::forward<MutFunc>(mut_func)(std::move(txn),
std::move(obc),
std::move(*osd_op_params),
- user_modify);
+ std::move(log_entries));
maybe_mutated = interruptor::make_ready_future<rep_op_fut_tuple>(
std::move(submitted),
osd_op_ierrorator::future<>(std::move(all_completed)));
std::tuple<PG::interruptible_future<>,
PG::interruptible_future<>>
PG::submit_transaction(
- const OpInfo& op_info,
- const std::vector<OSDOp>& ops,
ObjectContextRef&& obc,
ceph::os::Transaction&& txn,
- osd_op_params_t&& osd_op_p)
+ osd_op_params_t&& osd_op_p,
+ std::vector<pg_log_entry_t>&& log_entries)
{
if (__builtin_expect(stopping, false)) {
return {seastar::make_exception_future<>(
throw crimson::common::actingset_changed(is_primary());
}
- std::vector<pg_log_entry_t> log_entries;
- log_entries.emplace_back(obc->obs.exists ?
- pg_log_entry_t::MODIFY : pg_log_entry_t::DELETE,
- obc->obs.oi.soid, osd_op_p.at_version, obc->obs.oi.version,
- osd_op_p.user_modify ? osd_op_p.at_version.version : 0,
- osd_op_p.req_id, osd_op_p.mtime,
- op_info.allows_returnvec() && !ops.empty() ? ops.back().rval.code : 0);
- // TODO: refactor the submit_transaction
- if (op_info.allows_returnvec()) {
- // also the per-op values are recorded in the pg log
- log_entries.back().set_op_returns(ops);
- logger().debug("{} op_returns: {}",
- __func__, log_entries.back().op_returns);
- }
- log_entries.back().clean_regions = std::move(osd_op_p.clean_regions);
peering_state.pre_submit_op(obc->obs.oi.soid, log_entries, osd_op_p.at_version);
peering_state.append_log_with_trim_to_updated(std::move(log_entries), osd_op_p.at_version,
txn, true, false);
}));
}
-void PG::fill_op_params_bump_pg_version(
- osd_op_params_t& osd_op_p,
- const bool user_modify)
-{
- osd_op_p.at_version = next_version();
- osd_op_p.pg_trim_to = get_pg_trim_to();
- osd_op_p.min_last_complete_ondisk = get_min_last_complete_ondisk();
- osd_op_p.last_complete = get_info().last_complete;
- if (user_modify) {
- osd_op_p.user_at_version = osd_op_p.at_version.version;
- }
-}
-
PG::interruptible_future<> PG::repair_object(
const hobject_t& oid,
eversion_t& v)
PG::do_osd_ops_execute(
seastar::lw_shared_ptr<OpsExecuter> ox,
std::vector<OSDOp>& ops,
- const OpInfo &op_info,
SuccessFunc&& success_func,
FailureFunc&& failure_func)
{
ox->get_target(),
ceph_osd_op_name(osd_op.op.op));
return ox->execute_op(osd_op);
- }).safe_then_interruptible([this, ox, &op_info, &ops] {
+ }).safe_then_interruptible([this, ox, &ops] {
logger().debug(
"do_osd_ops_execute: object {} all operations successful",
ox->get_target());
peering_state.apply_op_stats(ox->get_target(), ox->get_stats());
- return std::move(*ox).flush_changes_n_do_ops_effects(
- [this, &op_info, &ops] (auto&& txn,
- auto&& obc,
- auto&& osd_op_p,
- bool user_modify) {
+ return std::move(*ox).flush_changes_n_do_ops_effects(ops,
+ [this] (auto&& txn,
+ auto&& obc,
+ auto&& osd_op_p,
+ auto&& log_entries) {
logger().debug(
"do_osd_ops_execute: object {} submitting txn",
obc->get_oid());
- fill_op_params_bump_pg_version(osd_op_p, user_modify);
return submit_transaction(
- op_info,
- ops,
std::move(obc),
std::move(txn),
- std::move(osd_op_p));
+ std::move(osd_op_p),
+ std::move(log_entries));
});
}).safe_then_unpack_interruptible(
[success_func=std::move(success_func), rollbacker, this, failure_func_ptr]
seastar::make_lw_shared<OpsExecuter>(
Ref<PG>{this}, obc, op_info, *m),
m->ops,
- op_info,
[this, m, obc, may_write = op_info.may_write(),
may_read = op_info.may_read(), rvec = op_info.allows_returnvec()] {
// TODO: should stop at the first op which returns a negative retval,
seastar::make_lw_shared<OpsExecuter>(
Ref<PG>{this}, std::move(obc), op_info, msg_params),
ops,
- std::as_const(op_info),
std::move(success_func),
std::move(failure_func));
});