pg_log_entry_t::MODIFY : pg_log_entry_t::DELETE,
obc->obs.oi.soid, osd_op_p.at_version, obc->obs.oi.version,
osd_op_p.user_modify ? osd_op_p.at_version.version : 0,
- osd_op_p.req->get_reqid(), osd_op_p.req->get_mtime(),
+ osd_op_p.req_id, osd_op_p.mtime,
op_info.allows_returnvec() && !ops.empty() ? ops.back().rval.code : 0);
// TODO: refactor the submit_transaction
if (op_info.allows_returnvec()) {
void PG::fill_op_params_bump_pg_version(
osd_op_params_t& osd_op_p,
- Ref<MOSDOp> m,
const bool user_modify)
{
- osd_op_p.req = std::move(m);
osd_op_p.at_version = next_version();
osd_op_p.pg_trim_to = get_pg_trim_to();
osd_op_p.min_last_complete_ondisk = get_min_last_complete_ondisk();
PG::do_osd_ops_iertr::future<Ret> PG::do_osd_ops_execute(
OpsExecuter&& ox,
std::vector<OSDOp> ops,
- Ref<MOSDOp> m,
const OpInfo &op_info,
SuccessFunc&& success_func,
FailureFunc&& failure_func)
return reload_obc(obc).handle_error_interruptible(
load_obc_ertr::assert_all{"can't live with object state messed up"});
});
- return interruptor::do_for_each(ops, [m, &ox](OSDOp& osd_op) {
+ return interruptor::do_for_each(ops, [&ox](OSDOp& osd_op) {
logger().debug(
- "do_osd_ops_execute: {} - object {} - handling op {}",
- *m,
+ "do_osd_ops_execute: object {} - handling op {}",
ox.get_target(),
ceph_osd_op_name(osd_op.op.op));
return ox.execute_op(osd_op);
- }).safe_then_interruptible([this, m, &ox, &op_info] {
+ }).safe_then_interruptible([this, &ox, &op_info, &ops] {
logger().debug(
- "do_osd_ops_execute: {} - object {} all operations successful",
- *m,
+ "do_osd_ops_execute: object {} all operations successful",
ox.get_target());
return std::move(ox).flush_changes_n_do_ops_effects(
Ref<PG>{this},
- [this, m, &op_info, &ops] (auto&& txn,
- auto&& obc,
- auto&& osd_op_p,
- bool user_modify) {
+ [this, &op_info, &ops] (auto&& txn,
+ auto&& obc,
+ auto&& osd_op_p,
+ bool user_modify) {
logger().debug(
- "do_osd_ops_execute: {} - object {} submitting txn",
- *m,
+ "do_osd_ops_execute: object {} submitting txn",
obc->get_oid());
- fill_op_params_bump_pg_version(osd_op_p, std::move(m), user_modify);
+ fill_op_params_bump_pg_version(osd_op_p, user_modify);
return submit_transaction(
op_info,
ops,
});
});
}), OpsExecuter::osd_op_errorator::all_same_way(
- [rollbacker, failure_func=std::move(failure_func), m]
+ [rollbacker, failure_func=std::move(failure_func)]
(const std::error_code& e) mutable {
return rollbacker.rollback_obc_if_modified(e).then_interruptible(
[&e, failure_func=std::move(failure_func)] {
auto ox = std::make_unique<OpsExecuter>(
std::move(obc), op_info, get_pool().info, get_backend(), *m);
return do_osd_ops_execute<Ref<MOSDOpReply>>(
- std::move(*ox), m->ops, m, op_info,
+ std::move(*ox), m->ops, op_info,
[this, m, rvec = op_info.allows_returnvec()] {
// TODO: should stop at the first op which returns a negative retval,
// cmpext uses it for returning the index of first unmatched byte
}
const ceph_tid_t tid = next_txn_id++;
- auto req_id = osd_op_p.req->get_reqid();
auto pending_txn =
pending_trans.try_emplace(tid, pg_shards.size(), osd_op_p.at_version).first;
bufferlist encoded_txn;
if (pg_shard == whoami) {
return shard_services.get_store().do_transaction(coll,std::move(txn));
} else {
- auto m = crimson::net::make_message<MOSDRepOp>(req_id, whoami,
- spg_t{pgid, pg_shard.shard}, hoid,
- CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK,
- map_epoch, min_epoch,
- tid, osd_op_p.at_version);
+ auto m = crimson::net::make_message<MOSDRepOp>(
+ osd_op_p.req_id,
+ whoami,
+ spg_t{pgid, pg_shard.shard},
+ hoid,
+ CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK,
+ map_epoch,
+ min_epoch,
+ tid,
+ osd_op_p.at_version);
m->set_data(encoded_txn);
pending_txn->second.acked_peers.push_back({pg_shard, eversion_t{}});
encode(log_entries, m->logbl);