From: Matty Williams Date: Mon, 26 Jan 2026 14:59:08 +0000 (+0000) Subject: osd: Use coroutines to perform synchronous reads in EC without blocking any threads X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=81e9daa3e444fc3d620a6d9585fd7efe30801054;p=ceph-ci.git osd: Use coroutines to perform synchronous reads in EC without blocking any threads Coroutines.h file added containing type and struct definitions. Coroutine spawned in PrimaryLogPG::do_op if the message contains a CEPH_OSD_OP_CALL op. ECBackend::objects_read_sync yields while running ECBackend::objects_read_async. Coroutine is resumed once the read is complete. When a coroutine op is being processed, all other ops to the same PG are blocked. https://tracker.ceph.com/issues/74531 Signed-off-by: Matty Williams --- diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index acdcea4464f..3a568920bbf 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -588,6 +588,7 @@ set(ceph_common_deps common_utf8 extblkdev arch crc32 ${LIB_RESOLV} Boost::thread + Boost::context Boost::random Boost::program_options Boost::date_time diff --git a/src/osd/CMakeLists.txt b/src/osd/CMakeLists.txt index 930acf42d13..86c9ce6736f 100644 --- a/src/osd/CMakeLists.txt +++ b/src/osd/CMakeLists.txt @@ -54,6 +54,7 @@ set(osd_srcs ECInject.h ECOmapJournal.cc ECOmapJournal.h + Coroutines.h ${CMAKE_SOURCE_DIR}/src/common/TrackedOp.cc ${CMAKE_SOURCE_DIR}/src/mgr/OSDPerfMetricTypes.cc ${osd_cyg_functions_src} diff --git a/src/osd/Coroutines.h b/src/osd/Coroutines.h new file mode 100644 index 00000000000..e3bbb0b193c --- /dev/null +++ b/src/osd/Coroutines.h @@ -0,0 +1,33 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*- +// vim: ts=8 sw=2 sts=2 expandtab + +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2026 IBM + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + */ + +/* + * This header file contains types required to integrate boost coroutines into + * the OSD backend. + * + * - yield_token_t (pull_type): Is used by a coroutine to suspend execution (yield) + * while waiting for an I/O operation to complete. + * - resume_token_t (push_type): Is used by the completion callback to + */ + +#pragma once +#include + +using yield_token_t = boost::coroutines2::coroutine::pull_type; +using resume_token_t = boost::coroutines2::coroutine::push_type; + +struct CoroHandles { + yield_token_t& yield; + resume_token_t& resume; +}; \ No newline at end of file diff --git a/src/osd/ECBackend.cc b/src/osd/ECBackend.cc index f341050523c..eb590020e16 100644 --- a/src/osd/ECBackend.cc +++ b/src/osd/ECBackend.cc @@ -1124,6 +1124,38 @@ void ECBackend::submit_transaction( } int ECBackend::objects_read_sync( + const hobject_t &hoid, + uint64_t object_size, + const std::list>> &to_read, + CoroHandles coro) +{ + int result = 0; + bool done = false; + bool waiting = false; + + // Callback for the async read + Context *on_finish = new LambdaContext([&, coro](int r) { + result = r; + done = true; + + if (waiting) { + coro.resume(); + } + }); + + objects_read_async(hoid, object_size, to_read, on_finish, true); + + // If the async read is not yet complete, yield and wait for it to complete + if (!done) { + waiting = true; + coro.yield(); + } + + return result; +} + +int ECBackend::objects_read_local( const hobject_t &hoid, uint64_t off, uint64_t len, diff --git a/src/osd/ECBackend.h b/src/osd/ECBackend.h index a5c83acde1c..80d85772c64 100644 --- a/src/osd/ECBackend.h +++ b/src/osd/ECBackend.h @@ -28,6 +28,7 @@ #include "erasure-code/ErasureCodeInterface.h" #include "include/buffer.h" #include "osd/scrubber/scrub_backend.h" +#include "Coroutines.h" /* This file is soon going to be replaced (before next release), so we are going * to simply ignore all deprecated warnings. @@ -131,12 +132,20 @@ class ECBackend : public ECCommon { ); int objects_read_sync( - const hobject_t &hoid, - uint64_t off, - uint64_t len, - uint32_t op_flags, - ceph::buffer::list *bl - ); + const hobject_t &hoid, + uint64_t object_size, + const std::list>> &to_read, + CoroHandles coro + ); + + int objects_read_local( + const hobject_t &hoid, + uint64_t off, + uint64_t len, + uint32_t op_flags, + ceph::buffer::list *bl + ); std::pair extent_to_shard_extent(uint64_t off, uint64_t len); diff --git a/src/osd/ECBackendL.cc b/src/osd/ECBackendL.cc index 3e0183eb547..487c7d53d59 100644 --- a/src/osd/ECBackendL.cc +++ b/src/osd/ECBackendL.cc @@ -1562,7 +1562,7 @@ void ECBackendL::submit_transaction( rmw_pipeline.start_rmw(std::move(op)); } -int ECBackendL::objects_read_sync( +int ECBackendL::objects_read_local( const hobject_t &hoid, uint64_t off, uint64_t len, diff --git a/src/osd/ECBackendL.h b/src/osd/ECBackendL.h index f560b42a7c9..4a9d22062a0 100644 --- a/src/osd/ECBackendL.h +++ b/src/osd/ECBackendL.h @@ -117,7 +117,7 @@ public: OpRequestRef op ); - int objects_read_sync( + int objects_read_local( const hobject_t &hoid, uint64_t off, uint64_t len, diff --git a/src/osd/ECSwitch.h b/src/osd/ECSwitch.h index ea735126914..3be4930fb9b 100644 --- a/src/osd/ECSwitch.h +++ b/src/osd/ECSwitch.h @@ -259,12 +259,27 @@ public: } int objects_read_sync(const hobject_t &hoid, uint64_t off, uint64_t len, - uint32_t op_flags, ceph::buffer::list *bl) override + uint32_t op_flags, ceph::buffer::list *bl, uint64_t object_size, + std::optional coro) override + { + // Sync reads are only supported in FastEC, and from a coroutine + if (!is_optimized() || !coro.has_value()) { + return -EOPNOTSUPP; + } + + ec_align_t align{off, len, op_flags}; + std::list>> to_read; + to_read.push_back({ align, { bl, nullptr } }); + return optimized.objects_read_sync(hoid, object_size, to_read, *coro); + } + + int objects_read_local(const hobject_t &hoid, uint64_t off, uint64_t len, + uint32_t op_flags, ceph::buffer::list *bl) override { if (is_optimized()) { - return optimized.objects_read_sync(hoid, off, len, op_flags, bl); + return optimized.objects_read_local(hoid, off, len, op_flags, bl); } - return legacy.objects_read_sync(hoid, off, len, op_flags, bl); + return legacy.objects_read_local(hoid, off, len, op_flags, bl); } int objects_readv_sync(const hobject_t &hoid, diff --git a/src/osd/OpRequest.h b/src/osd/OpRequest.h index 255aa59d530..180e1272201 100644 --- a/src/osd/OpRequest.h +++ b/src/osd/OpRequest.h @@ -19,6 +19,8 @@ #include "osd/osd_types.h" #include "common/TrackedOp.h" #include "common/tracer.h" +#include "osd/Coroutines.h" + /** * The OpRequest takes in a Message* and takes over a single reference * to it, which it puts() when destroyed. @@ -30,6 +32,8 @@ private: OpInfo op_info; public: + std::optional coro_handles = std::nullopt; + int maybe_init_op_info(const OSDMap &osdmap); auto get_flags() const { return op_info.get_flags(); } @@ -50,6 +54,8 @@ public: bool allows_returnvec() const { return op_info.allows_returnvec(); } bool ec_direct_read() const { return op_info.ec_direct_read(); } void set_ec_direct_read() { return op_info.set_ec_direct_read(); } + bool ec_sync_read() const { return op_info.ec_sync_read(); } + void set_ec_sync_read() { return op_info.set_ec_sync_read(); } std::vector classes() const { return op_info.get_classes(); diff --git a/src/osd/PGBackend.h b/src/osd/PGBackend.h index 33b6510bb11..34e62b9af5a 100644 --- a/src/osd/PGBackend.h +++ b/src/osd/PGBackend.h @@ -33,6 +33,7 @@ #include "ECListener.h" #include "ECTypes.h" #include "PGTransaction.h" +#include "Coroutines.h" #include "osd_types.h" #include "pg_features.h" @@ -643,7 +644,17 @@ typedef std::shared_ptr OSDMapRef; uint64_t off, uint64_t len, uint32_t op_flags, - ceph::buffer::list *bl) = 0; + ceph::buffer::list *bl, + uint64_t object_size, + std::optional coro + ) = 0; + + virtual int objects_read_local( + const hobject_t &hoid, + uint64_t off, + uint64_t len, + uint32_t op_flags, + ceph::buffer::list *bl) = 0; virtual int objects_readv_sync( const hobject_t &hoid, diff --git a/src/osd/PrimaryLogPG.cc b/src/osd/PrimaryLogPG.cc index 07d89b7044c..4e91cd7bd6d 100644 --- a/src/osd/PrimaryLogPG.cc +++ b/src/osd/PrimaryLogPG.cc @@ -1697,7 +1697,8 @@ bool PrimaryLogPG::get_rw_locks(bool write_ordered, OpContext *ctx) * to get the second. */ if (write_ordered && ctx->op->may_read()) { - if (ctx->op->may_read_data()) { + // In EC, reads can overtake writes unless the RWEXCL lock is held + if (ctx->op->may_read_data() || pool.info.is_erasure()) { ctx->lock_type = RWState::RWEXCL; } else { ctx->lock_type = RWState::RWWRITE; @@ -1995,24 +1996,24 @@ void PrimaryLogPG::do_request( } } -/** do_op - do an op - * pg lock will be held (if multithreaded) - * osd_lock NOT held. - */ -void PrimaryLogPG::do_op(OpRequestRef& op) +bool PrimaryLogPG::should_use_coroutine(MOSDOp* m) { - FUNCTRACE(cct); - // NOTE: take a non-const pointer here; we must be careful not to - // change anything that will break other reads on m (operator<<). - MOSDOp *m = static_cast(op->get_nonconst_req()); - ceph_assert(m->get_type() == CEPH_MSG_OSD_OP); - if (m->finish_decode()) { - op->reset_desc(); // for TrackedOp - m->clear_payload(); + if (!pool.info.allows_ecoptimizations()) { + return false; } - dout(20) << __func__ << ": op " << *m << dendl; + for (const auto& osd_op : m->ops) { + if (osd_op.op.op == CEPH_OSD_OP_CALL) { + return true; + } + } + + return false; +} +void PrimaryLogPG::do_op_impl(OpRequestRef op) +{ + MOSDOp *m = static_cast(op->get_nonconst_req()); const hobject_t head = m->get_hobj().get_head(); if (!info.pgid.pgid.contains( @@ -2501,6 +2502,10 @@ void PrimaryLogPG::do_op(OpRequestRef& op) OpContext *ctx = new OpContext(op, m->get_reqid(), &m->ops, obc, this); + if (coro_op_in_flight && op == active_coro_op) { + active_coro_ctx = ctx; + } + if (m->has_flag(CEPH_OSD_FLAG_SKIPRWLOCKS)) { dout(20) << __func__ << ": skipping rw locks" << dendl; } else if (m->get_flags() & CEPH_OSD_FLAG_FLUSH) { @@ -2579,6 +2584,75 @@ void PrimaryLogPG::do_op(OpRequestRef& op) maybe_force_recovery(); } +/** do_op - do an op + * pg lock will be held (if multithreaded) + * osd_lock NOT held. + */ +void PrimaryLogPG::do_op(OpRequestRef& op) +{ + FUNCTRACE(cct); + MOSDOp *m = static_cast(op->get_nonconst_req()); + ceph_assert(m->get_type() == CEPH_MSG_OSD_OP); + if (m->finish_decode()) { + op->reset_desc(); + m->clear_payload(); + } + + if (coro_op_in_flight) { + dout(20) << __func__ << ": coroutine op in flight, queuing " << op << dendl; + waiting_for_coro_op.push_back(op); + return; + } + + dout(20) << __func__ << ": op " << *m << dendl; + + if (should_use_coroutine(m)) { + dout(20) << __func__ << ": spawning a coroutine for EC optimized CALL op" << dendl; + coro_op_in_flight = true; + active_coro_op = op; + OpRequest* op_raw = op.get(); + + // Spawn a coroutine to handle the message + auto resumer = std::make_unique( + [this, op_raw](yield_token_t& yield) { + op_raw->coro_handles.emplace(CoroHandles{ yield, *coro_resumer }); + { + const OpRequestRef op_ref(op_raw); + do_op_impl(op_ref); + } + + // Cleanup + coro_resumer = nullptr; + on_coroutine_complete(); + }); + + coro_resumer = std::move(resumer); + + // Startup the coroutine + (*coro_resumer)(); + } else { + // Handle the message directly in the current thread + do_op_impl(op); + } +} + +void PrimaryLogPG::on_coroutine_complete() +{ + ceph_assert(coro_op_in_flight); + coro_op_in_flight = false; + active_coro_op = nullptr; + + if (active_coro_ctx) { + dout(20) << __func__ << ": Warning - OpContext not cleaned up normally" << dendl; + active_coro_ctx = nullptr; + } + + if (!waiting_for_coro_op.empty()) { + dout(20) << __func__ << ": requeuing " << waiting_for_coro_op.size() << " ops" << dendl; + requeue_ops(waiting_for_coro_op); + } +} + PrimaryLogPG::cache_result_t PrimaryLogPG::maybe_handle_manifest_detail( OpRequestRef op, bool write_ordered, @@ -4445,6 +4519,11 @@ void PrimaryLogPG::close_op_ctx(OpContext *ctx) { ctx->on_finish.erase(p++)) { (*p)(); } + + if (ctx == active_coro_ctx) { + active_coro_ctx = nullptr; + } + delete ctx; } @@ -5906,10 +5985,14 @@ int PrimaryLogPG::do_read(OpContext *ctx, OSDOp& osd_op) { maybe_crc = oi.data_digest; if (ctx->op->ec_direct_read()) { - result = pgbackend->objects_read_sync( + result = pgbackend->objects_read_local( soid, op.extent.offset, op.extent.length, op.flags, &osd_op.outdata); - - dout(20) << " EC sync read for " << soid << " result=" << result << dendl; + dout(20) << " EC local read for " << soid << " result=" << result << dendl; + } else if (ctx->op->ec_sync_read()) { + result = pgbackend->objects_read_sync( + soid, op.extent.offset, op.extent.length, op.flags, &osd_op.outdata, + oi.size, ctx->op->coro_handles); + dout(20) << " EC sync read for " << soid << " result=" << result << dendl; } else { ctx->pending_async_reads.push_back( make_pair( @@ -5925,7 +6008,8 @@ int PrimaryLogPG::do_read(OpContext *ctx, OSDOp& osd_op) { } } else { int r = pgbackend->objects_read_sync( - soid, op.extent.offset, op.extent.length, op.flags, &osd_op.outdata); + soid, op.extent.offset, op.extent.length, op.flags, &osd_op.outdata, + oi.size, ctx->op->coro_handles); // whole object? can we verify the checksum? if (r >= 0 && op.extent.offset == 0 && (uint64_t)r == oi.size && oi.is_data_digest()) { @@ -6160,9 +6244,11 @@ int PrimaryLogPG::do_osd_ops(OpContext *ctx, vector& ops) break; case CEPH_OSD_OP_SYNC_READ: - if (pool.info.is_erasure()) { + if (pool.info.is_erasure() && !pool.info.allows_ecoptimizations()) { result = -EOPNOTSUPP; break; + } else if (pool.info.is_erasure() && pool.info.allows_ecoptimizations()) { + ctx->op->set_ec_sync_read(); } // fall through case CEPH_OSD_OP_READ: @@ -9415,8 +9501,9 @@ int PrimaryLogPG::do_copy_get(OpContext *ctx, bufferlist::const_iterator& bp, dout(10) << __func__ << ": async_read noted for " << soid << dendl; } else { - result = pgbackend->objects_read_sync( - oi.soid, cursor.data_offset, max_read, osd_op.op.flags, &bl); + result = pgbackend->objects_read_sync( + oi.soid, cursor.data_offset, max_read, osd_op.op.flags, &bl, + oi.size, ctx->op->coro_handles); if (result < 0) return result; } @@ -10727,7 +10814,7 @@ int PrimaryLogPG::do_cdc(const object_info_t& oi, * As s result, we leave this as a future work. */ int r = pgbackend->objects_read_sync( - oi.soid, 0, oi.size, 0, &bl); + oi.soid, 0, oi.size, 0, &bl, oi.size, std::nullopt); if (r < 0) { dout(0) << __func__ << " read fail " << oi.soid << " len: " << oi.size << " r: " << r << dendl; @@ -13164,6 +13251,26 @@ void PrimaryLogPG::on_change(ObjectStore::Transaction &t) { dout(10) << __func__ << dendl; + if (coro_resumer != nullptr) { + dout(20) << __func__ << ": Stopping active coroutine" << dendl; + if (active_coro_ctx) { + dout(20) << __func__ << ": Cleaning up orphaned OpContext from coroutine" << dendl; + // Remove from in_progress_async_reads if present + for (auto it = in_progress_async_reads.begin(); + it != in_progress_async_reads.end(); ++it) { + if (it->second == active_coro_ctx) { + in_progress_async_reads.erase(it); + break; + } + } + // Close the context to release all resources + close_op_ctx(active_coro_ctx); + active_coro_ctx = nullptr; + } + coro_resumer = nullptr; + coro_op_in_flight = false; + } + if (hit_set && hit_set->insert_count() == 0) { dout(20) << " discarding empty hit_set" << dendl; hit_set_clear(); @@ -13180,6 +13287,11 @@ void PrimaryLogPG::on_change(ObjectStore::Transaction &t) requeue_ops(waiting_for_flush); requeue_ops(waiting_for_active); requeue_ops(waiting_for_readable); + requeue_ops(waiting_for_coro_op); + if (active_coro_op) { + requeue_op(active_coro_op); + active_coro_op = nullptr; + } vector tids; cancel_copy_ops(is_primary(), &tids); diff --git a/src/osd/PrimaryLogPG.h b/src/osd/PrimaryLogPG.h index b01472c09bb..8e0b29f72bc 100644 --- a/src/osd/PrimaryLogPG.h +++ b/src/osd/PrimaryLogPG.h @@ -34,6 +34,7 @@ #include "ReplicatedBackend.h" #include "PGTransaction.h" #include "cls/cas/cls_cas_ops.h" +#include "Coroutines.h" class CopyFromCallback; class PromoteCallback; @@ -937,6 +938,12 @@ public: protected: + OpRequestRef active_coro_op = nullptr; + std::unique_ptr coro_resumer = nullptr; + bool coro_op_in_flight = false; + std::list waiting_for_coro_op; + OpContext* active_coro_ctx = nullptr; + /** * Grabs locks for OpContext, should be cleaned up in close_op_ctx * @@ -1562,7 +1569,10 @@ public: void do_request( OpRequestRef& op, ThreadPool::TPHandle &handle) override; + bool should_use_coroutine(MOSDOp* m); + void do_op_impl(OpRequestRef op); void do_op(OpRequestRef& op); + void on_coroutine_complete(); void record_write_error(OpRequestRef op, const hobject_t &soid, MOSDOpReply *orig_reply, int r, OpContext *ctx_for_op_returns=nullptr); diff --git a/src/osd/ReplicatedBackend.cc b/src/osd/ReplicatedBackend.cc index a4bc537fb70..1d10eb44652 100644 --- a/src/osd/ReplicatedBackend.cc +++ b/src/osd/ReplicatedBackend.cc @@ -277,6 +277,18 @@ void ReplicatedBackend::on_change() } int ReplicatedBackend::objects_read_sync( + const hobject_t &hoid, + uint64_t off, + uint64_t len, + uint32_t op_flags, + bufferlist *bl, + uint64_t object_size, + std::optional coro) +{ + return store->read(ch, ghobject_t(hoid), off, len, *bl, op_flags); +} + +int ReplicatedBackend::objects_read_local( const hobject_t &hoid, uint64_t off, uint64_t len, diff --git a/src/osd/ReplicatedBackend.h b/src/osd/ReplicatedBackend.h index a40dc1b6282..18f410819a3 100644 --- a/src/osd/ReplicatedBackend.h +++ b/src/osd/ReplicatedBackend.h @@ -186,7 +186,17 @@ public: uint64_t off, uint64_t len, uint32_t op_flags, - ceph::buffer::list *bl) override; + ceph::buffer::list *bl, + uint64_t object_size, + std::optional coro + ) override; + + int objects_read_local( + const hobject_t &hoid, + uint64_t off, + uint64_t len, + uint32_t op_flags, + ceph::buffer::list *bl) override; int objects_readv_sync( const hobject_t &hoid, diff --git a/src/osd/osd_op_util.cc b/src/osd/osd_op_util.cc index 2c2ad8e3ec9..1a969f24db6 100644 --- a/src/osd/osd_op_util.cc +++ b/src/osd/osd_op_util.cc @@ -55,6 +55,9 @@ bool OpInfo::allows_returnvec() const { bool OpInfo::ec_direct_read() const { return check_rmw(CEPH_OSD_RMW_FLAG_EC_DIRECT_READ); } +bool OpInfo::ec_sync_read() const { + return check_rmw(CEPH_OSD_RMW_FLAG_EC_SYNC_READ); +} /** * may_read_data() * @@ -83,6 +86,7 @@ void OpInfo::set_force_rwordered() { set_rmw_flags(CEPH_OSD_RMW_FLAG_RWORDERED); void OpInfo::set_returnvec() { set_rmw_flags(CEPH_OSD_RMW_FLAG_RETURNVEC); } void OpInfo::set_read_data() { set_rmw_flags(CEPH_OSD_RMW_FLAG_READ_DATA); } void OpInfo::set_ec_direct_read() { set_rmw_flags(CEPH_OSD_RMW_FLAG_EC_DIRECT_READ); } +void OpInfo::set_ec_sync_read() { set_rmw_flags(CEPH_OSD_RMW_FLAG_EC_SYNC_READ); } int OpInfo::set_from_op( diff --git a/src/osd/osd_op_util.h b/src/osd/osd_op_util.h index ba1acae4c9e..adebeaa75fc 100644 --- a/src/osd/osd_op_util.h +++ b/src/osd/osd_op_util.h @@ -60,6 +60,7 @@ public: bool need_skip_promote() const; bool allows_returnvec() const; bool ec_direct_read() const; + bool ec_sync_read() const; void set_read(); void set_write(); @@ -74,6 +75,7 @@ public: void set_returnvec(); void set_read_data(); void set_ec_direct_read(); + void set_ec_sync_read(); int set_from_op( const MOSDOp *m, diff --git a/src/osd/osd_types.h b/src/osd/osd_types.h index 8174ea57130..b2e6792e725 100644 --- a/src/osd/osd_types.h +++ b/src/osd/osd_types.h @@ -378,6 +378,7 @@ enum { CEPH_OSD_RMW_FLAG_RETURNVEC = (1 << 11), CEPH_OSD_RMW_FLAG_READ_DATA = (1 << 12), CEPH_OSD_RMW_FLAG_EC_DIRECT_READ = (1 << 13), + CEPH_OSD_RMW_FLAG_EC_SYNC_READ = (1 << 14), }; diff --git a/src/test/osd/TestPeeringState.cc b/src/test/osd/TestPeeringState.cc index d0a69655fab..32ecb482894 100644 --- a/src/test/osd/TestPeeringState.cc +++ b/src/test/osd/TestPeeringState.cc @@ -722,7 +722,19 @@ public: uint64_t off, uint64_t len, uint32_t op_flags, - ceph::buffer::list *bl) override { + ceph::buffer::list *bl, + uint64_t object_size, + std::optional coro + ) override { + return 0; + } + + int objects_read_local( + const hobject_t &hoid, + uint64_t off, + uint64_t len, + uint32_t op_flags, + ceph::buffer::list *bl) override { return 0; }