git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/osd/pg: introduce PG::log_entry_version map
author Matan Breizman <mbreizma@redhat.com>
Thu, 2 Nov 2023 10:00:06 +0000 (10:00 +0000)
committer Matan Breizman <mbreizma@redhat.com>
Wed, 15 Nov 2023 16:12:48 +0000 (16:12 +0000)
`submit_error_log()` was returning `version` through an out-parameter so
that it could be used later, in the `failure_func` call to `complete_write()`.

Instead, store the version produced by `submit_error_log()` in a dedicated map
(`PG::log_entry_version`, keyed by `rep_tid`) to avoid having to manage the
lifetime of `version`.

Note: This change is a prerequisite for the following change, which will
      return 'error_fut' separately.

Signed-off-by: Matan Breizman <mbreizma@redhat.com>
src/crimson/osd/pg.cc
src/crimson/osd/pg.h

index 9de30555c156a0a39615fb9f18c76e04e4106763..0f6065a837758583b971224e1526062321376ee8 100644 (file)
@@ -929,8 +929,7 @@ seastar::future<> PG::submit_error_log(
   const OpInfo &op_info,
   ObjectContextRef obc,
   const std::error_code e,
-  ceph_tid_t rep_tid,
-  eversion_t &version)
+  ceph_tid_t rep_tid)
 {
   logger().debug("{}: {} rep_tid: {} error: {}",
                  __func__, *m, rep_tid, e);
@@ -948,8 +947,7 @@ seastar::future<> PG::submit_error_log(
   ceph_assert(is_primary());
   ceph_assert(!log_entries.empty());
   ceph_assert(log_entries.rbegin()->version >= projected_last_update);
-  version = projected_last_update = log_entries.rbegin()->version;
-
+  log_entry_version[rep_tid] = projected_last_update = log_entries.rbegin()->version;
   ceph::os::Transaction t;
   peering_state.merge_new_log_entries(
     log_entries, t, peering_state.get_pg_trim_to(),
@@ -1044,41 +1042,37 @@ PG::do_osd_ops(
     },
     // failure_func
     [m, &op_info, obc, this] (const std::error_code& e) {
-    return seastar::do_with(eversion_t(), [m, &op_info, obc, e, this](auto &version) {
-      logger().error("do_osd_ops_execute::failure_func {} got error: {}", *m, e);
-      auto error_log_fut = seastar::now();
-      epoch_t epoch = get_osdmap_epoch();
-      ceph_tid_t rep_tid = shard_services.get_tid();
-      auto last_complete = peering_state.get_info().last_complete;
-      if (op_info.may_write()) {
-        // This should be executed as OrderedExclusivePhaseT so that
-        // successive ops will not reorder.
-        // TODO: https://tracker.ceph.com/issues/61651
-        error_log_fut = submit_error_log(m, op_info, obc, e, rep_tid, version);
-      }
-      return error_log_fut.then([m, e, epoch, &op_info, rep_tid, &version, last_complete, this] {
-        auto fut = seastar::now();
-        if (!peering_state.pg_has_reset_since(epoch) && op_info.may_write()) {
-          ceph_assert(version != eversion_t());
-          auto it = log_entry_update_waiting_on.find(rep_tid);
-          ceph_assert(it != log_entry_update_waiting_on.end());
-          auto it2 = it->second.waiting_on.find(pg_whoami);
-          ceph_assert(it2 != it->second.waiting_on.end());
-          it->second.waiting_on.erase(it2);
-          if (it->second.waiting_on.empty()) {
-            log_entry_update_waiting_on.erase(it);
-            peering_state.complete_write(version, last_complete);
-          } else {
-            fut = it->second.all_committed.get_shared_future().then(
-              [this, &version, last_complete] {
-              peering_state.complete_write(version, last_complete);
-              return seastar::now();
-            });
-          }
+    logger().error("do_osd_ops_execute::failure_func {} got error: {}", *m, e);
+    auto error_log_fut = seastar::now();
+    epoch_t epoch = get_osdmap_epoch();
+    ceph_tid_t rep_tid = shard_services.get_tid();
+    auto last_complete = peering_state.get_info().last_complete;
+    if (op_info.may_write()) {
+      error_log_fut = submit_error_log(m, op_info, obc, e, rep_tid);
+    }
+    return error_log_fut.then([m, e, epoch, &op_info, rep_tid, last_complete, this] {
+      auto fut = seastar::now();
+      if (!peering_state.pg_has_reset_since(epoch) && op_info.may_write()) {
+        ceph_assert(log_entry_version.contains(rep_tid));
+        auto it = log_entry_update_waiting_on.find(rep_tid);
+        ceph_assert(it != log_entry_update_waiting_on.end());
+        auto it2 = it->second.waiting_on.find(pg_whoami);
+        ceph_assert(it2 != it->second.waiting_on.end());
+        it->second.waiting_on.erase(it2);
+        if (it->second.waiting_on.empty()) {
+          log_entry_update_waiting_on.erase(it);
+          peering_state.complete_write(log_entry_version[rep_tid], last_complete);
+          log_entry_version.erase(rep_tid);
+        } else {
+          fut = it->second.all_committed.get_shared_future().then(
+            [this, last_complete, rep_tid] {
+            peering_state.complete_write(log_entry_version[rep_tid], last_complete);
+            return seastar::now();
+          });
         }
-        return fut.then([this, m, e] {
-          return log_reply(m, e);
-        });
+      }
+      return fut.then([this, m, e] {
+        return log_reply(m, e);
       });
     });
   });
@@ -1401,6 +1395,7 @@ PG::interruptible_future<> PG::do_update_log_missing_reply(
       it->second.all_committed.set_value();
       it->second.all_committed = {};
       log_entry_update_waiting_on.erase(it);
+      log_entry_version.erase(m->get_tid());
     }
   } else {
     logger().error("{} : {} got reply {} on unknown tid {}",
index e06df874b8918be6207ca9e7aa7d2cd3ef96830b..93e3ae82ec6eb91f9cd18adde804d57d60e006b4 100644 (file)
@@ -537,8 +537,7 @@ public:
     const OpInfo &op_info,
     ObjectContextRef obc,
     const std::error_code e,
-    ceph_tid_t rep_tid,
-    eversion_t &version);
+    ceph_tid_t rep_tid);
 
 private:
 
@@ -769,6 +768,7 @@ private:
   };
 
   std::map<ceph_tid_t, log_update_t> log_entry_update_waiting_on;
+  std::map<ceph_tid_t, eversion_t> log_entry_version;
   // snap trimming
   interval_set<snapid_t> snap_trimq;
 };