]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/osd: create and send error log when has error 46435/head
authorchunmei-liu <chunmei.liu@intel.com>
Wed, 25 May 2022 05:55:42 +0000 (22:55 -0700)
committerchunmei-liu <chunmei.liu@intel.com>
Wed, 22 Jun 2022 06:43:04 +0000 (23:43 -0700)
Signed-off-by: chunmei-liu <chunmei.liu@intel.com>
src/crimson/osd/pg.cc
src/crimson/osd/pg.h
src/crimson/osd/shard_services.h

index 66680e2824879b1b4cd27e1401a92617d8a3bc69..3930575f2d6c68f932a0b275be3315ceac837787 100644 (file)
@@ -711,6 +711,63 @@ PG::do_osd_ops_execute(
         }));
   }));
 }
+seastar::future<> PG::submit_error_log(
+  Ref<MOSDOp> m,
+  const OpInfo &op_info,
+  ObjectContextRef obc,
+  const std::error_code e,
+  ceph_tid_t rep_tid,
+  eversion_t &version)
+{
+  const osd_reqid_t &reqid = m->get_reqid();
+  mempool::osd_pglog::list<pg_log_entry_t> log_entries;
+  log_entries.push_back(pg_log_entry_t(pg_log_entry_t::ERROR,
+                                       obc->obs.oi.soid,
+                                       next_version(),
+                                       eversion_t(), 0,
+                                       reqid, utime_t(),
+                                       -e.value()));
+  if (op_info.allows_returnvec()) {
+    log_entries.back().set_op_returns(m->ops);
+  }
+  ceph_assert(is_primary());
+  if (!log_entries.empty()) {
+    ceph_assert(log_entries.rbegin()->version >= projected_last_update);
+    version = projected_last_update = log_entries.rbegin()->version;
+  }
+  ceph::os::Transaction t;
+  peering_state.merge_new_log_entries(
+    log_entries, t, peering_state.get_pg_trim_to(),
+    peering_state.get_min_last_complete_ondisk());
+
+    set<pg_shard_t> waiting_on;
+    for (auto &i : get_acting_recovery_backfill()) {
+      pg_shard_t peer(i);
+      if (peer == pg_whoami) continue;
+      ceph_assert(peering_state.get_peer_missing().count(peer));
+      ceph_assert(peering_state.has_peer_info(peer));
+      auto log_m = crimson::make_message<MOSDPGUpdateLogMissing>(
+                   log_entries,
+                   spg_t(peering_state.get_info().pgid.pgid, i.shard),
+                   pg_whoami.shard,
+                   get_osdmap_epoch(),
+                   get_last_peering_reset(),
+                   rep_tid,
+                   peering_state.get_pg_trim_to(),
+                   peering_state.get_min_last_complete_ondisk());
+      send_cluster_message(peer.osd, std::move(log_m), get_osdmap_epoch());
+      waiting_on.insert(peer);
+    }
+    waiting_on.insert(pg_whoami);
+    log_entry_update_waiting_on.insert(
+      std::make_pair(rep_tid, log_update_t{std::move(waiting_on)}));
+    return shard_services.get_store().do_transaction(
+      get_collection_ref(), std::move(t))
+      .then([this] {
+        peering_state.update_trim_to();
+        return seastar::now();
+    });
+}
 
 PG::do_osd_ops_iertr::future<PG::pg_rep_op_fut_t<MURef<MOSDOpReply>>>
 PG::do_osd_ops(
@@ -763,18 +820,59 @@ PG::do_osd_ops(
       return do_osd_ops_iertr::make_ready_future<MURef<MOSDOpReply>>(
         std::move(reply));
     },
-    [m, this] (const std::error_code& e) {
-      auto reply = crimson::make_message<MOSDOpReply>(
-        m.get(), -e.value(), get_osdmap_epoch(), 0, false);
-      if (m->ops.empty() ? 0 :
-        m->ops.back().op.flags & CEPH_OSD_OP_FLAG_FAILOK) {
-        reply->set_result(0);
+    [m, &op_info, obc, this] (const std::error_code& e) {
+    return seastar::do_with(eversion_t(), [m, &op_info, obc, e, this](auto &version) {
+      auto fut = seastar::now();
+      epoch_t epoch = get_osdmap_epoch();
+      ceph_tid_t rep_tid = shard_services.get_tid();
+      auto last_complete = peering_state.get_info().last_complete;
+      if (op_info.may_write()) {
+        fut = submit_error_log(m, op_info, obc, e, rep_tid, version);
       }
-      reply->set_enoent_reply_versions(
-        peering_state.get_info().last_update,
-        peering_state.get_info().last_user_version);
-      return do_osd_ops_iertr::make_ready_future<MURef<MOSDOpReply>>(std::move(reply));
+      return fut.then([m, e, epoch, &op_info, rep_tid, &version, last_complete,  this] {
+        auto log_reply = [m, e, this] {
+          auto reply = crimson::make_message<MOSDOpReply>(
+            m.get(), -e.value(), get_osdmap_epoch(), 0, false);
+          if (m->ops.empty() ? 0 :
+            m->ops.back().op.flags & CEPH_OSD_OP_FLAG_FAILOK) {
+            reply->set_result(0);
+          }
+          reply->set_enoent_reply_versions(
+          peering_state.get_info().last_update,
+          peering_state.get_info().last_user_version);
+          reply->add_flags(CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK);
+          return do_osd_ops_iertr::make_ready_future<MURef<MOSDOpReply>>(
+            std::move(reply));
+       };
+
+        if (!peering_state.pg_has_reset_since(epoch) && op_info.may_write()) {
+          auto it = log_entry_update_waiting_on.find(rep_tid);
+          ceph_assert(it != log_entry_update_waiting_on.end());
+          auto it2 = it->second.waiting_on.find(pg_whoami);
+          ceph_assert(it2 != it->second.waiting_on.end());
+          it->second.waiting_on.erase(it2);
+
+          if (it->second.waiting_on.empty()) {
+            log_entry_update_waiting_on.erase(it);
+            if (version != eversion_t()) {
+              peering_state.complete_write(version, last_complete);
+            }
+            return log_reply();
+          } else {
+            return it->second.all_committed.get_shared_future()
+              .then([this, &version, last_complete, log_reply = std::move(log_reply)] {
+              if (version != eversion_t()) {
+                peering_state.complete_write(version, last_complete);
+              }
+              return log_reply();
+            });
+          }
+        } else {
+          return log_reply();
+        }
+      });
     });
+  });
 }
 
 PG::do_osd_ops_iertr::future<PG::pg_rep_op_fut_t<>>
index 0a23e55d1311adf152c89d8a4fc067c823b498b1..5e764e738fae11a7b7487390375393fd0f835ec0 100644 (file)
@@ -551,6 +551,13 @@ public:
 
   void print(std::ostream& os) const;
   void dump_primary(Formatter*);
+  seastar::future<> submit_error_log(
+    Ref<MOSDOp> m,
+    const OpInfo &op_info,
+    ObjectContextRef obc,
+    const std::error_code e,
+    ceph_tid_t rep_tid,
+    eversion_t &version);
 
 private:
   template<RWState::State State>
index 6913a52cbb576f23a930ab57b7dec9c785e0196f..39a635a070b52ed2cf1d5b14808d9a3471f30f3d 100644 (file)
@@ -141,9 +141,9 @@ public:
 
   // -- tids --
   // for ops i issue
-  unsigned int last_tid{0};
+  unsigned int next_tid{0};
   ceph_tid_t get_tid() {
-    return (ceph_tid_t)last_tid++;
+    return (ceph_tid_t)next_tid++;
   }
 
   // PG Temp State