crimson/osd: do not use parallel_for_each() in prep_push()
author    Kefu Chai <kchai@redhat.com>
          Thu, 17 Dec 2020 03:29:02 +0000 (11:29 +0800)
committer Kefu Chai <kchai@redhat.com>
          Thu, 17 Dec 2020 11:35:32 +0000 (19:35 +0800)
the caller of prep_push() already loops over the target pg shards, so
there is no need to loop again inside prep_push(). with this change,
prep_push() prepares a single push op for a single pg_shard and oid,
instead of collecting the push ops of all pg_shards to be recovered.

Signed-off-by: Kefu Chai <kchai@redhat.com>
src/crimson/osd/replicated_recovery_backend.cc
src/crimson/osd/replicated_recovery_backend.h
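
For illustration only, a minimal Seastar-style sketch of the new control flow: the caller drives a single parallel_for_each() over the shards, and a prep_push()-like helper prepares exactly one PushOp per shard. prep_one(), PushOp::target, and the toy shard ids are hypothetical stand-ins for the Ceph types, and Seastar header locations vary between releases:

#include <seastar/core/app-template.hh>
#include <seastar/core/do_with.hh>
#include <seastar/core/future.hh>
#include <seastar/core/loop.hh>  // parallel_for_each (future-util.hh on older releases)
#include <cstdio>
#include <vector>

struct PushOp { int target; };  // toy stand-in for the Ceph PushOp

// stand-in for the new prep_push(): prepare one push op for one shard
seastar::future<PushOp> prep_one(int shard) {
  return seastar::make_ready_future<PushOp>(PushOp{shard});
}

int main(int argc, char** argv) {
  seastar::app_template app;
  return app.run(argc, argv, [] {
    return seastar::do_with(std::vector<int>{0, 1, 2}, [](auto& shards) {
      // the caller owns the only loop: prepare and send per shard,
      // with no intermediate map of all shards' push ops
      return seastar::parallel_for_each(shards, [](int shard) {
        return prep_one(shard).then([](PushOp op) {
          std::printf("sending push to shard %d\n", op.target);
        });
      });
    });
  });
}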

index 8173489055d2e4a819b7814a1c4b8208a2e681e0..540ba5a46008ad07f5e629441cd6f372a90bf5c4 100644
--- a/src/crimson/osd/replicated_recovery_backend.cc
+++ b/src/crimson/osd/replicated_recovery_backend.cc
@@ -43,25 +43,23 @@ ReplicatedRecoveryBackend::maybe_push_shards(
   auto push_func = [this, soid, need, &shards] {
     auto prepare_pops = seastar::now();
     if (!shards.empty()) {
-      prepare_pops =
-        prep_push(soid, need, shards).then([this, &shards, soid](auto pops) {
-          return seastar::parallel_for_each(shards,
-            [this, &pops, soid](auto shard) {
-            auto msg = make_message<MOSDPGPush>();
-            msg->from = pg.get_pg_whoami();
-            msg->pgid = pg.get_pgid();
-            msg->map_epoch = pg.get_osdmap_epoch();
-            msg->min_epoch = pg.get_last_peering_reset();
-            msg->pushes.push_back(pops[shard]);
-            msg->set_priority(pg.get_recovery_op_priority());
-            return shard_services.send_to_osd(shard.osd,
-                                              std::move(msg),
-                                              pg.get_osdmap_epoch()).then(
-             [this, soid, shard] {
-             return recovering.at(soid).wait_for_pushes(shard);
-           });
+      prepare_pops = seastar::parallel_for_each(shards, [this, need, soid](auto shard) {
+       return prep_push(soid, need, shard).then([this, soid, shard](auto push) {
+          auto msg = make_message<MOSDPGPush>();
+          msg->from = pg.get_pg_whoami();
+          msg->pgid = pg.get_pgid();
+          msg->map_epoch = pg.get_osdmap_epoch();
+          msg->min_epoch = pg.get_last_peering_reset();
+          msg->pushes.push_back(std::move(push));
+          msg->set_priority(pg.get_recovery_op_priority());
+          return shard_services.send_to_osd(shard.osd,
+                                           std::move(msg),
+                                           pg.get_osdmap_epoch()).then(
+           [this, soid, shard] {
+           return recovering.at(soid).wait_for_pushes(shard);
          });
        });
+     });
     }
     return prepare_pops.then([this, soid] {
       auto &recovery = recovering.at(soid);
@@ -280,60 +278,53 @@ seastar::future<> ReplicatedRecoveryBackend::recover_delete(
   });
 }
 
-seastar::future<std::map<pg_shard_t, PushOp>>
+seastar::future<PushOp>
 ReplicatedRecoveryBackend::prep_push(
   const hobject_t& soid,
   eversion_t need,
-  const std::vector<pg_shard_t>& shards)
+  pg_shard_t pg_shard)
 {
   logger().debug("{}: {}, {}", __func__, soid, need);
 
-  auto pops = seastar::make_lw_shared<std::map<pg_shard_t, PushOp>>();
-  return seastar::parallel_for_each(shards,
-    [this, soid, pops](auto pg_shard) mutable {
-    auto& recovery_waiter = recovering.at(soid);
-    auto& obc = recovery_waiter.obc;
-    interval_set<uint64_t> data_subset;
-    if (obc->obs.oi.size) {
-      data_subset.insert(0, obc->obs.oi.size);
-    }
-    const auto& missing = pg.get_shard_missing().find(pg_shard)->second;
-    if (HAVE_FEATURE(pg.min_peer_features(), SERVER_OCTOPUS)) {
-      const auto it = missing.get_items().find(soid);
-      assert(it != missing.get_items().end());
-      data_subset.intersection_of(it->second.clean_regions.get_dirty_regions());
-      logger().debug("prep_push: {} data_subset {}", soid, data_subset);
-    }
+  auto& recovery_waiter = recovering.at(soid);
+  auto& obc = recovery_waiter.obc;
+  interval_set<uint64_t> data_subset;
+  if (obc->obs.oi.size) {
+    data_subset.insert(0, obc->obs.oi.size);
+  }
+  const auto& missing = pg.get_shard_missing().find(pg_shard)->second;
+  if (HAVE_FEATURE(pg.min_peer_features(), SERVER_OCTOPUS)) {
+    const auto it = missing.get_items().find(soid);
+    assert(it != missing.get_items().end());
+    data_subset.intersection_of(it->second.clean_regions.get_dirty_regions());
+    logger().debug("prep_push: {} data_subset {}", soid, data_subset);
+  }
 
-    logger().debug("prep_push: {} to {}", soid, pg_shard);
-    auto& pi = recovery_waiter.pushing[pg_shard];
-    pg.begin_peer_recover(pg_shard, soid);
-    const auto pmissing_iter = pg.get_shard_missing().find(pg_shard);
-    const auto missing_iter = pmissing_iter->second.get_items().find(soid);
-    assert(missing_iter != pmissing_iter->second.get_items().end());
+  logger().debug("prep_push: {} to {}", soid, pg_shard);
+  auto& pi = recovery_waiter.pushing[pg_shard];
+  pg.begin_peer_recover(pg_shard, soid);
+  const auto pmissing_iter = pg.get_shard_missing().find(pg_shard);
+  const auto missing_iter = pmissing_iter->second.get_items().find(soid);
+  assert(missing_iter != pmissing_iter->second.get_items().end());
 
-    pi.obc = obc;
-    pi.recovery_info.size = obc->obs.oi.size;
-    pi.recovery_info.copy_subset = data_subset;
-    pi.recovery_info.soid = soid;
-    pi.recovery_info.oi = obc->obs.oi;
-    pi.recovery_info.version = obc->obs.oi.version;
-    pi.recovery_info.object_exist =
-      missing_iter->second.clean_regions.object_is_exist();
-    pi.recovery_progress.omap_complete =
-      (!missing_iter->second.clean_regions.omap_is_dirty() &&
-       HAVE_FEATURE(pg.min_peer_features(), SERVER_OCTOPUS));
+  pi.obc = obc;
+  pi.recovery_info.size = obc->obs.oi.size;
+  pi.recovery_info.copy_subset = data_subset;
+  pi.recovery_info.soid = soid;
+  pi.recovery_info.oi = obc->obs.oi;
+  pi.recovery_info.version = obc->obs.oi.version;
+  pi.recovery_info.object_exist =
+    missing_iter->second.clean_regions.object_is_exist();
+  pi.recovery_progress.omap_complete =
+    (!missing_iter->second.clean_regions.omap_is_dirty() &&
+     HAVE_FEATURE(pg.min_peer_features(), SERVER_OCTOPUS));
 
-    return build_push_op(pi.recovery_info, pi.recovery_progress,
-                        &pi.stat).then(
-      [this, soid, pg_shard, pops](auto pop) {
-      auto& recovery_waiter = recovering.at(soid);
-      auto& pi = recovery_waiter.pushing[pg_shard];
-      pi.recovery_progress = pop.after_progress;
-      pops->emplace(pg_shard, std::move(pop));
-    });
-  }).then([pops]() mutable {
-    return seastar::make_ready_future<std::map<pg_shard_t, PushOp>>(std::move(*pops));
+  return build_push_op(pi.recovery_info, pi.recovery_progress, &pi.stat).then(
+    [this, soid, pg_shard](auto pop) {
+    auto& recovery_waiter = recovering.at(soid);
+    auto& pi = recovery_waiter.pushing[pg_shard];
+    pi.recovery_progress = pop.after_progress;
+    return pop;
   });
 }
 
index 92520aff51b95eade88655ae40ad56a2b20e8f0c..848f41fbc9cf0c4b2f888262cfdb2d94ff139a49 100644
--- a/src/crimson/osd/replicated_recovery_backend.h
+++ b/src/crimson/osd/replicated_recovery_backend.h
@@ -44,10 +44,10 @@ protected:
     Ref<MOSDPGRecoveryDelete> m);
   seastar::future<> handle_recovery_delete_reply(
     Ref<MOSDPGRecoveryDeleteReply> m);
-  seastar::future<std::map<pg_shard_t, PushOp>> prep_push(
+  seastar::future<PushOp> prep_push(
     const hobject_t& soid,
     eversion_t need,
-    const std::vector<pg_shard_t>& shards);
+    pg_shard_t pg_shard);
   void prepare_pull(
     PullOp& po,
     PullInfo& pi,
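
At a glance, the prep_push() declaration changes from returning a map of push ops for a vector of shards to a single PushOp for one shard (copied from the hunks above):

// before
seastar::future<std::map<pg_shard_t, PushOp>> prep_push(
  const hobject_t& soid,
  eversion_t need,
  const std::vector<pg_shard_t>& shards);

// after
seastar::future<PushOp> prep_push(
  const hobject_t& soid,
  eversion_t need,
  pg_shard_t pg_shard);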