From: Radosław Zarzyński Date: Thu, 28 Sep 2023 16:21:15 +0000 (+0200) Subject: crimson/osd: wire MOSDECSubOpWriteReply up with ECBackend X-Git-Tag: v21.0.0~3^2~81 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=7ce087a02f1938c1534ff03f15f444e92174e2ce;p=ceph.git crimson/osd: wire MOSDECSubOpWriteReply up with ECBackend Signed-off-by: Radosław Zarzyński --- diff --git a/src/crimson/osd/ec_backend.cc b/src/crimson/osd/ec_backend.cc index 77d76ae5f814..9582db97208a 100644 --- a/src/crimson/osd/ec_backend.cc +++ b/src/crimson/osd/ec_backend.cc @@ -155,8 +155,33 @@ ECBackend::handle_rep_write_op( } ECBackend::write_iertr::future<> -ECBackend::handle_rep_write_reply(Ref) +ECBackend::handle_rep_write_reply(Ref m) { + const auto& op = m->op; + assert(rmw_pipeline.tid_to_op_map.contains(op.tid)); + const auto& from = op.from; + auto& wop = *rmw_pipeline.tid_to_op_map.at(op.tid); + if (op.committed) { + // TODO: trace.event("sub write committed"); + ceph_assert(wop.pending_commit.count(from)); + wop.pending_commit.erase(from); + } + if (op.applied) { + // TODO: trace.event("sub write applied"); + ceph_assert(wop.pending_apply.count(from)); + wop.pending_apply.erase(from); + } + + if (wop.pending_commit.empty() && + wop.on_all_commit && + // also wait for apply, to preserve ordering with luminous peers. + wop.pending_apply.empty()) { + logger().info("{}: calling on_all_commit on {}", __func__, wop); + wop.on_all_commit->complete(0); + wop.on_all_commit = 0; + // TODO: wop.trace.event("ec write all committed"); + } + rmw_pipeline.check_ops(); return write_iertr::now(); } diff --git a/src/crimson/osd/osd_operations/ecrep_request.cc b/src/crimson/osd/osd_operations/ecrep_request.cc index 3b0b8fb2f6d3..5d28b197aea8 100644 --- a/src/crimson/osd/osd_operations/ecrep_request.cc +++ b/src/crimson/osd/osd_operations/ecrep_request.cc @@ -70,10 +70,8 @@ seastar::future<> ECRepRequest::with_pg( [pg] (Ref concrete_req) { return pg->handle_rep_write_op(std::move(concrete_req)); }, - [ec_backend] (Ref concrete_req) { - return ec_backend->handle_rep_write_reply( - std::move(concrete_req) - ).handle_error_interruptible(crimson::ct_error::assert_all{}); + [pg] (Ref concrete_req) { + return pg->handle_rep_write_reply(std::move(concrete_req)); }, [pg] (Ref concrete_req) { return pg->handle_rep_read_op(std::move(concrete_req)); diff --git a/src/crimson/osd/pg.cc b/src/crimson/osd/pg.cc index d99ce6cffdaa..218f3ff4cc60 100644 --- a/src/crimson/osd/pg.cc +++ b/src/crimson/osd/pg.cc @@ -1482,6 +1482,21 @@ PG::interruptible_future<> PG::handle_rep_write_op(Ref m) }).handle_error_interruptible(crimson::ct_error::assert_all{}); } +PG::interruptible_future<> PG::handle_rep_write_reply(Ref m) +{ + if (const auto& op = m->op; op.committed) { + // TODO: trace.event("sub write committed"); + if (op.from != pg_whoami) { + peering_state.update_peer_last_complete_ondisk(op.from, op.last_complete); + } + } + auto* ec_backend=dynamic_cast<::ECBackend*>(&get_backend()); + assert(ec_backend); + return ec_backend->handle_rep_write_reply( + std::move(m) + ).handle_error_interruptible(crimson::ct_error::assert_all{}); +} + PG::interruptible_future<> PG::handle_rep_read_op(Ref m) { auto* ec_backend=dynamic_cast<::ECBackend*>(&get_backend()); diff --git a/src/crimson/osd/pg.h b/src/crimson/osd/pg.h index 1547c38cffd5..c4602b2571b6 100644 --- a/src/crimson/osd/pg.h +++ b/src/crimson/osd/pg.h @@ -782,6 +782,7 @@ public: seastar::future<> clear_temp_objects(); interruptible_future<> handle_rep_write_op(Ref); + interruptible_future<> handle_rep_write_reply(Ref); interruptible_future<> handle_rep_read_op(Ref); private: