git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
crimson/osd/pg: submit_error_log cleanup
author: Matan Breizman <mbreizma@redhat.com>
Thu, 7 Dec 2023 13:48:02 +0000 (13:48 +0000)
committer: Matan Breizman <mbreizma@redhat.com>
Tue, 16 Jan 2024 12:42:58 +0000 (12:42 +0000)
* error log completion logic is moved into maybe_submit_error_log
* renamed it and it2
* maybe_submit_error_log is moved outside of failure_func
* failure_func no longer gets rep_tid and record_error params
* log_entry_version is removed, submit_error_log returns the version instead

Signed-off-by: Matan Breizman <mbreizma@redhat.com>
src/crimson/osd/pg.cc
src/crimson/osd/pg.h

index a605745a3befb134618cea69c68e62eff28c4e02..cd2478d58eacafc9bc8c9fd310702a06d6ebfda6 100644 (file)
@@ -323,7 +323,6 @@ void PG::on_removal(ceph::os::Transaction &t) {
 void PG::clear_log_entry_maps()
 {
   log_entry_update_waiting_on.clear();
-  log_entry_version.clear();
 }
 
 void PG::on_activate(interval_set<snapid_t> snaps)
@@ -830,7 +829,7 @@ PG::do_osd_ops_execute(
         return submit_error_log(m, op_info, obc, e, rep_tid);
       }
     }
-    return seastar::now();
+    return seastar::make_ready_future<std::optional<eversion_t>>(std::nullopt);
   };
   auto failure_func_ptr = seastar::make_lw_shared(std::move(failure_func));
   return interruptor::do_for_each(ops, [ox](OSDOp& osd_op) {
@@ -910,16 +909,16 @@ PG::do_osd_ops_execute(
         });
       });
     }), OpsExecuter::osd_op_errorator::all_same_way(
-        [this, rollbacker, failure_func_ptr]
+        [rollbacker, failure_func_ptr]
         (const std::error_code& e) mutable {
           // handle non-fatal errors only
           ceph_assert(e.value() == EDQUOT ||
                       e.value() == ENOSPC ||
                       e.value() == EAGAIN);
           return rollbacker.rollback_obc_if_modified(e).then_interruptible(
-          [this, e, failure_func_ptr] {
+          [e, failure_func_ptr] {
             // no need to record error log
-            return (*failure_func_ptr)(e , shard_services.get_tid(), false);
+            return (*failure_func_ptr)(e);
           });
     }));
 
@@ -935,20 +934,59 @@ PG::do_osd_ops_execute(
     ceph_tid_t rep_tid = shard_services.get_tid();
     return rollbacker.rollback_obc_if_modified(e).then_interruptible(
     [maybe_submit_error_log=std::move(maybe_submit_error_log),
-     e, rep_tid, failure_func_ptr] {
+     this, e, rep_tid, failure_func_ptr] {
       // record error log
       return maybe_submit_error_log(e, rep_tid).then(
-      [failure_func_ptr, e, rep_tid] {
+      [this, failure_func_ptr, e, rep_tid] (auto version) {
+        auto all_completed =
+        [this, failure_func_ptr, e, rep_tid,  version] {
+          if (version.has_value()) {
+            return complete_error_log(rep_tid, version.value()).then(
+            [failure_func_ptr, e] {
+              return (*failure_func_ptr)(e);
+            });
+          } else {
+            return (*failure_func_ptr)(e);
+          }
+        };
         return PG::do_osd_ops_iertr::make_ready_future<pg_rep_op_fut_t<Ret>>(
           std::move(seastar::now()),
-          std::move((*failure_func_ptr)(e, rep_tid, true))
+          std::move(all_completed())
         );
       });
     });
   }));
 }
 
-seastar::future<> PG::submit_error_log(
+seastar::future<> PG::complete_error_log(const ceph_tid_t& rep_tid,  // drops pg_whoami from rep_tid's waiters, then
+                                         const eversion_t& version)  // calls complete_write(version, ...) now or once all_committed resolves
+{
+  auto result = seastar::now();  // returned unchanged when no other waiter is left
+  auto last_complete = peering_state.get_info().last_complete;  // read once here; the deferred continuation reuses this value
+  ceph_assert(log_entry_update_waiting_on.contains(rep_tid));  // an entry for rep_tid must already exist
+  auto& log_update = log_entry_update_waiting_on[rep_tid];  // operator[] cannot insert here: presence asserted above
+  ceph_assert(log_update.waiting_on.contains(pg_whoami));
+  log_update.waiting_on.erase(pg_whoami);  // pg_whoami is no longer pending for this rep_tid
+  if (log_update.waiting_on.empty()) {  // pg_whoami was the last pending waiter
+    log_entry_update_waiting_on.erase(rep_tid);  // log_update dangles from here on; it is not used again in this branch
+    peering_state.complete_write(version, last_complete);
+    logger().debug("complete_error_log: write complete,"
+                   " erasing rep_tid {}", rep_tid);
+  } else {  // other waiters remain: defer completion
+    logger().debug("complete_error_log: rep_tid {} awaiting update from {}",
+                   rep_tid, log_update.waiting_on);
+    result = log_update.all_committed.get_shared_future().then(  // NOTE(review): presumably resolved when the remaining replies arrive -- confirm
+    [this, last_complete, rep_tid, version] {  // everything captured by value except `this`
+      logger().debug("complete_error_log: rep_tid {} awaited ", rep_tid);
+      peering_state.complete_write(version, last_complete);
+      ceph_assert(!log_entry_update_waiting_on.contains(rep_tid));  // the entry is expected to have been erased elsewhere by now
+      return seastar::now();
+    });
+  }
+  return result;
+}
+
+seastar::future<std::optional<eversion_t>> PG::submit_error_log(
   Ref<MOSDOp> m,
   const OpInfo &op_info,
   ObjectContextRef obc,
@@ -971,7 +1009,7 @@ seastar::future<> PG::submit_error_log(
   ceph_assert(is_primary());
   ceph_assert(!log_entries.empty());
   ceph_assert(log_entries.rbegin()->version >= projected_last_update);
-  log_entry_version[rep_tid] = projected_last_update = log_entries.rbegin()->version;
+  projected_last_update = log_entries.rbegin()->version;
   ceph::os::Transaction t;
   peering_state.merge_new_log_entries(
     log_entries, t, peering_state.get_pg_trim_to(),
@@ -1014,7 +1052,7 @@ seastar::future<> PG::submit_error_log(
         get_collection_ref(), std::move(t)
       ).then([this] {
         peering_state.update_trim_to();
-        return seastar::now();
+        return seastar::make_ready_future<std::optional<eversion_t>>(projected_last_update);
       });
     });
   });
@@ -1078,42 +1116,11 @@ PG::do_osd_ops(
         std::move(reply));
     },
     // failure_func
-    [m, &op_info, obc, this]
-    (const std::error_code& e, const ceph_tid_t& rep_tid, bool record_error) {
-    logger().error("do_osd_ops_execute::failure_func {} got error: {} record_error: {}",
-                    *m, e, record_error);
-    epoch_t epoch = get_osdmap_epoch();
-    auto last_complete = peering_state.get_info().last_complete;
-    auto fut = seastar::now();
-    if (record_error && !peering_state.pg_has_reset_since(epoch) && op_info.may_write()) {
-      logger().debug("do_osd_ops_execute::failure_func finding rep_tid {}",
-                      rep_tid);
-      ceph_assert(log_entry_version.contains(rep_tid));
-      auto it = log_entry_update_waiting_on.find(rep_tid);
-      ceph_assert(it != log_entry_update_waiting_on.end());
-      auto it2 = it->second.waiting_on.find(pg_whoami);
-      ceph_assert(it2 != it->second.waiting_on.end());
-      it->second.waiting_on.erase(it2);
-      if (it->second.waiting_on.empty()) {
-        log_entry_update_waiting_on.erase(it);
-        peering_state.complete_write(log_entry_version[rep_tid], last_complete);
-        log_entry_version.erase(rep_tid);
-        logger().debug("do_osd_ops_execute::failure_func write complete,"
-                        " erasing rep_tid {}", rep_tid);
-
-      } else {
-        fut = it->second.all_committed.get_shared_future().then(
-          [this, last_complete, rep_tid] {
-          logger().debug("do_osd_ops_execute::failure_func awaited {}", rep_tid);
-          peering_state.complete_write(log_entry_version[rep_tid], last_complete);
-          ceph_assert(!log_entry_update_waiting_on.contains(rep_tid));
-          return seastar::now();
-        });
-      }
-    }
-    return fut.then([this, m, e] {
-      return log_reply(m, e);
-    });
+    [m, this]
+    (const std::error_code& e) {
+    logger().error("do_osd_ops_execute::failure_func {} got error: {}",
+                    *m, e);
+    return log_reply(m, e);
   });
 }
 
@@ -1177,7 +1184,7 @@ PG::do_osd_ops(
         return do_osd_ops_iertr::now();
       },
       // failure_func
-      [] (const std::error_code& e, const ceph_tid_t& rep_tid, bool record_error) {
+      [] (const std::error_code& e) {
         return do_osd_ops_iertr::now();
       });
   });
@@ -1442,7 +1449,6 @@ PG::interruptible_future<> PG::do_update_log_missing_reply(
       logger().debug("{}: erasing rep_tid {}",
                      __func__, m->get_tid());
       log_entry_update_waiting_on.erase(it);
-      log_entry_version.erase(m->get_tid());
     }
   } else {
     logger().error("{} : {} got reply {} on unknown tid {}",
index 80a181f24a23aba92b439160fb14fa581d2b721b..b829ea177dbbfafd19c2798f135a63671f02da5a 100644 (file)
@@ -533,7 +533,9 @@ public:
 
   void print(std::ostream& os) const;
   void dump_primary(Formatter*);
-  seastar::future<> submit_error_log(
+  seastar::future<> complete_error_log(const ceph_tid_t& rep_tid,
+                                       const eversion_t& version);
+  seastar::future<std::optional<eversion_t>> submit_error_log(
     Ref<MOSDOp> m,
     const OpInfo &op_info,
     ObjectContextRef obc,
@@ -791,7 +793,6 @@ private:
   };
 
   std::map<ceph_tid_t, log_update_t> log_entry_update_waiting_on;
-  std::map<ceph_tid_t, eversion_t> log_entry_version;
   // snap trimming
   interval_set<snapid_t> snap_trimq;
 };