]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/osd: let build_push_op() return PushOp
authorKefu Chai <kchai@redhat.com>
Mon, 14 Dec 2020 17:32:08 +0000 (01:32 +0800)
committerKefu Chai <kchai@redhat.com>
Tue, 15 Dec 2020 01:01:44 +0000 (09:01 +0800)
for better readability

Signed-off-by: Kefu Chai <kchai@redhat.com>
src/crimson/osd/replicated_recovery_backend.cc
src/crimson/osd/replicated_recovery_backend.h

index 3e81ea76fff7687123cc193025d1411dce0a7830..9a1fdc16fcb019bbc189d6b83fbead9904f51441 100644 (file)
@@ -332,11 +332,12 @@ ReplicatedRecoveryBackend::prep_push(
        HAVE_FEATURE(pg.min_peer_features(), SERVER_OCTOPUS);
 
       return build_push_op(pi.recovery_info, pi.recovery_progress,
-                          &pi.stat, &pops[pg_shard]).then(
-       [this, soid, pg_shard](auto new_progress) {
+                           &pi.stat).then(
+        [this, soid, pg_shard, &pops](auto pop) {
        auto& recovery_waiter = recovering.at(soid);
        auto& pi = recovery_waiter.pushing[pg_shard];
-       pi.recovery_progress = new_progress;
+       pi.recovery_progress = pop.after_progress;
+       pops[pg_shard] = std::move(pop);
        return seastar::make_ready_future<>();
       });
     }).then([&pops]() mutable {
@@ -378,12 +379,11 @@ void ReplicatedRecoveryBackend::prepare_pull(PullOp& po, PullInfo& pi,
   pi.recovery_progress = po.recovery_progress;
 }
 
-seastar::future<ObjectRecoveryProgress> ReplicatedRecoveryBackend::build_push_op(
+seastar::future<PushOp> ReplicatedRecoveryBackend::build_push_op(
     const ObjectRecoveryInfo& recovery_info,
     const ObjectRecoveryProgress& progress,
-    object_stat_sum_t* stat,
-    PushOp* pop
-  ) {
+    object_stat_sum_t* stat)
+{
   logger().debug("{} {} @{}",
                 __func__, recovery_info.soid, recovery_info.version);
   return seastar::do_with(ObjectRecoveryProgress(progress),
@@ -391,9 +391,10 @@ seastar::future<ObjectRecoveryProgress> ReplicatedRecoveryBackend::build_push_op
                          uint64_t(crimson::common::local_conf()
                            ->osd_recovery_max_chunk),
                          eversion_t(),
-    [this, &recovery_info, &progress, stat, pop]
-    (auto& new_progress, auto& oi, auto& available, auto& v) {
-    return [this, &recovery_info, &progress, &new_progress, &oi, pop, &v] {
+                         PushOp(),
+    [this, &recovery_info, &progress, stat]
+    (auto& new_progress, auto& oi, auto& available, auto& v, auto& pop) {
+    return [this, &recovery_info, &progress, &new_progress, &oi, &v, pop=&pop] {
       v = recovery_info.version;
       if (progress.first) {
        return backend->omap_get_header(coll, ghobject_t(recovery_info.soid))
@@ -419,20 +420,20 @@ seastar::future<ObjectRecoveryProgress> ReplicatedRecoveryBackend::build_push_op
        );
       }
       return seastar::make_ready_future<>();
-    }().then([this, &recovery_info, &progress, &new_progress, &available, pop] {
+    }().then([this, &recovery_info, &progress, &new_progress, &available, &pop]() mutable {
       return read_omap_for_push_op(recovery_info.soid,
                                    progress,
                                    new_progress,
-                                   available, pop);
-    }).then([this, &recovery_info, &progress, &available, pop] {
+                                   available, &pop);
+    }).then([this, &recovery_info, &progress, &available, &pop]() mutable {
       logger().debug("build_push_op: available: {}, copy_subset: {}",
                     available, recovery_info.copy_subset);
       return read_object_for_push_op(recovery_info.soid,
                                     recovery_info.copy_subset,
                                     progress.data_recovered_to,
-                                    available, pop);
-    }).then([&recovery_info, &v, &progress, &new_progress, stat, pop]
-            (uint64_t recovered_to) {
+                                    available, &pop);
+    }).then([&recovery_info, &v, &progress, &new_progress, stat, &pop]
+            (uint64_t recovered_to) mutable {
       new_progress.data_recovered_to = recovered_to;
       if (new_progress.is_complete(recovery_info)) {
        new_progress.data_complete = true;
@@ -444,18 +445,17 @@ seastar::future<ObjectRecoveryProgress> ReplicatedRecoveryBackend::build_push_op
        new_progress.omap_complete = false;
       }
       if (stat) {
-       stat->num_keys_recovered += pop->omap_entries.size();
-       stat->num_bytes_recovered += pop->data.length();
+       stat->num_keys_recovered += pop.omap_entries.size();
+       stat->num_bytes_recovered += pop.data.length();
       }
-      pop->version = v;
-      pop->soid = recovery_info.soid;
-      pop->recovery_info = recovery_info;
-      pop->after_progress = new_progress;
-      pop->before_progress = progress;
+      pop.version = v;
+      pop.soid = recovery_info.soid;
+      pop.recovery_info = recovery_info;
+      pop.after_progress = new_progress;
+      pop.before_progress = progress;
       logger().debug("build_push_op: pop version: {}, pop data length: {}",
-                    pop->version, pop->data.length());
-      return seastar::make_ready_future<ObjectRecoveryProgress>
-               (std::move(new_progress));
+                    pop.version, pop.data.length());
+      return seastar::make_ready_future<PushOp>(std::move(pop));
     });
   });
 }
@@ -586,42 +586,34 @@ seastar::future<> ReplicatedRecoveryBackend::handle_pull(Ref<MOSDPGPull> m)
     [this, m, from = m->from](auto& pulls) {
     return seastar::parallel_for_each(pulls, [this, m, from](auto& pull_op) {
       const hobject_t& soid = pull_op.soid;
-      return seastar::do_with(PushOp(),
-       [this, &soid, &pull_op, from](auto& pop) {
-       logger().debug("handle_pull: {}", soid);
-       return backend->stat(coll, ghobject_t(soid)).then(
-         [this, &pull_op, &pop](auto st) {
-         ObjectRecoveryInfo &recovery_info = pull_op.recovery_info;
-         ObjectRecoveryProgress &progress = pull_op.recovery_progress;
-         if (progress.first && recovery_info.size == ((uint64_t) -1)) {
-           // Adjust size and copy_subset
-           recovery_info.size = st.st_size;
-           if (st.st_size) {
-             interval_set<uint64_t> object_range;
-             object_range.insert(0, st.st_size);
-             recovery_info.copy_subset.intersection_of(object_range);
-           } else {
-             recovery_info.copy_subset.clear();
-           }
-           assert(recovery_info.clone_subset.empty());
-         }
-         return build_push_op(recovery_info, progress, 0, &pop);
-       }).handle_exception([soid, &pop](auto e) {
-         pop.recovery_info.version = eversion_t();
-         pop.version = eversion_t();
-         pop.soid = soid;
-         return seastar::make_ready_future<ObjectRecoveryProgress>();
-       }).then([this, &pop, from](auto new_progress) {
-         auto msg = make_message<MOSDPGPush>();
-         msg->from = pg.get_pg_whoami();
-         msg->pgid = pg.get_pgid();
-         msg->map_epoch = pg.get_osdmap_epoch();
-         msg->min_epoch = pg.get_last_peering_reset();
-         msg->set_priority(pg.get_recovery_op_priority());
-         msg->pushes.push_back(pop);
-         return shard_services.send_to_osd(from.osd, std::move(msg),
-                                           pg.get_osdmap_epoch());
-       });
+      logger().debug("handle_pull: {}", soid);
+      return backend->stat(coll, ghobject_t(soid)).then(
+        [this, &pull_op](auto st) {
+        ObjectRecoveryInfo &recovery_info = pull_op.recovery_info;
+        ObjectRecoveryProgress &progress = pull_op.recovery_progress;
+        if (progress.first && recovery_info.size == ((uint64_t) -1)) {
+          // Adjust size and copy_subset
+          recovery_info.size = st.st_size;
+          if (st.st_size) {
+            interval_set<uint64_t> object_range;
+            object_range.insert(0, st.st_size);
+            recovery_info.copy_subset.intersection_of(object_range);
+          } else {
+            recovery_info.copy_subset.clear();
+          }
+          assert(recovery_info.clone_subset.empty());
+        }
+        return build_push_op(recovery_info, progress, 0);
+      }).then([this, from](auto pop) {
+        auto msg = make_message<MOSDPGPush>();
+        msg->from = pg.get_pg_whoami();
+        msg->pgid = pg.get_pgid();
+        msg->map_epoch = pg.get_osdmap_epoch();
+        msg->min_epoch = pg.get_last_peering_reset();
+        msg->set_priority(pg.get_recovery_op_priority());
+        msg->pushes.push_back(std::move(pop));
+        return shard_services.send_to_osd(from.osd, std::move(msg),
+                                          pg.get_osdmap_epoch());
       });
     });
   });
@@ -827,10 +819,10 @@ seastar::future<> ReplicatedRecoveryBackend::handle_push(
   });
 }
 
-seastar::future<bool> ReplicatedRecoveryBackend::_handle_push_reply(
+seastar::future<std::optional<PushOp>>
+ReplicatedRecoveryBackend::_handle_push_reply(
   pg_shard_t peer,
-  const PushReplyOp &op,
-  PushOp *reply)
+  const PushReplyOp &op)
 {
   const hobject_t& soid = op.soid;
   logger().debug("{}, soid {}, from {}", __func__, soid, peer);
@@ -838,27 +830,26 @@ seastar::future<bool> ReplicatedRecoveryBackend::_handle_push_reply(
   if (recovering_iter == recovering.end()
       || !recovering_iter->second.pushing.count(peer)) {
     logger().debug("huh, i wasn't pushing {} to osd.{}", soid, peer);
-    return seastar::make_ready_future<bool>(true);
+    return seastar::make_ready_future<std::optional<PushOp>>();
   } else {
     auto& pi = recovering_iter->second.pushing[peer];
-    return [this, &pi, &soid, reply, peer, recovering_iter] {
-      bool error = pi.recovery_progress.error;
-      if (!pi.recovery_progress.data_complete && !error) {
-       return build_push_op(pi.recovery_info, pi.recovery_progress,
-           &pi.stat, reply).then([&pi] (auto new_progress) {
-         pi.recovery_progress = new_progress;
-         return seastar::make_ready_future<bool>(false);
-       });
-      }
-      if (!error)
-       pg.get_recovery_handler()->on_peer_recover(peer, soid, pi.recovery_info);
-      recovering_iter->second.set_pushed(peer);
-      return seastar::make_ready_future<bool>(true);
-    }().handle_exception([recovering_iter, &pi, peer] (auto e) {
-      pi.recovery_progress.error = true;
-      recovering_iter->second.set_push_failed(peer, e);
-      return seastar::make_ready_future<bool>(true);
-    });
+    bool error = pi.recovery_progress.error;
+    if (!pi.recovery_progress.data_complete && !error) {
+      return build_push_op(pi.recovery_info, pi.recovery_progress,
+                          &pi.stat).then([&pi] (auto pop) {
+        pi.recovery_progress = pop.after_progress;
+       return seastar::make_ready_future<std::optional<PushOp>>(std::move(pop));
+      }).handle_exception([recovering_iter, &pi, peer] (auto e) {
+        pi.recovery_progress.error = true;
+        recovering_iter->second.set_push_failed(peer, e);
+        return seastar::make_ready_future<std::optional<PushOp>>();
+      });
+    }
+    if (!error) {
+      pg.get_recovery_handler()->on_peer_recover(peer, soid, pi.recovery_info);
+    }
+    recovering_iter->second.set_pushed(peer);
+    return seastar::make_ready_future<std::optional<PushOp>>();
   }
 }
 
@@ -869,21 +860,22 @@ seastar::future<> ReplicatedRecoveryBackend::handle_push_reply(
   auto from = m->from;
   auto& push_reply = m->replies[0]; //TODO: only one reply per message
 
-  return seastar::do_with(PushOp(), [this, &push_reply, from](auto& pop) {
-    return _handle_push_reply(from, push_reply, &pop).then(
-      [this, &pop, from](bool finished) {
-      if (!finished) {
-       auto msg = make_message<MOSDPGPush>();
-       msg->from = pg.get_pg_whoami();
-       msg->pgid = pg.get_pgid();
-       msg->map_epoch = pg.get_osdmap_epoch();
-       msg->min_epoch = pg.get_last_peering_reset();
-       msg->set_priority(pg.get_recovery_op_priority());
-       msg->pushes.push_back(pop);
-       return shard_services.send_to_osd(from.osd, std::move(msg), pg.get_osdmap_epoch());
-      }
+  return _handle_push_reply(from, push_reply).then(
+    [this, from](std::optional<PushOp> push_op) {
+    if (push_op) {
+      auto msg = make_message<MOSDPGPush>();
+      msg->from = pg.get_pg_whoami();
+      msg->pgid = pg.get_pgid();
+      msg->map_epoch = pg.get_osdmap_epoch();
+      msg->min_epoch = pg.get_last_peering_reset();
+      msg->set_priority(pg.get_recovery_op_priority());
+      msg->pushes.push_back(std::move(*push_op));
+      return shard_services.send_to_osd(from.osd,
+                                        std::move(msg),
+                                        pg.get_osdmap_epoch());
+    } else {
       return seastar::make_ready_future<>();
-    });
+    }
   });
 }
 
index 5017558012df1ca20f57a3d3b81d8b5d39d8433c..8298db642d101059f903f208370b1499e529e416 100644 (file)
@@ -55,11 +55,10 @@ protected:
     eversion_t need);
   std::vector<pg_shard_t> get_shards_to_push(
     const hobject_t& soid) const;
-  seastar::future<ObjectRecoveryProgress> build_push_op(
+  seastar::future<PushOp> build_push_op(
     const ObjectRecoveryInfo& recovery_info,
     const ObjectRecoveryProgress& progress,
-    object_stat_sum_t* stat,
-    PushOp* pop);
+    object_stat_sum_t* stat);
   seastar::future<bool> _handle_pull_response(
     pg_shard_t from,
     PushOp& pop,
@@ -91,10 +90,9 @@ protected:
     const PushOp &pop,
     PushReplyOp *response,
     ceph::os::Transaction *t);
-  seastar::future<bool> _handle_push_reply(
+  seastar::future<std::optional<PushOp>> _handle_push_reply(
     pg_shard_t peer,
-    const PushReplyOp &op,
-    PushOp *reply);
+    const PushReplyOp &op);
   seastar::future<> on_local_recover_persist(
     const hobject_t& soid,
     const ObjectRecoveryInfo& _recovery_info,