]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
crimson/.../replicated_recovery_backend: convert handle_pull_response to coroutine
authorSamuel Just <sjust@redhat.com>
Wed, 15 Jan 2025 22:34:09 +0000 (22:34 +0000)
committerSamuel Just <sjust@redhat.com>
Wed, 29 Jan 2025 05:00:36 +0000 (05:00 +0000)
Signed-off-by: Samuel Just <sjust@redhat.com>
src/crimson/osd/replicated_recovery_backend.cc

index 901fe2647d3f0ade269890e3dfefebf9fc2ca09b..01cb16bc7fa8f08b7ad50fab57905fcad12afb25 100644 (file)
@@ -936,54 +936,49 @@ ReplicatedRecoveryBackend::handle_pull_response(
   LOG_PREFIX(ReplicatedRecoveryBackend::handle_pull_response);
   if (pg.can_discard_replica_op(*m)) {
     DEBUGDPP("discarding {}", pg, *m);
-    return seastar::now();
+    co_return;
   }
-  const PushOp& push_op = m->pushes[0]; //TODO: only one push per message for now.
+  PushOp& push_op = m->pushes[0]; //TODO: only one push per message for now.
   if (push_op.version == eversion_t()) {
     // replica doesn't have it!
     pg.get_recovery_handler()->on_failed_recover({ m->from }, push_op.soid,
        get_recovering(push_op.soid).pull_info->recovery_info.version);
-    return seastar::make_exception_future<>(
-       std::runtime_error(fmt::format(
-           "Error on pushing side {} when pulling obj {}",
-           m->from, push_op.soid)));
+    throw std::runtime_error(
+      fmt::format(
+       "Error on pushing side {} when pulling obj {}",
+       m->from, push_op.soid));
   }
 
   DEBUGDPP("{}", pg, *m);
-  return seastar::do_with(PullOp(), [this, m](auto& response) {
-    return seastar::do_with(ceph::os::Transaction(), m.get(),
-     [FNAME, this, &response](auto& t, auto& m) {
-      pg_shard_t from = m->from;
-      PushOp& push_op = m->pushes[0]; // only one push per message for now
-      return _handle_pull_response(from, push_op, &response, &t
-      ).then_interruptible(
-       [FNAME, this, &t](bool complete) {
-       epoch_t epoch_frozen = pg.get_osdmap_epoch();
-       DEBUGDPP("submitting transaction", pg);
-       return shard_services.get_store().do_transaction(coll, std::move(t))
-         .then([this, epoch_frozen, complete,
-         last_complete = pg.get_info().last_complete] {
-         pg.get_recovery_handler()->_committed_pushed_object(epoch_frozen, last_complete);
-         return seastar::make_ready_future<bool>(complete);
-       });
-      });
-    }).then_interruptible([this, m, &response](bool complete) {
-      if (complete) {
-       auto& push_op = m->pushes[0];
-       get_recovering(push_op.soid).set_pulled();
-       return seastar::make_ready_future<>();
-      } else {
-       auto reply = crimson::make_message<MOSDPGPull>();
-       reply->from = pg.get_pg_whoami();
-       reply->set_priority(m->get_priority());
-       reply->pgid = pg.get_info().pgid;
-       reply->map_epoch = m->map_epoch;
-       reply->min_epoch = m->min_epoch;
-       reply->set_pulls({std::move(response)});
-       return shard_services.send_to_osd(m->from.osd, std::move(reply), pg.get_osdmap_epoch());
-      }
-    });
-  });
+  PullOp response;
+  ceph::os::Transaction t;
+
+  pg_shard_t from = m->from;
+
+  const bool complete = co_await _handle_pull_response(
+    from, push_op, &response, &t);
+
+  epoch_t epoch_frozen = pg.get_osdmap_epoch();
+  DEBUGDPP("submitting transaction", pg);
+  co_await interruptor::make_interruptible(
+    shard_services.get_store().do_transaction(coll, std::move(t)));
+  pg.get_recovery_handler()->_committed_pushed_object(
+    epoch_frozen, pg.get_info().last_complete);
+
+  if (complete) {
+    get_recovering(push_op.soid).set_pulled();
+  } else {
+    auto reply = crimson::make_message<MOSDPGPull>();
+    reply->from = pg.get_pg_whoami();
+    reply->set_priority(m->get_priority());
+    reply->pgid = pg.get_info().pgid;
+    reply->map_epoch = m->map_epoch;
+    reply->min_epoch = m->min_epoch;
+    reply->set_pulls({std::move(response)});
+    co_await interruptor::make_interruptible(
+      shard_services.send_to_osd(
+       m->from.osd, std::move(reply), pg.get_osdmap_epoch()));
+  }
 }
 
 RecoveryBackend::interruptible_future<>