]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/osd/pg: submit_error_log send messages to osd by order
authorMatan Breizman <mbreizma@redhat.com>
Thu, 29 Jun 2023 08:26:48 +0000 (08:26 +0000)
committerMatan Breizman <mbreizma@redhat.com>
Sun, 19 Nov 2023 09:46:00 +0000 (09:46 +0000)
Use chained futurized `send_to_osd()` instead of voided `send_cluster_message()`.

Signed-off-by: Matan Breizman <mbreizma@redhat.com>
src/crimson/osd/pg.cc

index 407d354134ce18ed038abefc53f82da3db9ffd10..b3e6c8480ec700dfbfbdd02816ba7ae0897d8e6a 100644 (file)
@@ -954,10 +954,15 @@ seastar::future<> PG::submit_error_log(
     log_entries, t, peering_state.get_pg_trim_to(),
     peering_state.get_min_last_complete_ondisk());
 
-    set<pg_shard_t> waiting_on;
-    for (auto &i : get_acting_recovery_backfill()) {
+  return seastar::do_with(log_entries, t, set<pg_shard_t>{},
+    [this, rep_tid](auto& log_entries, auto& t,auto& waiting_on) mutable {
+    return seastar::do_for_each(get_acting_recovery_backfill(),
+      [this, log_entries, t=std::move(t), waiting_on, rep_tid]
+      (auto& i) mutable {
       pg_shard_t peer(i);
-      if (peer == pg_whoami) continue;
+      if (peer == pg_whoami) {
+        return seastar::now();
+      }
       ceph_assert(peering_state.get_peer_missing().count(peer));
       ceph_assert(peering_state.has_peer_info(peer));
       auto log_m = crimson::make_message<MOSDPGUpdateLogMissing>(
@@ -969,22 +974,27 @@ seastar::future<> PG::submit_error_log(
                    rep_tid,
                    peering_state.get_pg_trim_to(),
                    peering_state.get_min_last_complete_ondisk());
+      waiting_on.insert(peer);
       logger().debug("submit_error_log: sending log"
         "missing_request (rep_tid: {} entries: {})"
         " to osd {}", rep_tid, log_entries, peer.osd);
-      send_cluster_message(peer.osd, std::move(log_m), get_osdmap_epoch());
-      waiting_on.insert(peer);
-    }
-    waiting_on.insert(pg_whoami);
-    logger().debug("submit_error_log: inserting rep_tid {}", rep_tid);
-    log_entry_update_waiting_on.insert(
-      std::make_pair(rep_tid, log_update_t{std::move(waiting_on)}));
-    return shard_services.get_store().do_transaction(
-      get_collection_ref(), std::move(t))
-      .then([this] {
+      return shard_services.send_to_osd(peer.osd,
+                                        std::move(log_m),
+                                        get_osdmap_epoch());
+    }).then([this, waiting_on, t=std::move(t), rep_tid] () mutable {
+      waiting_on.insert(pg_whoami);
+      logger().debug("submit_error_log: inserting rep_tid {}", rep_tid);
+      log_entry_update_waiting_on.insert(
+        std::make_pair(rep_tid,
+                       log_update_t{std::move(waiting_on)}));
+      return shard_services.get_store().do_transaction(
+        get_collection_ref(), std::move(t)
+      ).then([this] {
         peering_state.update_trim_to();
         return seastar::now();
+      });
     });
+  });
 }
 
 PG::do_osd_ops_iertr::future<PG::pg_rep_op_fut_t<MURef<MOSDOpReply>>>