PG::do_osd_ops_iertr::future<PG::pg_rep_op_fut_t<Ret>>
PG::do_osd_ops_execute(
seastar::lw_shared_ptr<OpsExecuter> ox,
+ ObjectContextRef obc,
+ const OpInfo &op_info,
+ Ref<MOSDOp> m,
std::vector<OSDOp>& ops,
SuccessFunc&& success_func,
FailureFunc&& failure_func)
return obc_loader.reload_obc(obc).handle_error_interruptible(
load_obc_ertr::assert_all{"can't live with object state messed up"});
});
+ auto maybe_submit_error_log = [&, op_info, m, obc]
+ (const std::error_code& e, const ceph_tid_t& rep_tid) {
+ // call submit_error_log only for non-internal clients
+ if constexpr (!std::is_same_v<Ret, void>) {
+ if(op_info.may_write()) {
+ return submit_error_log(m, op_info, obc, e, rep_tid);
+ }
+ }
+ return seastar::now();
+ };
+ auto error_func_ptr = seastar::make_lw_shared(std::move(maybe_submit_error_log));
auto failure_func_ptr = seastar::make_lw_shared(std::move(failure_func));
return interruptor::do_for_each(ops, [ox](OSDOp& osd_op) {
logger().debug(
std::move(log_entries));
});
}).safe_then_unpack_interruptible(
- [success_func=std::move(success_func), rollbacker, this, failure_func_ptr]
+ [success_func=std::move(success_func), error_func_ptr, rollbacker, this, failure_func_ptr]
(auto submitted_fut, auto _all_completed_fut) mutable {
auto all_completed_fut = _all_completed_fut.safe_then_interruptible_tuple(
std::move(all_completed_fut)
);
}, OpsExecuter::osd_op_errorator::all_same_way(
- [this, rollbacker, failure_func_ptr]
+ [this, error_func_ptr, rollbacker, failure_func_ptr]
(const std::error_code& e) mutable {
- auto submitted_fut = seastar::now();
+ PG::interruptible_future<> maybe_rollback_fut = seastar::now();
ceph_tid_t rep_tid = shard_services.get_tid();
- auto all_completed_fut = e.value() == ENOENT ?
- (*failure_func_ptr)(e, rep_tid, true) :
- rollbacker.rollback_obc_if_modified(e).then_interruptible(
- [e, failure_func_ptr, rep_tid] {
- return (*failure_func_ptr)(e, rep_tid, true);
- });
+ if (e.value() == ENOENT) {
+ maybe_rollback_fut = rollbacker.rollback_obc_if_modified(e);
+ }
- return PG::do_osd_ops_iertr::make_ready_future<pg_rep_op_fut_t<Ret>>(
- std::move(submitted_fut),
- std::move(all_completed_fut)
- );
+ return maybe_rollback_fut.then_interruptible(
+ [error_func_ptr, e, rep_tid, failure_func_ptr] {
+ // record error log
+ return (*error_func_ptr)(e, rep_tid).then(
+ [failure_func_ptr, e, rep_tid] {
+ return PG::do_osd_ops_iertr::make_ready_future<pg_rep_op_fut_t<Ret>>(
+ std::move(seastar::now()),
+ std::move((*failure_func_ptr)(e, rep_tid, true))
+ );
+ });
+ });
}));
}
return do_osd_ops_execute<MURef<MOSDOpReply>>(
seastar::make_lw_shared<OpsExecuter>(
Ref<PG>{this}, obc, op_info, *m, conn, snapc),
+ obc,
+ op_info,
+ m,
m->ops,
// success_func
[this, m, obc, may_write = op_info.may_write(),
(const std::error_code& e, const ceph_tid_t& rep_tid, bool record_error) {
logger().error("do_osd_ops_execute::failure_func {} got error: {} record_error: {}",
*m, e, record_error);
- auto error_log_fut = seastar::now();
epoch_t epoch = get_osdmap_epoch();
auto last_complete = peering_state.get_info().last_complete;
- if (op_info.may_write()) {
- error_log_fut = submit_error_log(m, op_info, obc, e, rep_tid);
- }
- return error_log_fut.then([m, e, epoch, &op_info, rep_tid, last_complete, this] {
- auto fut = seastar::now();
- if (record_error && !peering_state.pg_has_reset_since(epoch) && op_info.may_write()) {
- logger().debug("do_osd_ops_execute::failure_func finding rep_tid {}",
- rep_tid);
- ceph_assert(log_entry_version.contains(rep_tid));
- auto it = log_entry_update_waiting_on.find(rep_tid);
- ceph_assert(it != log_entry_update_waiting_on.end());
- auto it2 = it->second.waiting_on.find(pg_whoami);
- ceph_assert(it2 != it->second.waiting_on.end());
- it->second.waiting_on.erase(it2);
- if (it->second.waiting_on.empty()) {
- log_entry_update_waiting_on.erase(it);
+ auto fut = seastar::now();
+ if (record_error && !peering_state.pg_has_reset_since(epoch) && op_info.may_write()) {
+ logger().debug("do_osd_ops_execute::failure_func finding rep_tid {}",
+ rep_tid);
+ ceph_assert(log_entry_version.contains(rep_tid));
+ auto it = log_entry_update_waiting_on.find(rep_tid);
+ ceph_assert(it != log_entry_update_waiting_on.end());
+ auto it2 = it->second.waiting_on.find(pg_whoami);
+ ceph_assert(it2 != it->second.waiting_on.end());
+ it->second.waiting_on.erase(it2);
+ if (it->second.waiting_on.empty()) {
+ log_entry_update_waiting_on.erase(it);
+ peering_state.complete_write(log_entry_version[rep_tid], last_complete);
+ log_entry_version.erase(rep_tid);
+ logger().debug("do_osd_ops_execute::failure_func write complete,"
+ " erasing rep_tid {}", rep_tid);
+
+ } else {
+ fut = it->second.all_committed.get_shared_future().then(
+ [this, last_complete, rep_tid] {
+ logger().debug("do_osd_ops_execute::failure_func awaited {}", rep_tid);
peering_state.complete_write(log_entry_version[rep_tid], last_complete);
- log_entry_version.erase(rep_tid);
- logger().debug("do_osd_ops_execute::failure_func write complete,"
- " erasing rep_tid {}", rep_tid);
-
- } else {
- fut = it->second.all_committed.get_shared_future().then(
- [this, last_complete, rep_tid] {
- logger().debug("do_osd_ops_execute::failure_func awaited {}", rep_tid);
- peering_state.complete_write(log_entry_version[rep_tid], last_complete);
- ceph_assert(!log_entry_update_waiting_on.contains(rep_tid));
- return seastar::now();
- });
- }
+ ceph_assert(!log_entry_update_waiting_on.contains(rep_tid));
+ return seastar::now();
+ });
}
- return fut.then([this, m, e] {
- return log_reply(m, e);
- });
+ }
+ return fut.then([this, m, e] {
+ return log_reply(m, e);
});
});
}
return do_osd_ops_execute<void>(
seastar::make_lw_shared<OpsExecuter>(
Ref<PG>{this},
- std::move(obc),
+ obc,
op_info,
msg_params,
msg_params.get_connection(),
SnapContext{}
),
+ obc,
+ op_info,
+ Ref<MOSDOp>(),
ops,
// success_func
[] {