rollbacker.rollback_obc_if_modified(e);
// record error log
auto maybe_submit_error_log =
- seastar::make_ready_future<std::optional<eversion_t>>(std::nullopt);
+ interruptor::make_ready_future<std::optional<eversion_t>>(std::nullopt);
// call submit_error_log only for non-internal clients
if constexpr (!std::is_same_v<Ret, void>) {
if(op_info.may_write()) {
submit_error_log(m, op_info, obc, e, rep_tid);
}
}
- return maybe_submit_error_log.then(
+ return maybe_submit_error_log.then_interruptible(
[this, failure_func_ptr, e, rep_tid] (auto version) {
auto all_completed =
[this, failure_func_ptr, e, rep_tid, version] {
if (version.has_value()) {
- return complete_error_log(rep_tid, version.value()).then(
- [failure_func_ptr, e] {
+ return complete_error_log(rep_tid, version.value()
+ ).then_interruptible([failure_func_ptr, e] {
return (*failure_func_ptr)(e);
});
} else {
}));
}
-seastar::future<> PG::complete_error_log(const ceph_tid_t& rep_tid,
+PG::interruptible_future<> PG::complete_error_log(const ceph_tid_t& rep_tid,
const eversion_t& version)
{
- auto result = seastar::now();
+ auto result = interruptor::now();
auto last_complete = peering_state.get_info().last_complete;
ceph_assert(log_entry_update_waiting_on.contains(rep_tid));
auto& log_update = log_entry_update_waiting_on[rep_tid];
} else {
logger().debug("complete_error_log: rep_tid {} awaiting update from {}",
rep_tid, log_update.waiting_on);
- result = log_update.all_committed.get_shared_future().then(
- [this, last_complete, rep_tid, version] {
+ result = interruptor::make_interruptible(
+ log_update.all_committed.get_shared_future()
+ ).then_interruptible([this, last_complete, rep_tid, version] {
logger().debug("complete_error_log: rep_tid {} awaited ", rep_tid);
peering_state.complete_write(version, last_complete);
ceph_assert(!log_entry_update_waiting_on.contains(rep_tid));
return result;
}
-seastar::future<std::optional<eversion_t>> PG::submit_error_log(
+PG::interruptible_future<std::optional<eversion_t>> PG::submit_error_log(
Ref<MOSDOp> m,
const OpInfo &op_info,
ObjectContextRef obc,
return seastar::do_with(log_entries, set<pg_shard_t>{},
[this, t=std::move(t), rep_tid](auto& log_entries, auto& waiting_on) mutable {
- return seastar::do_for_each(get_acting_recovery_backfill(),
+ return interruptor::do_for_each(get_acting_recovery_backfill(),
[this, log_entries, waiting_on, rep_tid]
(auto& i) mutable {
pg_shard_t peer(i);
return shard_services.send_to_osd(peer.osd,
std::move(log_m),
get_osdmap_epoch());
- }).then([this, waiting_on, t=std::move(t), rep_tid] () mutable {
+ }).then_interruptible([this, waiting_on, t=std::move(t), rep_tid] () mutable {
waiting_on.insert(pg_whoami);
logger().debug("submit_error_log: inserting rep_tid {}", rep_tid);
log_entry_update_waiting_on.insert(