]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/osd/pg: make "PG::submit_error_log()" and 59543/head
authorXuehan Xu <xuxuehan@qianxin.com>
Sat, 14 Sep 2024 02:05:22 +0000 (10:05 +0800)
committerXuehan Xu <xuxuehan@qianxin.com>
Sat, 14 Sep 2024 02:05:22 +0000 (10:05 +0800)
"PG::complete_error_log" interruptible

Fixes: https://tracker.ceph.com/issues/67800
Signed-off-by: Xuehan Xu <xuxuehan@qianxin.com>
src/crimson/osd/pg.cc
src/crimson/osd/pg.h

index 80879279b45be91d93aa75c4bb0685c2c060f7d3..2fe116cbe7237b699d1b5127865249900eb12537 100644 (file)
@@ -1087,7 +1087,7 @@ PG::do_osd_ops_execute(
     rollbacker.rollback_obc_if_modified(e);
     // record error log
     auto maybe_submit_error_log =
-      seastar::make_ready_future<std::optional<eversion_t>>(std::nullopt);
+      interruptor::make_ready_future<std::optional<eversion_t>>(std::nullopt);
     // call submit_error_log only for non-internal clients
     if constexpr (!std::is_same_v<Ret, void>) {
       if(op_info.may_write()) {
@@ -1095,13 +1095,13 @@ PG::do_osd_ops_execute(
           submit_error_log(m, op_info, obc, e, rep_tid);
       }
     }
-    return maybe_submit_error_log.then(
+    return maybe_submit_error_log.then_interruptible(
     [this, failure_func_ptr, e, rep_tid] (auto version) {
       auto all_completed =
       [this, failure_func_ptr, e, rep_tid,  version] {
         if (version.has_value()) {
-          return complete_error_log(rep_tid, version.value()).then(
-          [failure_func_ptr, e] {
+          return complete_error_log(rep_tid, version.value()
+          ).then_interruptible([failure_func_ptr, e] {
             return (*failure_func_ptr)(e);
           });
         } else {
@@ -1116,10 +1116,10 @@ PG::do_osd_ops_execute(
   }));
 }
 
-seastar::future<> PG::complete_error_log(const ceph_tid_t& rep_tid,
+PG::interruptible_future<> PG::complete_error_log(const ceph_tid_t& rep_tid,
                                          const eversion_t& version)
 {
-  auto result = seastar::now();
+  auto result = interruptor::now();
   auto last_complete = peering_state.get_info().last_complete;
   ceph_assert(log_entry_update_waiting_on.contains(rep_tid));
   auto& log_update = log_entry_update_waiting_on[rep_tid];
@@ -1133,8 +1133,9 @@ seastar::future<> PG::complete_error_log(const ceph_tid_t& rep_tid,
   } else {
     logger().debug("complete_error_log: rep_tid {} awaiting update from {}",
                    rep_tid, log_update.waiting_on);
-    result = log_update.all_committed.get_shared_future().then(
-    [this, last_complete, rep_tid, version] {
+    result = interruptor::make_interruptible(
+      log_update.all_committed.get_shared_future()
+    ).then_interruptible([this, last_complete, rep_tid, version] {
       logger().debug("complete_error_log: rep_tid {} awaited ", rep_tid);
       peering_state.complete_write(version, last_complete);
       ceph_assert(!log_entry_update_waiting_on.contains(rep_tid));
@@ -1144,7 +1145,7 @@ seastar::future<> PG::complete_error_log(const ceph_tid_t& rep_tid,
   return result;
 }
 
-seastar::future<std::optional<eversion_t>> PG::submit_error_log(
+PG::interruptible_future<std::optional<eversion_t>> PG::submit_error_log(
   Ref<MOSDOp> m,
   const OpInfo &op_info,
   ObjectContextRef obc,
@@ -1175,7 +1176,7 @@ seastar::future<std::optional<eversion_t>> PG::submit_error_log(
 
   return seastar::do_with(log_entries, set<pg_shard_t>{},
     [this, t=std::move(t), rep_tid](auto& log_entries, auto& waiting_on) mutable {
-    return seastar::do_for_each(get_acting_recovery_backfill(),
+    return interruptor::do_for_each(get_acting_recovery_backfill(),
       [this, log_entries, waiting_on, rep_tid]
       (auto& i) mutable {
       pg_shard_t peer(i);
@@ -1200,7 +1201,7 @@ seastar::future<std::optional<eversion_t>> PG::submit_error_log(
       return shard_services.send_to_osd(peer.osd,
                                         std::move(log_m),
                                         get_osdmap_epoch());
-    }).then([this, waiting_on, t=std::move(t), rep_tid] () mutable {
+    }).then_interruptible([this, waiting_on, t=std::move(t), rep_tid] () mutable {
       waiting_on.insert(pg_whoami);
       logger().debug("submit_error_log: inserting rep_tid {}", rep_tid);
       log_entry_update_waiting_on.insert(
index 11c0e3668b142c840b8df7e67a72a308fc5d9566..93279a18c565bd53f1d32d91d30ee95a1ba7c757 100644 (file)
@@ -619,9 +619,9 @@ public:
 
   void print(std::ostream& os) const;
   void dump_primary(Formatter*);
-  seastar::future<> complete_error_log(const ceph_tid_t& rep_tid,
+  interruptible_future<> complete_error_log(const ceph_tid_t& rep_tid,
                                        const eversion_t& version);
-  seastar::future<std::optional<eversion_t>> submit_error_log(
+  interruptible_future<std::optional<eversion_t>> submit_error_log(
     Ref<MOSDOp> m,
     const OpInfo &op_info,
     ObjectContextRef obc,