From 1716224760d1be84c69ea22133a397617b3c4b6b Mon Sep 17 00:00:00 2001 From: Xuehan Xu Date: Sat, 14 Sep 2024 10:05:22 +0800 Subject: [PATCH] crimson/osd/pg: make "PG::submit_error_log()" and "PG::complete_error_log" interruptible Fixes: https://tracker.ceph.com/issues/67800 Signed-off-by: Xuehan Xu --- src/crimson/osd/pg.cc | 23 ++++++++++++----------- src/crimson/osd/pg.h | 4 ++-- 2 files changed, 14 insertions(+), 13 deletions(-) diff --git a/src/crimson/osd/pg.cc b/src/crimson/osd/pg.cc index 80879279b45be..2fe116cbe7237 100644 --- a/src/crimson/osd/pg.cc +++ b/src/crimson/osd/pg.cc @@ -1087,7 +1087,7 @@ PG::do_osd_ops_execute( rollbacker.rollback_obc_if_modified(e); // record error log auto maybe_submit_error_log = - seastar::make_ready_future>(std::nullopt); + interruptor::make_ready_future>(std::nullopt); // call submit_error_log only for non-internal clients if constexpr (!std::is_same_v) { if(op_info.may_write()) { @@ -1095,13 +1095,13 @@ PG::do_osd_ops_execute( submit_error_log(m, op_info, obc, e, rep_tid); } } - return maybe_submit_error_log.then( + return maybe_submit_error_log.then_interruptible( [this, failure_func_ptr, e, rep_tid] (auto version) { auto all_completed = [this, failure_func_ptr, e, rep_tid, version] { if (version.has_value()) { - return complete_error_log(rep_tid, version.value()).then( - [failure_func_ptr, e] { + return complete_error_log(rep_tid, version.value() + ).then_interruptible([failure_func_ptr, e] { return (*failure_func_ptr)(e); }); } else { @@ -1116,10 +1116,10 @@ PG::do_osd_ops_execute( })); } -seastar::future<> PG::complete_error_log(const ceph_tid_t& rep_tid, +PG::interruptible_future<> PG::complete_error_log(const ceph_tid_t& rep_tid, const eversion_t& version) { - auto result = seastar::now(); + auto result = interruptor::now(); auto last_complete = peering_state.get_info().last_complete; ceph_assert(log_entry_update_waiting_on.contains(rep_tid)); auto& log_update = log_entry_update_waiting_on[rep_tid]; @@ -1133,8 +1133,9 @@ seastar::future<> PG::complete_error_log(const ceph_tid_t& rep_tid, } else { logger().debug("complete_error_log: rep_tid {} awaiting update from {}", rep_tid, log_update.waiting_on); - result = log_update.all_committed.get_shared_future().then( - [this, last_complete, rep_tid, version] { + result = interruptor::make_interruptible( + log_update.all_committed.get_shared_future() + ).then_interruptible([this, last_complete, rep_tid, version] { logger().debug("complete_error_log: rep_tid {} awaited ", rep_tid); peering_state.complete_write(version, last_complete); ceph_assert(!log_entry_update_waiting_on.contains(rep_tid)); @@ -1144,7 +1145,7 @@ seastar::future<> PG::complete_error_log(const ceph_tid_t& rep_tid, return result; } -seastar::future> PG::submit_error_log( +PG::interruptible_future> PG::submit_error_log( Ref m, const OpInfo &op_info, ObjectContextRef obc, @@ -1175,7 +1176,7 @@ seastar::future> PG::submit_error_log( return seastar::do_with(log_entries, set{}, [this, t=std::move(t), rep_tid](auto& log_entries, auto& waiting_on) mutable { - return seastar::do_for_each(get_acting_recovery_backfill(), + return interruptor::do_for_each(get_acting_recovery_backfill(), [this, log_entries, waiting_on, rep_tid] (auto& i) mutable { pg_shard_t peer(i); @@ -1200,7 +1201,7 @@ seastar::future> PG::submit_error_log( return shard_services.send_to_osd(peer.osd, std::move(log_m), get_osdmap_epoch()); - }).then([this, waiting_on, t=std::move(t), rep_tid] () mutable { + }).then_interruptible([this, waiting_on, t=std::move(t), rep_tid] () mutable { waiting_on.insert(pg_whoami); logger().debug("submit_error_log: inserting rep_tid {}", rep_tid); log_entry_update_waiting_on.insert( diff --git a/src/crimson/osd/pg.h b/src/crimson/osd/pg.h index 11c0e3668b142..93279a18c565b 100644 --- a/src/crimson/osd/pg.h +++ b/src/crimson/osd/pg.h @@ -619,9 +619,9 @@ public: void print(std::ostream& os) const; void dump_primary(Formatter*); - seastar::future<> complete_error_log(const ceph_tid_t& rep_tid, + interruptible_future<> complete_error_log(const ceph_tid_t& rep_tid, const eversion_t& version); - seastar::future> submit_error_log( + interruptible_future> submit_error_log( Ref m, const OpInfo &op_info, ObjectContextRef obc, -- 2.39.5