]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/osd: let prep_push() return a map of PushOp
authorKefu Chai <kchai@redhat.com>
Mon, 14 Dec 2020 16:38:05 +0000 (00:38 +0800)
committerKefu Chai <kchai@redhat.com>
Mon, 14 Dec 2020 17:47:18 +0000 (01:47 +0800)
ReplicatedRecoveryBackend::prep_push() is responsible for building
PushOps, so it's more natural to let it return a map of PushOp.

Signed-off-by: Kefu Chai <kchai@redhat.com>
src/crimson/osd/replicated_recovery_backend.cc
src/crimson/osd/replicated_recovery_backend.h

index 66cc0513ccc2f691f22dc14eb99f56295d3855ba..3e81ea76fff7687123cc193025d1411dce0a7830 100644 (file)
@@ -27,10 +27,9 @@ seastar::future<> ReplicatedRecoveryBackend::recover_object(
   assert(is_recovering(soid));
   // start tracking the recovery of soid
   return maybe_pull_missing_obj(soid, need).then([this, soid, need] {
-    return seastar::do_with(std::map<pg_shard_t, PushOp>(),
-                            get_shards_to_push(soid),
-      [this, soid, need](auto& pops, auto& shards) {
-        return maybe_push_shards(soid, need, pops, shards);
+    return seastar::do_with(get_shards_to_push(soid),
+      [this, soid, need](auto& shards) {
+        return maybe_push_shards(soid, need, shards);
       });
   });
 }
@@ -39,50 +38,53 @@ seastar::future<>
 ReplicatedRecoveryBackend::maybe_push_shards(
   const hobject_t& soid,
   eversion_t need,
-  std::map<pg_shard_t, PushOp>& pops,
   std::vector<pg_shard_t>& shards)
 {
-  auto push_func = [this, soid, need, &pops, &shards] {
-    auto fut = seastar::now();
-    if (!shards.empty())
-      fut = prep_push(soid, need, &pops, shards);
-    return fut.then([this, &pops, &shards, soid] {
-      return seastar::parallel_for_each(shards,
-       [this, &pops, soid](auto shard) {
-       auto msg = make_message<MOSDPGPush>();
-       msg->from = pg.get_pg_whoami();
-       msg->pgid = pg.get_pgid();
-       msg->map_epoch = pg.get_osdmap_epoch();
-       msg->min_epoch = pg.get_last_peering_reset();
-       msg->set_priority(pg.get_recovery_op_priority());
-       msg->pushes.push_back(pops[shard]);
-       return shard_services.send_to_osd(shard.osd, std::move(msg),
-                                      pg.get_osdmap_epoch()).then(
-         [this, soid, shard] {
-         return recovering.at(soid).wait_for_pushes(shard);
-       });
-      });
-    }).then([this, soid] {
-      auto& recovery = recovering.at(soid);
+  auto push_func = [this, soid, need, &shards] {
+    auto prepare_pops = seastar::now();
+    if (!shards.empty()) {
+      prepare_pops =
+        prep_push(soid, need, shards).then([this, &shards, soid](auto pops) {
+          return seastar::parallel_for_each(shards,
+            [this, &pops, soid](auto shard) {
+            auto msg = make_message<MOSDPGPush>();
+            msg->from = pg.get_pg_whoami();
+            msg->pgid = pg.get_pgid();
+            msg->map_epoch = pg.get_osdmap_epoch();
+            msg->min_epoch = pg.get_last_peering_reset();
+            msg->pushes.push_back(pops[shard]);
+            msg->set_priority(pg.get_recovery_op_priority());
+            return shard_services.send_to_osd(shard.osd,
+                                              std::move(msg),
+                                              pg.get_osdmap_epoch()).then(
+             [this, soid, shard] {
+             return recovering.at(soid).wait_for_pushes(shard);
+           });
+         });
+       });
+    }
+    return prepare_pops.then([this, soid] {
+      auto &recovery = recovering.at(soid);
       auto push_info = recovery.pushing.begin();
       object_stat_sum_t stat = {};
       if (push_info != recovery.pushing.end()) {
-       stat = push_info->second.stat;
+        stat = push_info->second.stat;
       } else {
-       // no push happened, take pull_info's stat
-       assert(recovery.pi);
-       stat = recovery.pi->stat;
+        // no push happened, take pull_info's stat
+        assert(recovery.pi);
+        stat = recovery.pi->stat;
       }
       pg.get_recovery_handler()->on_global_recover(soid, stat, false);
       return seastar::make_ready_future<>();
     }).handle_exception([this, soid](auto e) {
-      auto& recovery = recovering.at(soid);
-      if (recovery.obc)
-       recovery.obc->drop_recovery_read();
+      auto &recovery = recovering.at(soid);
+      if (recovery.obc) {
+        recovery.obc->drop_recovery_read();
+      }
       recovering.erase(soid);
       return seastar::make_exception_future<>(e);
     });
-  };
+  }; // push_func
 
   auto& recovery_waiter = recovering.at(soid);
   if (recovery_waiter.obc) {
@@ -280,19 +282,21 @@ seastar::future<> ReplicatedRecoveryBackend::recover_delete(
   });
 }
 
-seastar::future<> ReplicatedRecoveryBackend::prep_push(
+seastar::future<std::map<pg_shard_t, PushOp>>
+ReplicatedRecoveryBackend::prep_push(
   const hobject_t& soid,
   eversion_t need,
-  std::map<pg_shard_t, PushOp>* pops,
   const std::vector<pg_shard_t>& shards)
 {
   logger().debug("{}: {}, {}", __func__, soid, need);
 
-  return seastar::do_with(std::map<pg_shard_t, interval_set<uint64_t>>(),
-    [this, soid, pops, &shards](auto& data_subsets) {
+  return seastar::do_with(std::map<pg_shard_t, PushOp>(),
+                         std::map<pg_shard_t, interval_set<uint64_t>>(),
+    [this, soid, &shards](auto& pops,
+                          auto& data_subsets) {
     return seastar::parallel_for_each(shards,
       [this, soid, pops, &data_subsets](auto pg_shard) mutable {
-      pops->emplace(pg_shard, PushOp());
+      pops.emplace(pg_shard, PushOp());
       auto& recovery_waiter = recovering.at(soid);
       auto& obc = recovery_waiter.obc;
       auto& data_subset = data_subsets[pg_shard];
@@ -328,13 +332,15 @@ seastar::future<> ReplicatedRecoveryBackend::prep_push(
        HAVE_FEATURE(pg.min_peer_features(), SERVER_OCTOPUS);
 
       return build_push_op(pi.recovery_info, pi.recovery_progress,
-                          &pi.stat, &(*pops)[pg_shard]).then(
+                          &pi.stat, &pops[pg_shard]).then(
        [this, soid, pg_shard](auto new_progress) {
        auto& recovery_waiter = recovering.at(soid);
        auto& pi = recovery_waiter.pushing[pg_shard];
        pi.recovery_progress = new_progress;
        return seastar::make_ready_future<>();
       });
+    }).then([&pops]() mutable {
+      return seastar::make_ready_future<std::map<pg_shard_t, PushOp>>(std::move(pops));
     });
   });
 }
index 4646d78bb5f40a9c818858d08d1da0575cbf3e41..5017558012df1ca20f57a3d3b81d8b5d39d8433c 100644 (file)
@@ -44,10 +44,9 @@ protected:
     Ref<MOSDPGRecoveryDelete> m);
   seastar::future<> handle_recovery_delete_reply(
     Ref<MOSDPGRecoveryDeleteReply> m);
-  seastar::future<> prep_push(
+  seastar::future<std::map<pg_shard_t, PushOp>> prep_push(
     const hobject_t& soid,
     eversion_t need,
-    std::map<pg_shard_t, PushOp>* pops,
     const std::vector<pg_shard_t>& shards);
   void prepare_pull(
     PullOp& po,
@@ -121,7 +120,6 @@ private:
   seastar::future<> maybe_push_shards(
     const hobject_t& soid,
     eversion_t need,
-    std::map<pg_shard_t, PushOp>& pops,
     std::vector<pg_shard_t>& shards);
 
   /// read the remaining extents of object to be recovered and fill push_op