git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/osd: add support for ECWrites on replica
author Radosław Zarzyński <rzarzyns@redhat.com>
Mon, 11 Sep 2023 14:26:58 +0000 (16:26 +0200)
committer Radoslaw Zarzynski <rzarzyns@redhat.com>
Tue, 24 Mar 2026 16:06:23 +0000 (16:06 +0000)
Signed-off-by: Radosław Zarzyński <rzarzyns@redhat.com>
src/crimson/osd/ec_backend.cc
src/crimson/osd/ec_backend.h
src/crimson/osd/pg.cc
src/crimson/osd/pg.h
src/crimson/osd/pg_backend.cc
src/crimson/osd/replicated_recovery_backend.cc

index bd14b891b6de3d0d75fb3c3fcc43697ed004912d..01567cb7c9a0ce25f67a789c8704b062a09c88d5 100644 (file)
@@ -1,6 +1,7 @@
 #include <boost/iterator/counting_iterator.hpp>
 
 #include "crimson/common/log.h"
+#include "crimson/osd/pg.h"
 #include "crimson/osd/shard_services.h"
 #include "ec_backend.h"
 
@@ -68,9 +69,76 @@ ECBackend::submit_transaction(const std::set<pg_shard_t> &pg_shards,
 }
 
 ECBackend::write_iertr::future<>
-ECBackend::handle_rep_write_op(Ref<MOSDECSubOpWrite>)
+ECBackend::handle_sub_write(  // applies one ECSubWrite locally: temp-object bookkeeping, PG log update, then submits the combined transaction to the store
+  pg_shard_t from,  // sender of the sub-write; only used for logging in this function
+  ECSubWrite &&op,  // consumed here: op.log_entries is moved out and op.t is appended to the local transaction
+  crimson::osd::PG& pg)  // PG that is queried for missing objects and receives the log/applied updates
 {
-  return write_iertr::now();
+  LOG_PREFIX(ECBackend::handle_sub_write);
+  logger().info("{} from {}", __func__, from);
+  if (!op.temp_added.empty()) {
+    add_temp_obj(std::begin(op.temp_added), std::end(op.temp_added));
+  }
+  ceph::os::Transaction txn;  // local transaction; filled below and finally handed to store->do_transaction()
+  if (op.backfill_or_async_recovery) {
+    for (const auto& obj : op.temp_removed) {
+      logger().info("{}: removing object {} since we won't get the transaction",
+                   __func__, obj);
+      txn.remove(coll->get_cid(),
+                ghobject_t{obj, ghobject_t::NO_GEN, get_shard()});
+    }
+  }
+  clear_temp_objs(op.temp_removed);  // runs unconditionally, unlike the explicit removals above
+  logger().debug("{}: missing before {}", __func__, "");  // NOTE(review): the last argument is an empty string, so nothing is printed after "missing before" -- looks like a placeholder
+
+  // set below when op.soid is missing on this PG; forwarded as the last
+  // argument of pg.log_operation()
+  bool async = false;
+  if (pg.is_missing_object(op.soid)) {
+    async = true;
+    logger().debug("{}: {} is missing", __func__, op.soid);
+    for (const auto& e: op.log_entries) {  // must run before op.log_entries is moved into log_operation() below
+      logger().debug("{}: add_next_event entry {}, is_delete {}",
+                     __func__, e, e.is_delete());
+      pg.add_local_next_event(e);
+    }
+  }
+  pg.log_operation(
+    std::move(op.log_entries),  // op.log_entries must not be read after this call
+    op.updated_hit_set_history,
+    op.trim_to,
+    op.pg_committed_to,
+    op.pg_committed_to,  // NOTE(review): same value as the previous argument -- confirm against PG::log_operation's parameter list that this is intended
+    !op.backfill_or_async_recovery,
+    txn,
+    async);
+  txn.append(op.t); // hack warn: the sender's transaction is appended only after pg.log_operation() has been handed txn
+  logger().debug("{}:{}", __func__, __LINE__);  // progress marker: logs only the function name and line number
+  if (op.at_version != eversion_t()) {
+    // dummy rollforward transaction doesn't get at_version
+    // (and doesn't advance it)
+    pg.op_applied(op.at_version);  // NOTE(review): invoked before txn is submitted to the store below
+  }
+  logger().debug("{}:{}", __func__, __LINE__);  // progress marker, same as above
+  return store->do_transaction(coll, std::move(txn)).then([FNAME] {  // FNAME (declared by LOG_PREFIX) is captured for the DEBUG macro
+    DEBUG("transaction commited!");  // NOTE(review): "commited" in this runtime message is a typo for "committed"
+    return write_iertr::now();
+  });
+}
+
+ECBackend::write_iertr::future<>
+ECBackend::handle_rep_write_op(  // message entry point: forwards the ECSubWrite carried by m to handle_sub_write()
+  Ref<MOSDECSubOpWrite> m,  // incoming sub-write message; m->op is handed on as an rvalue
+  crimson::osd::PG& pg)  // passed through to handle_sub_write() and captured by reference in the continuation
+{
+  LOG_PREFIX(ECBackend::handle_rep_write_op);
+  const auto tid = m->op.tid;  // only used for the log line below
+  DEBUG("tid {} from {}", tid, m->op.from);
+  return handle_sub_write(
+    m->op.from, std::move(m->op), pg  // std::move() here is only a cast; handle_sub_write takes ECSubWrite&&, so m->op.from is still readable when the arguments are evaluated
+  ).si_then([&pg] {  // NOTE(review): pg is captured by reference -- presumably the caller keeps the PG alive until the future resolves; confirm
+    assert(!pg.pgb_is_primary());  // this path is expected to run on non-primary shards only
+    return write_iertr::now();
+  }, crimson::ct_error::assert_all{});
 }
 
 ECBackend::write_iertr::future<>
index 72a572345809443906877269a615bbaf9187f7e7..ef21096b4d46cc2fbd7fa68e950767d3ebef4075 100644 (file)
@@ -18,6 +18,8 @@
 
 namespace crimson::osd {
 
+class PG;
+
 class ECBackend : public PGBackend
 {
   static ceph::ErasureCodeInterfaceRef create_ec_impl(
@@ -38,7 +40,9 @@ public:
   }
   void on_actingset_changed(bool same_primary) final {}
 
-  write_iertr::future<> handle_rep_write_op(Ref<MOSDECSubOpWrite>);
+  write_iertr::future<> handle_rep_write_op(
+    Ref<MOSDECSubOpWrite>,
+    crimson::osd::PG& pg);
   write_iertr::future<> handle_rep_write_reply(Ref<MOSDECSubOpWriteReply>);
   ll_read_ierrorator::future<> handle_rep_read_op(Ref<MOSDECSubOpRead>);
   ll_read_ierrorator::future<> handle_rep_read_reply(Ref<MOSDECSubOpReadReply>);
@@ -53,12 +57,16 @@ private:
                     osd_op_params_t&& req,
                     epoch_t min_epoch, epoch_t max_epoch,
                     std::vector<pg_log_entry_t>&& log_entries) final;
-  CollectionRef coll;
   seastar::future<> request_committed(const osd_reqid_t& reqid,
                                       const eversion_t& version) final {
     return seastar::now();
   }
 
+  write_iertr::future<> handle_sub_write(
+    pg_shard_t from,
+    ECSubWrite&& op,
+    crimson::osd::PG& pg);
+
   bool is_single_chunk(const hobject_t& obj, const ECSubRead& op);
 
   ll_read_errorator::future<ceph::bufferlist> maybe_chunked_read(
index 06366b57d0767117bd7aab27709917bd65ff1558..9b3a58715a17e93954ac6c64ca23507bdcf63b4a 100644 (file)
@@ -1451,11 +1451,34 @@ void PG::handle_rep_op_reply(const MOSDRepOpReply& m)
 
 PG::interruptible_future<> PG::handle_rep_write_op(Ref<MOSDECSubOpWrite> m)
 {
+  logger().debug("{}", __func__);  // handler for an EC sub-write message: applies it through the ECBackend, then replies to the primary
+  if (!is_primary()) {
+    peering_state.update_stats([&new_stats=m->op.stats](auto&, auto &stats) {  // overwrite the local stats with the ones carried in the message
+      stats = new_stats;
+      return false;  // NOTE(review): meaning of the returned flag is defined by update_stats(), which is not visible here
+    });
+  }
   auto* ec_backend=dynamic_cast<::ECBackend*>(&get_backend());
   assert(ec_backend);
+  const auto tid = m->op.tid;  // copied out before m is moved into the backend call below
   return ec_backend->handle_rep_write_op(
-    std::move(m)
-  ).handle_error_interruptible(crimson::ct_error::assert_all{});
+    std::move(m),
+    *this
+  ).si_then([this, then_lcod=peering_state.get_info().last_complete, tid] {  // then_lcod is evaluated when this chain is built, not when the continuation runs
+    logger().debug("{} sending response", "handle_rep_write_op");
+    peering_state.update_last_complete_ondisk(then_lcod);
+    auto r = crimson::make_message<MOSDECSubOpWriteReply>();
+    r->pgid = spg_t(peering_state.get_info().pgid.pgid, get_primary().shard);  // addressed to the primary's shard of this PG
+    r->map_epoch = get_osdmap_epoch();
+    r->min_epoch = peering_state.get_info().history.same_interval_since;
+    r->op.tid = tid;  // echoes the tid of the request being answered
+    r->op.last_complete = then_lcod;
+    r->op.committed = true;  // the reply always reports both committed and applied
+    r->op.applied = true;
+    r->op.from = pg_whoami;
+    r->set_priority(CEPH_MSG_PRIO_HIGH);
+    return shard_services.send_to_osd(get_primary().osd, std::move(r), get_osdmap_epoch());
+  }).handle_error_interruptible(crimson::ct_error::assert_all{});  // any error reaching this point is handled by assert_all
 }
 
 PG::interruptible_future<> PG::handle_rep_read_op(Ref<MOSDECSubOpRead> m)
@@ -1743,6 +1766,22 @@ bool PG::should_send_op(
   //       by crimson yet
 }
 
+void PG::op_applied(const eversion_t &applied_version)  // records applied_version as locally applied by forwarding it to peering_state
+{
+  logger().info("{}: op_applied version {}", __func__, applied_version);
+  assert(applied_version != eversion_t());  // callers must not pass a default-constructed version
+  assert(applied_version <= peering_state.get_info().last_update);  // the applied version may not be ahead of last_update
+  peering_state.local_write_applied(applied_version);
+
+#if 0  // compiled out: the scrubber notification below is not built
+  if (is_primary() && m_scrubber) {
+    // if there's a scrub operation waiting for the selected chunk to be fully updated -
+    // allow it to continue
+    m_scrubber->on_applied_when_primary(recovery_state.get_last_update_applied());  // NOTE(review): uses recovery_state, whereas the live code above uses peering_state
+  }
+#endif
+}
+
 PG::interruptible_future<std::optional<PG::complete_op_t>>
 PG::already_complete(const osd_reqid_t& reqid)
 {
index 7415adcc06dee6372916365b1167a5460c0fec7f..8ba83d012e98ea29ded98213f2f2df4d64bd1453 100644 (file)
@@ -47,6 +47,7 @@
 
 class MQuery;
 class OSDMap;
+class ECBackend;
 class PGPeeringEvent;
 class osd_op_params_t;
 
@@ -1005,6 +1006,7 @@ private:
   friend class WatchTimeoutRequest;
   friend class SnapTrimEvent;
   friend class SnapTrimObjSubEvent;
+  friend ECBackend;
 private:
 
   void enqueue_push_for_backfill(
@@ -1041,6 +1043,10 @@ private:
   const std::set<pg_shard_t> &get_actingset() const {
     return peering_state.get_actingset();
   }
+  void add_local_next_event(const pg_log_entry_t& e) {  // thin forwarder: hands the log entry to peering_state unchanged
+    peering_state.add_local_next_event(e);
+  }
+  void op_applied(const eversion_t &applied_version);
 
 private:
   friend class IOInterruptCondition;
index 0864c06b2e248a291462634eff0179b01bc12fff..3823f0b49a4e0139ff3481c73c0d15fc9a7cd1c9 100644 (file)
@@ -17,6 +17,8 @@
 #include "os/Transaction.h"
 #include "common/Checksummer.h"
 #include "common/Clock.h"
+#include "erasure-code/ErasureCodeInterface.h"
+#include "erasure-code/ErasureCodePlugin.h"
 
 #include "crimson/common/coroutine.h"
 #include "crimson/common/exception.h"
@@ -82,8 +84,10 @@ PGBackend::PGBackend(pg_shard_t whoami,
     coll{coll},
     shard_services{shard_services},
     dpp{dpp},
-    store{shard_services.get_store(store_index)}
-{}
+    store{&shard_services.get_store(store_index)}
+{
+  logger().info("initialized PGBackend::store with {}", (void*)this->store);
+}
 
 PGBackend::load_metadata_iertr::future
   <PGBackend::loaded_object_md_t::ref>
index 2f277b1e65414b4a3fb0a0c5cec871a92f6ffc38..7fcccdeb8fab04e5ab5fd17686874066babf82b8 100644 (file)
@@ -605,7 +605,7 @@ ReplicatedRecoveryBackend::read_metadata_for_push_op(
     PushOp* push_op)
 {
   LOG_PREFIX(ReplicatedRecoveryBackend::read_metadata_for_push_op);
-  DEBUGDPP("{}", pg, oid);
+  DEBUGDPP("{} progress.first {}", pg, oid, progress.first);
   if (!progress.first) {
     return seastar::make_ready_future<eversion_t>(ver);
   }