git-server-git.apps.pok.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
crimson/osd: add support for ECWrites on replica
author Radosław Zarzyński <rzarzyns@redhat.com>
Mon, 11 Sep 2023 14:26:58 +0000 (16:26 +0200)
committer Radoslaw Zarzynski <rzarzyns@redhat.com>
Tue, 27 Jan 2026 14:37:36 +0000 (14:37 +0000)
Signed-off-by: Radosław Zarzyński <rzarzyns@redhat.com>
src/crimson/osd/ec_backend.cc
src/crimson/osd/ec_backend.h
src/crimson/osd/pg.cc
src/crimson/osd/pg.h
src/crimson/osd/pg_backend.cc
src/crimson/osd/replicated_recovery_backend.cc

index 217351deb35fcd6752ce32be024750b47d92df50..754a54a736f8979e27e6fbc881ea02a52013d7db 100644 (file)
@@ -1,6 +1,7 @@
 #include <boost/iterator/counting_iterator.hpp>
 
 #include "crimson/common/log.h"
+#include "crimson/osd/pg.h"
 #include "crimson/osd/shard_services.h"
 #include "ec_backend.h"
 
@@ -67,9 +68,76 @@ ECBackend::submit_transaction(const std::set<pg_shard_t> &pg_shards,
 }
 
 ECBackend::write_iertr::future<>
-ECBackend::handle_rep_write_op(Ref<MOSDECSubOpWrite>)
+ECBackend::handle_sub_write(  // applies one ECSubWrite: updates the PG log, then submits the resulting transaction to the store
+  pg_shard_t from,  // sender of the sub-write; only used for logging in this function
+  ECSubWrite &&op,  // the sub-write payload (log entries, transaction, temp-object lists)
+  crimson::osd::PG& pg)  // PG that is queried for missing objects and receives the log/applied updates
 {
-  return write_iertr::now();
+  LOG_PREFIX(ECBackend::handle_sub_write);
+  logger().info("{} from {}", __func__, from);
+  if (!op.temp_added.empty()) {
+    add_temp_obj(std::begin(op.temp_added), std::end(op.temp_added));  // hand the whole temp_added range to add_temp_obj()
+  }
+  ceph::os::Transaction txn;  // local transaction; filled below and handed to the store at the end
+  if (op.backfill_or_async_recovery) {
+    for (const auto& obj : op.temp_removed) {
+      logger().info("{}: removing object {} since we won't get the transaction",
+                   __func__, obj);
+      txn.remove(coll->get_cid(),
+                ghobject_t{obj, ghobject_t::NO_GEN, get_shard()});
+    }
+  }
+  clear_temp_objs(op.temp_removed);
+  logger().debug("{}: missing before {}", __func__, "");  // NOTE(review): the last argument is an empty string, so nothing is printed after "missing before"
+
+  // flag set to true during async recovery (here: whenever op.soid is missing on this PG)
+  bool async = false;
+  if (pg.is_missing_object(op.soid)) {
+    async = true;
+    logger().debug("{}: {} is missing", __func__, op.soid);
+    for (const auto& e: op.log_entries) {
+      logger().debug("{}: add_next_event entry {}, is_delete {}",
+                     __func__, e, e.is_delete());
+      pg.add_local_next_event(e);
+    }
+  }
+  pg.log_operation(
+    std::move(op.log_entries),
+    op.updated_hit_set_history,
+    op.trim_to,
+    op.pg_committed_to,
+    op.pg_committed_to,  // NOTE(review): same value as the previous argument; confirm both parameters are meant to receive pg_committed_to
+    !op.backfill_or_async_recovery,
+    txn,
+    async);
+  txn.append(op.t); // hack warn: the transaction carried by the sub-write is appended after log_operation() has been given txn
+  logger().debug("{}:{}", __func__, __LINE__);  // NOTE(review): line-number trace, looks like a debugging leftover
+  if (op.at_version != eversion_t()) {
+    // dummy rollforward transaction doesn't get at_version
+    // (and doesn't advance it)
+    pg.op_applied(op.at_version);
+  }
+  logger().debug("{}:{}", __func__, __LINE__);  // NOTE(review): line-number trace, looks like a debugging leftover
+  return store->do_transaction(coll, std::move(txn)).then([FNAME] {  // FNAME is presumably introduced by LOG_PREFIX and needed by DEBUG below
+    DEBUG("transaction commited!");  // NOTE(review): "commited" is misspelled in this log message
+    return write_iertr::now();
+  });
+}
+
+ECBackend::write_iertr::future<>
+ECBackend::handle_rep_write_op(  // handles an incoming MOSDECSubOpWrite by passing m->op to handle_sub_write()
+  Ref<MOSDECSubOpWrite> m,
+  crimson::osd::PG& pg)
+{
+  LOG_PREFIX(ECBackend::handle_rep_write_op);
+  const auto tid = m->op.tid;  // only used by the DEBUG line below
+  DEBUG("tid {} from {}", tid, m->op.from);
+  return handle_sub_write(
+    m->op.from, std::move(m->op), pg  // `from` is taken by value, `op` by rvalue reference
+  ).si_then([&pg] {
+    assert(!pg.pgb_is_primary());  // checked once the sub-write future has resolved
+    return write_iertr::now();
+  }, crimson::ct_error::assert_all{});  // NOTE(review): presumably turns any error from handle_sub_write() into an assertion failure; confirm
 }
 
 ECBackend::write_iertr::future<>
index 3b637a4d3464399daad021bf2d375615fcced9f0..d685cdcf1296ab83f5f81d300f49dc621e0217bc 100644 (file)
@@ -18,6 +18,8 @@
 
 namespace crimson::osd {
 
+class PG;
+
 class ECBackend : public PGBackend
 {
   static ceph::ErasureCodeInterfaceRef create_ec_impl(
@@ -37,7 +39,9 @@ public:
   }
   void on_actingset_changed(bool same_primary) final {}
 
-  write_iertr::future<> handle_rep_write_op(Ref<MOSDECSubOpWrite>);
+  write_iertr::future<> handle_rep_write_op(
+    Ref<MOSDECSubOpWrite>,
+    crimson::osd::PG& pg);
   write_iertr::future<> handle_rep_write_reply(Ref<MOSDECSubOpWriteReply>);
   ll_read_ierrorator::future<> handle_rep_read_op(Ref<MOSDECSubOpRead>);
   ll_read_ierrorator::future<> handle_rep_read_reply(Ref<MOSDECSubOpReadReply>);
@@ -52,12 +56,16 @@ private:
                     osd_op_params_t&& req,
                     epoch_t min_epoch, epoch_t max_epoch,
                     std::vector<pg_log_entry_t>&& log_entries) final;
-  CollectionRef coll;
   seastar::future<> request_committed(const osd_reqid_t& reqid,
                                       const eversion_t& version) final {
     return seastar::now();
   }
 
+  write_iertr::future<> handle_sub_write(
+    pg_shard_t from,
+    ECSubWrite&& op,
+    crimson::osd::PG& pg);
+
   bool is_single_chunk(const hobject_t& obj, const ECSubRead& op);
 
   ll_read_errorator::future<ceph::bufferlist> maybe_chunked_read(
index bda0b314b834dd157908617de42fc525c0d637b3..7bd5f89f3842d0a17e66123859eba29218ea0fe1 100644 (file)
@@ -1433,11 +1433,34 @@ void PG::handle_rep_op_reply(const MOSDRepOpReply& m)
 
 PG::interruptible_future<> PG::handle_rep_write_op(Ref<MOSDECSubOpWrite> m)
 {
+  logger().debug("{}", __func__);
+  if (!is_primary()) {  // on a non-primary, overwrite the local stats with the ones carried by the message
+    peering_state.update_stats([&new_stats=m->op.stats](auto&, auto &stats) {
+      stats = new_stats;
+      return false;  // NOTE(review): meaning of the return value is defined by update_stats(), which is not visible here
+    });
+  }
   auto* ec_backend=dynamic_cast<::ECBackend*>(&get_backend());
   assert(ec_backend);
+  const auto tid = m->op.tid;  // read before m is moved into the backend call; echoed in the reply below
   return ec_backend->handle_rep_write_op(
-    std::move(m)
-  ).handle_error_interruptible(crimson::ct_error::assert_all{});
+    std::move(m),
+    *this
+  ).si_then([this, then_lcod=peering_state.get_info().last_complete, tid] {  // then_lcod: last_complete as of when this continuation is built
+    logger().debug("{} sending response", "handle_rep_write_op");  // name spelled out: __func__ inside a lambda names operator()
+    peering_state.update_last_complete_ondisk(then_lcod);
+    auto r = crimson::make_message<MOSDECSubOpWriteReply>();
+    r->pgid = spg_t(peering_state.get_info().pgid.pgid, get_primary().shard);  // reply is addressed to the primary's shard
+    r->map_epoch = get_osdmap_epoch();
+    r->min_epoch = peering_state.get_info().history.same_interval_since;
+    r->op.tid = tid;
+    r->op.last_complete = then_lcod;
+    r->op.committed = true;  // the reply reports the write as both committed...
+    r->op.applied = true;  // ...and applied
+    r->op.from = pg_whoami;
+    r->set_priority(CEPH_MSG_PRIO_HIGH);
+    return shard_services.send_to_osd(get_primary().osd, std::move(r), get_osdmap_epoch());  // send the reply to the primary OSD
+  }).handle_error_interruptible(crimson::ct_error::assert_all{});
 }
 
 PG::interruptible_future<> PG::handle_rep_read_op(Ref<MOSDECSubOpRead> m)
@@ -1723,6 +1746,22 @@ bool PG::should_send_op(
   //       by crimson yet
 }
 
+void PG::op_applied(const eversion_t &applied_version)  // forwards applied_version to peering_state.local_write_applied() after sanity checks
+{
+  logger().info("{}: op_applied version {}", __func__, applied_version);
+  assert(applied_version != eversion_t());  // a default-constructed version is not accepted
+  assert(applied_version <= peering_state.get_info().last_update);  // must not be ahead of last_update
+  peering_state.local_write_applied(applied_version);
+
+#if 0  // compiled out: scrubber notification; NOTE(review): it refers to m_scrubber/recovery_state, which nothing else in this function uses
+  if (is_primary() && m_scrubber) {
+    // if there's a scrub operation waiting for the selected chunk to be fully updated -
+    // allow it to continue
+    m_scrubber->on_applied_when_primary(recovery_state.get_last_update_applied());
+  }
+#endif
+}
+
 PG::interruptible_future<std::optional<PG::complete_op_t>>
 PG::already_complete(const osd_reqid_t& reqid)
 {
index 9b35246635a5947598fa219c51b1123989768f06..8029e6982cce07486377d78111e48b876c2b62ca 100644 (file)
@@ -47,6 +47,7 @@
 
 class MQuery;
 class OSDMap;
+class ECBackend;
 class PGPeeringEvent;
 class osd_op_params_t;
 
@@ -997,6 +998,7 @@ private:
   friend class WatchTimeoutRequest;
   friend class SnapTrimEvent;
   friend class SnapTrimObjSubEvent;
+  friend ECBackend;
 private:
 
   void enqueue_push_for_backfill(
@@ -1033,6 +1035,10 @@ private:
   const std::set<pg_shard_t> &get_actingset() const {
     return peering_state.get_actingset();
   }
+  void add_local_next_event(const pg_log_entry_t& e) {  // thin forwarder to peering_state.add_local_next_event()
+    peering_state.add_local_next_event(e);
+  }
+  void op_applied(const eversion_t &applied_version);
 
 private:
   friend class IOInterruptCondition;
index b4f65a3e84e6019c29143a31f63e5591fc6f7b0e..d19656935cb7271971684fbbdebb9bb51a34c2b0 100644 (file)
@@ -17,6 +17,8 @@
 #include "os/Transaction.h"
 #include "common/Checksummer.h"
 #include "common/Clock.h"
+#include "erasure-code/ErasureCodeInterface.h"
+#include "erasure-code/ErasureCodePlugin.h"
 
 #include "crimson/common/coroutine.h"
 #include "crimson/common/exception.h"
@@ -82,7 +84,9 @@ PGBackend::PGBackend(pg_shard_t whoami,
     shard_services{shard_services},
     dpp{dpp},
     store{&shard_services.get_store()}
-{}
+{
+  logger().info("initialized PGBackend::store with {}", (void*)this->store);
+}
 
 PGBackend::load_metadata_iertr::future
   <PGBackend::loaded_object_md_t::ref>
index c9427ef96456c36a8f5e05fab44cbc4e8f9ee8a9..49fa62977c1623c6cc0861462294bda19f351b69 100644 (file)
@@ -601,7 +601,7 @@ ReplicatedRecoveryBackend::read_metadata_for_push_op(
     PushOp* push_op)
 {
   LOG_PREFIX(ReplicatedRecoveryBackend::read_metadata_for_push_op);
-  DEBUGDPP("{}", pg, oid);
+  DEBUGDPP("{} progress.first {}", pg, oid, progress.first);
   if (!progress.first) {
     return seastar::make_ready_future<eversion_t>(ver);
   }