]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/osd/recovery_backend: restart object pulling for recoveries that
authorXuehan Xu <xuxuehan@qianxin.com>
Tue, 13 Aug 2024 07:32:02 +0000 (15:32 +0800)
committerXuehan Xu <xuxuehan@qianxin.com>
Tue, 27 Aug 2024 02:12:50 +0000 (10:12 +0800)
are blocked pulling from down osds

Fixes: https://tracker.ceph.com/issues/67508
Signed-off-by: Xuehan Xu <xuxuehan@qianxin.com>
src/crimson/osd/pg.h
src/crimson/osd/recovery_backend.h
src/crimson/osd/replicated_recovery_backend.cc

index 58e3db938f67996332fcdb434676ba243d705711..d4d6d507110c87f0f7ed933f664168125a87f383 100644 (file)
@@ -357,7 +357,13 @@ public:
     shard_services.remove_want_pg_temp(orderer, pgid.pgid);
   }
   void check_recovery_sources(const OSDMapRef& newmap) final {
-    // Not needed yet
+    recovery_backend->for_each_recovery_waiter(
+      [newmap, FNAME](auto &, auto &waiter) {
+        if (waiter->is_pulling() &&
+            newmap->is_down(waiter->pull_info->from.osd)) {
+          waiter->repeat_pull();
+        }
+      });
   }
   void check_blocklisted_watchers() final;
   void clear_primary_state() final {
index b404b79751e7b1e30ff6cab2c270c6c0c05e0045..21154cb710679f5ba329db5056eede8bc56b8241 100644 (file)
@@ -112,6 +112,13 @@ public:
     }
     return on_stop();
   }
+
+  template <typename Func>
+  void for_each_recovery_waiter(Func &&f) {
+    for (auto &[soid, recovery_waiter] : recovering) {
+      std::forward<Func>(f)(soid, recovery_waiter);
+    }
+  }
 protected:
   crimson::osd::PG& pg;
   crimson::osd::ShardServices& shard_services;
@@ -219,6 +226,13 @@ public:
        pulled.reset();
       }
     }
+    void repeat_pull() {
+      ceph_assert(pulled);
+      pulled->set_exception(crimson::ct_error::eagain::exception_ptr());
+    }
+    bool is_pulling() const {
+      return (bool)pulled;
+    }
     void set_push_failed(pg_shard_t shard, std::exception_ptr e) {
       auto it = pushes.find(shard);
       if (it != pushes.end()) {
index f59d2f1757d0eb54ba8e7c973b095de4fc59ce30..76f24196b51f75e919e42975a9b46fa5a3a1ed7e 100644 (file)
@@ -113,28 +113,33 @@ ReplicatedRecoveryBackend::maybe_pull_missing_obj(
     // object is not missing, don't pull
     return seastar::make_ready_future<>();
   }
-  return pg.obc_loader.with_obc<RWState::RWREAD>(soid.get_head(),
-    [this, soid, need](auto head, auto) {
-    PullOp pull_op;
-    auto& recovery_waiter = get_recovering(soid);
-    recovery_waiter.pull_info =
-      std::make_optional<RecoveryBackend::pull_info_t>();
-    auto& pull_info = *recovery_waiter.pull_info;
-    prepare_pull(head, pull_op, pull_info, soid, need);
-    auto msg = crimson::make_message<MOSDPGPull>();
-    msg->from = pg.get_pg_whoami();
-    msg->set_priority(pg.get_recovery_op_priority());
-    msg->pgid = pg.get_pgid();
-    msg->map_epoch = pg.get_osdmap_epoch();
-    msg->min_epoch = pg.get_last_peering_reset();
-    msg->set_pulls({std::move(pull_op)});
-    return shard_services.send_to_osd(
-      pull_info.from.osd,
-      std::move(msg),
-      pg.get_osdmap_epoch());
-  }).si_then([this, soid] {
-    auto& recovery_waiter = get_recovering(soid);
-    return recovery_waiter.wait_for_pull();
+  return interruptor::repeat_eagain([this, soid, need] {
+    using prepare_pull_iertr =
+      crimson::osd::ObjectContextLoader::load_obc_iertr::extend<
+        crimson::ct_error::eagain>;
+    return pg.obc_loader.with_obc<RWState::RWREAD>(soid.get_head(),
+      [this, soid, need](auto head, auto) {
+      PullOp pull_op;
+      auto& recovery_waiter = get_recovering(soid);
+      recovery_waiter.pull_info =
+        std::make_optional<RecoveryBackend::pull_info_t>();
+      auto& pull_info = *recovery_waiter.pull_info;
+      prepare_pull(head, pull_op, pull_info, soid, need);
+      auto msg = crimson::make_message<MOSDPGPull>();
+      msg->from = pg.get_pg_whoami();
+      msg->set_priority(pg.get_recovery_op_priority());
+      msg->pgid = pg.get_pgid();
+      msg->map_epoch = pg.get_osdmap_epoch();
+      msg->min_epoch = pg.get_last_peering_reset();
+      msg->set_pulls({std::move(pull_op)});
+      return shard_services.send_to_osd(
+        pull_info.from.osd,
+        std::move(msg),
+        pg.get_osdmap_epoch());
+    }).si_then([this, soid]() -> prepare_pull_iertr::future<> {
+      auto& recovery_waiter = get_recovering(soid);
+      return recovery_waiter.wait_for_pull();
+    });
   }).handle_error_interruptible(
     crimson::ct_error::assert_all("unexpected error")
   );