]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/osd: wire MOSDECSubOpWriteReply up with ECBackend
authorRadosław Zarzyński <rzarzyns@redhat.com>
Thu, 28 Sep 2023 16:21:15 +0000 (18:21 +0200)
committerRadoslaw Zarzynski <rzarzyns@redhat.com>
Tue, 24 Mar 2026 16:06:23 +0000 (16:06 +0000)
Signed-off-by: Radosław Zarzyński <rzarzyns@redhat.com>
src/crimson/osd/ec_backend.cc
src/crimson/osd/osd_operations/ecrep_request.cc
src/crimson/osd/pg.cc
src/crimson/osd/pg.h

index 77d76ae5f814a5cdd60e624582bf2ea2c465710f..9582db97208a44244520ed1c6007e24841f818b0 100644 (file)
@@ -155,8 +155,33 @@ ECBackend::handle_rep_write_op(
 }
 
 ECBackend::write_iertr::future<>
-ECBackend::handle_rep_write_reply(Ref<MOSDECSubOpWriteReply>)
+ECBackend::handle_rep_write_reply(Ref<MOSDECSubOpWriteReply> m)
 {
+  const auto& op = m->op;
+  assert(rmw_pipeline.tid_to_op_map.contains(op.tid));
+  const auto& from = op.from;
+  auto& wop = *rmw_pipeline.tid_to_op_map.at(op.tid);
+  if (op.committed) {
+    // TODO: trace.event("sub write committed");
+    ceph_assert(wop.pending_commit.count(from));
+    wop.pending_commit.erase(from);
+  }
+  if (op.applied) {
+    // TODO: trace.event("sub write applied");
+    ceph_assert(wop.pending_apply.count(from));
+    wop.pending_apply.erase(from);
+  }
+
+  if (wop.pending_commit.empty() &&
+      wop.on_all_commit &&
+      // also wait for apply, to preserve ordering with luminous peers.
+      wop.pending_apply.empty()) {
+    logger().info("{}: calling on_all_commit on {}", __func__, wop);
+    wop.on_all_commit->complete(0);
+    wop.on_all_commit = 0;
+    // TODO: wop.trace.event("ec write all committed");
+  }
+  rmw_pipeline.check_ops();
   return write_iertr::now();
 }
 
index 3b0b8fb2f6d3d775d1d84bfbbb0e51504925b282..5d28b197aea8edbaece6c3afa6a48b2abd2d95d4 100644 (file)
@@ -70,10 +70,8 @@ seastar::future<> ECRepRequest::with_pg(
       [pg] (Ref<MOSDECSubOpWrite> concrete_req) {
         return pg->handle_rep_write_op(std::move(concrete_req));
       },
-      [ec_backend] (Ref<MOSDECSubOpWriteReply> concrete_req) {
-        return ec_backend->handle_rep_write_reply(
-         std::move(concrete_req)
-       ).handle_error_interruptible(crimson::ct_error::assert_all{});
+      [pg] (Ref<MOSDECSubOpWriteReply> concrete_req) {
+        return pg->handle_rep_write_reply(std::move(concrete_req));
       },
       [pg] (Ref<MOSDECSubOpRead> concrete_req) {
         return pg->handle_rep_read_op(std::move(concrete_req));
index d99ce6cffdaa8c86f8052e90d0f871bf5154bea8..218f3ff4cc60d80860a3a4830a144a9983c524c3 100644 (file)
@@ -1482,6 +1482,21 @@ PG::interruptible_future<> PG::handle_rep_write_op(Ref<MOSDECSubOpWrite> m)
   }).handle_error_interruptible(crimson::ct_error::assert_all{});
 }
 
+PG::interruptible_future<> PG::handle_rep_write_reply(Ref<MOSDECSubOpWriteReply> m)
+{
+  if (const auto& op = m->op; op.committed) {
+    // TODO: trace.event("sub write committed");
+    if (op.from != pg_whoami) {
+      peering_state.update_peer_last_complete_ondisk(op.from, op.last_complete);
+    }
+  }
+  auto* ec_backend=dynamic_cast<::ECBackend*>(&get_backend());
+  assert(ec_backend);
+  return ec_backend->handle_rep_write_reply(
+    std::move(m)
+  ).handle_error_interruptible(crimson::ct_error::assert_all{});
+}
+
 PG::interruptible_future<> PG::handle_rep_read_op(Ref<MOSDECSubOpRead> m)
 {
   auto* ec_backend=dynamic_cast<::ECBackend*>(&get_backend());
index 1547c38cffd5a94b9afd8a8169ec275faf9537be..c4602b2571b6275ed7f808ee702ab5a5f8b011f0 100644 (file)
@@ -782,6 +782,7 @@ public:
   seastar::future<> clear_temp_objects();
 
   interruptible_future<> handle_rep_write_op(Ref<MOSDECSubOpWrite>);
+  interruptible_future<> handle_rep_write_reply(Ref<MOSDECSubOpWriteReply>);
   interruptible_future<> handle_rep_read_op(Ref<MOSDECSubOpRead>);
 
 private: