]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimoson/osd/replicated_recovery_backend: Rename pop to push_op 49568/head
authorMatan Breizman <mbreizma@redhat.com>
Sun, 25 Dec 2022 12:34:51 +0000 (12:34 +0000)
committerMatan Breizman <mbreizma@redhat.com>
Sun, 25 Dec 2022 13:14:33 +0000 (13:14 +0000)
Signed-off-by: Matan Breizman <mbreizma@redhat.com>
src/crimson/osd/replicated_recovery_backend.cc
src/crimson/osd/replicated_recovery_backend.h

index 114e1efd7dab2127f60bc461f434a26daeca98fa..b2e3dfbd6382675b3b8e9cc997a86fc31604325d 100644 (file)
@@ -332,11 +332,11 @@ ReplicatedRecoveryBackend::prep_push(
   return build_push_op(push_info.recovery_info,
                        push_info.recovery_progress,
                        &push_info.stat).then_interruptible(
-    [this, soid, pg_shard](auto pop) {
+    [this, soid, pg_shard](auto push_op) {
     auto& recovery_waiter = get_recovering(soid);
     auto& push_info = recovery_waiter.pushing[pg_shard];
-    push_info.recovery_progress = pop.after_progress;
-    return pop;
+    push_info.recovery_progress = push_op.after_progress;
+    return push_op;
   });
 }
 
@@ -386,10 +386,11 @@ ReplicatedRecoveryBackend::build_push_op(
                          recovery_info.version,
                          PushOp(),
     [this, &recovery_info, &progress, stat]
-    (auto& new_progress, auto& available, auto& v, auto& pop) {
+    (auto& new_progress, auto& available, auto& v, auto& push_op) {
     return read_metadata_for_push_op(recovery_info.soid,
                                      progress, new_progress,
-                                     v, &pop).then_interruptible([&](eversion_t local_ver) mutable {
+                                     v, &push_op
+    ).then_interruptible([&](eversion_t local_ver) mutable {
       // If requestor didn't know the version, use ours
       if (v == eversion_t()) {
         v = local_ver;
@@ -401,15 +402,17 @@ ReplicatedRecoveryBackend::build_push_op(
       return read_omap_for_push_op(recovery_info.soid,
                                    progress,
                                    new_progress,
-                                   available, &pop);
-    }).then_interruptible([this, &recovery_info, &progress, &available, &pop]() mutable {
+                                   available, &push_op);
+    }).then_interruptible([this, &recovery_info, &progress,
+                           &available, &push_op]() mutable {
       logger().debug("build_push_op: available: {}, copy_subset: {}",
                     available, recovery_info.copy_subset);
       return read_object_for_push_op(recovery_info.soid,
                                     recovery_info.copy_subset,
                                     progress.data_recovered_to,
-                                    available, &pop);
-    }).then_interruptible([&recovery_info, &v, &progress, &new_progress, stat, &pop]
+                                    available, &push_op);
+    }).then_interruptible([&recovery_info, &v, &progress,
+                           &new_progress, stat, &push_op]
             (uint64_t recovered_to) mutable {
       new_progress.data_recovered_to = recovered_to;
       if (new_progress.is_complete(recovery_info)) {
@@ -422,17 +425,18 @@ ReplicatedRecoveryBackend::build_push_op(
        new_progress.omap_complete = false;
       }
       if (stat) {
-       stat->num_keys_recovered += pop.omap_entries.size();
-       stat->num_bytes_recovered += pop.data.length();
+       stat->num_keys_recovered += push_op.omap_entries.size();
+       stat->num_bytes_recovered += push_op.data.length();
       }
-      pop.version = v;
-      pop.soid = recovery_info.soid;
-      pop.recovery_info = recovery_info;
-      pop.after_progress = new_progress;
-      pop.before_progress = progress;
-      logger().debug("build_push_op: pop version: {}, pop data length: {}",
-                    pop.version, pop.data.length());
-      return seastar::make_ready_future<PushOp>(std::move(pop));
+      push_op.version = v;
+      push_op.soid = recovery_info.soid;
+      push_op.recovery_info = recovery_info;
+      push_op.after_progress = new_progress;
+      push_op.before_progress = progress;
+      logger().debug("build_push_op: push_op version:"
+                     " {}, push_op data length: {}",
+                    push_op.version, push_op.data.length());
+      return seastar::make_ready_future<PushOp>(std::move(push_op));
     });
   });
 }
@@ -641,14 +645,14 @@ ReplicatedRecoveryBackend::handle_pull(Ref<MOSDPGPull> m)
           assert(recovery_info.clone_subset.empty());
         }
         return build_push_op(recovery_info, progress, 0);
-      }).then_interruptible([this, from](auto pop) {
+      }).then_interruptible([this, from](auto push_op) {
         auto msg = crimson::make_message<MOSDPGPush>();
         msg->from = pg.get_pg_whoami();
         msg->pgid = pg.get_pgid();
         msg->map_epoch = pg.get_osdmap_epoch();
         msg->min_epoch = pg.get_last_peering_reset();
         msg->set_priority(pg.get_recovery_op_priority());
-        msg->pushes.push_back(std::move(pop));
+        msg->pushes.push_back(std::move(push_op));
         return shard_services.send_to_osd(from.osd, std::move(msg),
                                           pg.get_osdmap_epoch());
       });
@@ -659,78 +663,79 @@ ReplicatedRecoveryBackend::handle_pull(Ref<MOSDPGPull> m)
 RecoveryBackend::interruptible_future<bool>
 ReplicatedRecoveryBackend::_handle_pull_response(
   pg_shard_t from,
-  PushOp& pop,
+  PushOp& push_op,
   PullOp* response,
   ceph::os::Transaction* t)
 {
   logger().debug("handle_pull_response {} {} data.size() is {} data_included: {}",
-      pop.recovery_info, pop.after_progress, pop.data.length(), pop.data_included);
+      push_op.recovery_info, push_op.after_progress,
+      push_op.data.length(), push_op.data_included);
 
-  const hobject_t &hoid = pop.soid;
+  const hobject_t &hoid = push_op.soid;
   auto& recovery_waiter = get_recovering(hoid);
   auto& pull_info = *recovery_waiter.pull_info;
   if (pull_info.recovery_info.size == (uint64_t(-1))) {
-    pull_info.recovery_info.size = pop.recovery_info.size;
+    pull_info.recovery_info.size = push_op.recovery_info.size;
     pull_info.recovery_info.copy_subset.intersection_of(
-       pop.recovery_info.copy_subset);
+       push_op.recovery_info.copy_subset);
   }
 
   // If primary doesn't have object info and didn't know version
   if (pull_info.recovery_info.version == eversion_t())
-    pull_info.recovery_info.version = pop.version;
+    pull_info.recovery_info.version = push_op.version;
 
   auto prepare_waiter = interruptor::make_interruptible(
       seastar::make_ready_future<>());
   if (pull_info.recovery_progress.first) {
     prepare_waiter = pg.obc_loader.with_obc<RWState::RWNONE>(
       pull_info.recovery_info.soid,
-      [&pull_info, &recovery_waiter, &pop](auto obc) {
+      [&pull_info, &recovery_waiter, &push_op](auto obc) {
         pull_info.obc = obc;
         recovery_waiter.obc = obc;
-        obc->obs.oi.decode_no_oid(pop.attrset.at(OI_ATTR), pop.soid);
+        obc->obs.oi.decode_no_oid(push_op.attrset.at(OI_ATTR), push_op.soid);
         pull_info.recovery_info.oi = obc->obs.oi;
         return crimson::osd::PG::load_obc_ertr::now();
       }).handle_error_interruptible(crimson::ct_error::assert_all{});
   };
   return prepare_waiter.then_interruptible(
-    [this, &pull_info, &pop, t, response]() mutable {
+    [this, &pull_info, &push_op, t, response]() mutable {
     const bool first = pull_info.recovery_progress.first;
-    pull_info.recovery_progress = pop.after_progress;
+    pull_info.recovery_progress = push_op.after_progress;
     logger().debug("new recovery_info {}, new progress {}",
                   pull_info.recovery_info, pull_info.recovery_progress);
     interval_set<uint64_t> data_zeros;
     {
-      uint64_t offset = pop.before_progress.data_recovered_to;
-      uint64_t length = (pop.after_progress.data_recovered_to -
-                        pop.before_progress.data_recovered_to);
+      uint64_t offset = push_op.before_progress.data_recovered_to;
+      uint64_t length = (push_op.after_progress.data_recovered_to -
+                        push_op.before_progress.data_recovered_to);
       if (length) {
         data_zeros.insert(offset, length);
       }
     }
     auto [usable_intervals, data] =
       trim_pushed_data(pull_info.recovery_info.copy_subset,
-                       pop.data_included, pop.data);
+                       push_op.data_included, push_op.data);
     bool complete = pull_info.is_complete();
-    bool clear_omap = !pop.before_progress.omap_complete;
+    bool clear_omap = !push_op.before_progress.omap_complete;
     return submit_push_data(pull_info.recovery_info,
                             first, complete, clear_omap,
                             std::move(data_zeros), std::move(usable_intervals),
-                            std::move(data), std::move(pop.omap_header),
-                            pop.attrset, std::move(pop.omap_entries), t)
+                            std::move(data), std::move(push_op.omap_header),
+                            push_op.attrset, std::move(push_op.omap_entries), t)
     .then_interruptible(
-      [this, response, &pull_info, &pop, complete,
+      [this, response, &pull_info, &push_op, complete,
         t, bytes_recovered=data.length()] {
-      pull_info.stat.num_keys_recovered += pop.omap_entries.size();
+      pull_info.stat.num_keys_recovered += push_op.omap_entries.size();
       pull_info.stat.num_bytes_recovered += bytes_recovered;
 
       if (complete) {
        pull_info.stat.num_objects_recovered++;
        pg.get_recovery_handler()->on_local_recover(
-           pop.soid, get_recovering(pop.soid).pull_info->recovery_info,
+           push_op.soid, get_recovering(push_op.soid).pull_info->recovery_info,
            false, *t);
        return true;
       } else {
-        response->soid = pop.soid;
+        response->soid = push_op.soid;
         response->recovery_info = pull_info.recovery_info;
         response->recovery_progress = pull_info.recovery_progress;
         return false;
@@ -747,15 +752,15 @@ ReplicatedRecoveryBackend::handle_pull_response(
     logger().debug("{}: discarding {}", __func__, *m);
     return seastar::now();
   }
-  const PushOp& pop = m->pushes[0]; //TODO: only one push per message for now.
-  if (pop.version == eversion_t()) {
+  const PushOp& push_op = m->pushes[0]; //TODO: only one push per message for now.
+  if (push_op.version == eversion_t()) {
     // replica doesn't have it!
-    pg.get_recovery_handler()->on_failed_recover({ m->from }, pop.soid,
-       get_recovering(pop.soid).pull_info->recovery_info.version);
+    pg.get_recovery_handler()->on_failed_recover({ m->from }, push_op.soid,
+       get_recovering(push_op.soid).pull_info->recovery_info.version);
     return seastar::make_exception_future<>(
        std::runtime_error(fmt::format(
            "Error on pushing side {} when pulling obj {}",
-           m->from, pop.soid)));
+           m->from, push_op.soid)));
   }
 
   logger().debug("{}: {}", __func__, *m);
@@ -763,8 +768,9 @@ ReplicatedRecoveryBackend::handle_pull_response(
     return seastar::do_with(ceph::os::Transaction(), m.get(),
       [this, &response](auto& t, auto& m) {
       pg_shard_t from = m->from;
-      PushOp& pop = m->pushes[0]; // only one push per message for now
-      return _handle_pull_response(from, pop, &response, &t).then_interruptible(
+      PushOp& push_op = m->pushes[0]; // only one push per message for now
+      return _handle_pull_response(from, push_op, &response, &t
+      ).then_interruptible(
        [this, &t](bool complete) {
        epoch_t epoch_frozen = pg.get_osdmap_epoch();
        logger().debug("ReplicatedRecoveryBackend::handle_pull_response: do_transaction...");
@@ -777,8 +783,8 @@ ReplicatedRecoveryBackend::handle_pull_response(
       });
     }).then_interruptible([this, m, &response](bool complete) {
       if (complete) {
-       auto& pop = m->pushes[0];
-       get_recovering(pop.soid).set_pulled();
+       auto& push_op = m->pushes[0];
+       get_recovering(push_op.soid).set_pulled();
        return seastar::make_ready_future<>();
       } else {
        auto reply = crimson::make_message<MOSDPGPull>();
@@ -797,36 +803,39 @@ ReplicatedRecoveryBackend::handle_pull_response(
 RecoveryBackend::interruptible_future<>
 ReplicatedRecoveryBackend::_handle_push(
   pg_shard_t from,
-  PushOp &pop,
+  PushOp &push_op,
   PushReplyOp *response,
   ceph::os::Transaction *t)
 {
   logger().debug("{}", __func__);
 
-  bool first = pop.before_progress.first;
+  bool first = push_op.before_progress.first;
   interval_set<uint64_t> data_zeros;
   {
-    uint64_t offset = pop.before_progress.data_recovered_to;
-    uint64_t length = (pop.after_progress.data_recovered_to -
-                       pop.before_progress.data_recovered_to);
+    uint64_t offset = push_op.before_progress.data_recovered_to;
+    uint64_t length = (push_op.after_progress.data_recovered_to -
+                       push_op.before_progress.data_recovered_to);
     if (length) {
       data_zeros.insert(offset, length);
     }
   }
-  bool complete = (pop.after_progress.data_complete &&
-                  pop.after_progress.omap_complete);
-  bool clear_omap = !pop.before_progress.omap_complete;
-  response->soid = pop.recovery_info.soid;
+  bool complete = (push_op.after_progress.data_complete &&
+                  push_op.after_progress.omap_complete);
+  bool clear_omap = !push_op.before_progress.omap_complete;
+  response->soid = push_op.recovery_info.soid;
 
-  return submit_push_data(pop.recovery_info, first, complete, clear_omap,
-                          std::move(data_zeros), std::move(pop.data_included),
-                          std::move(pop.data), std::move(pop.omap_header),
-                          pop.attrset, std::move(pop.omap_entries), t)
+  return submit_push_data(push_op.recovery_info, first, complete, clear_omap,
+                          std::move(data_zeros),
+                          std::move(push_op.data_included),
+                          std::move(push_op.data),
+                          std::move(push_op.omap_header),
+                          push_op.attrset, 
+                          std::move(push_op.omap_entries), t)
   .then_interruptible(
-    [this, complete, &pop, t] {
+    [this, complete, &push_op, t] {
     if (complete) {
       pg.get_recovery_handler()->on_local_recover(
-        pop.recovery_info.soid, pop.recovery_info,
+        push_op.recovery_info.soid, push_op.recovery_info,
         false, *t);
     }
   });
@@ -846,10 +855,10 @@ ReplicatedRecoveryBackend::handle_push(
 
   logger().debug("{}: {}", __func__, *m);
   return seastar::do_with(PushReplyOp(), [this, m](auto& response) {
-    PushOp& pop = m->pushes[0]; // TODO: only one push per message for now
+    PushOp& push_op = m->pushes[0]; // TODO: only one push per message for now
     return seastar::do_with(ceph::os::Transaction(),
-      [this, m, &pop, &response](auto& t) {
-      return _handle_push(m->from, pop, &response, &t).then_interruptible(
+      [this, m, &push_op, &response](auto& t) {
+      return _handle_push(m->from, push_op, &response, &t).then_interruptible(
        [this, &t] {
        epoch_t epoch_frozen = pg.get_osdmap_epoch();
        logger().debug("ReplicatedRecoveryBackend::handle_push: do_transaction...");
@@ -893,9 +902,10 @@ ReplicatedRecoveryBackend::_handle_push_reply(
     if (!push_info.recovery_progress.data_complete && !error) {
       return build_push_op(push_info.recovery_info, push_info.recovery_progress,
                           &push_info.stat
-      ).then_interruptible([&push_info] (auto pop) {
-        push_info.recovery_progress = pop.after_progress;
-       return seastar::make_ready_future<std::optional<PushOp>>(std::move(pop));
+      ).then_interruptible([&push_info] (auto push_op) {
+        push_info.recovery_progress = push_op.after_progress;
+       return seastar::make_ready_future<std::optional<PushOp>>(
+          std::move(push_op));
       }).handle_exception_interruptible(
         [recovering_iter, &push_info, peer] (auto e) {
         push_info.recovery_progress.error = true;
index e58e887d48ac0f62bec7f063ff78102dc22dd9fc..1ada19b18a079a635c43ab6029f8aa3f24fa6b21 100644 (file)
@@ -66,7 +66,7 @@ protected:
   ///          recovery @c pop.soid
   interruptible_future<bool> _handle_pull_response(
     pg_shard_t from,
-    PushOp& pop,
+    PushOp& push_op,
     PullOp* response,
     ceph::os::Transaction* t);
   std::pair<interval_set<uint64_t>, ceph::bufferlist> trim_pushed_data(
@@ -90,7 +90,7 @@ protected:
     ObjectStore::Transaction *t);
   interruptible_future<> _handle_push(
     pg_shard_t from,
-    PushOp& pop,
+    PushOp& push_op,
     PushReplyOp *response,
     ceph::os::Transaction *t);
   interruptible_future<std::optional<PushOp>> _handle_push_reply(