Coroutines.h file added containing type and struct definitions.
Coroutine spawned in PrimaryLogPG::do_op if the message contains a CEPH_OSD_OP_CALL op.
ECBackend::objects_read_sync yields while running ECBackend::objects_read_async. Coroutine is resumed once the read is complete.
When a coroutine op is being processed, all other ops to the same PG are blocked.
https://tracker.ceph.com/issues/74531
Signed-off-by: Matty Williams <Matty.Williams@ibm.com>
common_utf8 extblkdev arch crc32
${LIB_RESOLV}
Boost::thread
+ Boost::context
Boost::random
Boost::program_options
Boost::date_time
ECInject.h
ECOmapJournal.cc
ECOmapJournal.h
+ Coroutines.h
${CMAKE_SOURCE_DIR}/src/common/TrackedOp.cc
${CMAKE_SOURCE_DIR}/src/mgr/OSDPerfMetricTypes.cc
${osd_cyg_functions_src}
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
+// vim: ts=8 sw=2 sts=2 expandtab
+
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2026 IBM
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ */
+
+/*
+ * This header file contains types required to integrate boost coroutines into
+ * the OSD backend.
+ *
+ * - yield_token_t (pull_type): Used by a coroutine to suspend execution (yield)
+ *   while waiting for an I/O operation to complete.
+ * - resume_token_t (push_type): Used by the completion callback to resume the
+ *   suspended coroutine once the I/O operation has completed.
+ */
+
+#pragma once
+#include <boost/coroutine2/all.hpp>
+
+using yield_token_t = boost::coroutines2::coroutine<void>::pull_type;
+using resume_token_t = boost::coroutines2::coroutine<void>::push_type;
+
+struct CoroHandles {  // Bundles the two coroutine handles as non-owning references; copies alias the same handles.
+ yield_token_t& yield;  // called from inside the coroutine to suspend it
+ resume_token_t& resume;  // called from outside the coroutine to continue it
+};
\ No newline at end of file
}
int ECBackend::objects_read_sync(
+  const hobject_t &hoid,
+  uint64_t object_size,
+  const std::list<std::pair<ec_align_t,
+    std::pair<ceph::buffer::list*, Context*>>> &to_read,
+  CoroHandles coro)
+{
+  // Synchronous-looking read built on objects_read_async(): issue the async
+  // read and, if it has not completed by the time the call returns, suspend
+  // through coro.yield until the completion callback calls coro.resume.
+  //
+  // Returns the status code the async read hands to its completion.
+  //
+  // The three locals below are shared with the callback by reference, so
+  // this frame has to stay alive until the callback has run. That holds on
+  // both paths through this function: we only return once `completed` is
+  // set or after the yield has been resumed.
+  int read_result = 0;
+  bool completed = false;
+  bool suspended = false;
+
+  Context *completion = new LambdaContext(
+    [&read_result, &completed, &suspended, coro](int r) {
+      read_result = r;
+      completed = true;
+
+      if (!suspended) {
+        // Finished before the caller yielded: nothing to wake up.
+        return;
+      }
+      coro.resume();
+    });
+
+  // NOTE(review): the trailing `true` is passed through unchanged; its
+  // meaning is defined by objects_read_async, which is not visible here.
+  objects_read_async(hoid, object_size, to_read, completion, true);
+
+  if (completed) {
+    // The read completed inline; no need to suspend.
+    return read_result;
+  }
+
+  // Mark ourselves as suspended *before* yielding so the callback knows it
+  // has to resume us.
+  suspended = true;
+  coro.yield();
+
+  return read_result;
+}
+
+int ECBackend::objects_read_local(
const hobject_t &hoid,
uint64_t off,
uint64_t len,
#include "erasure-code/ErasureCodeInterface.h"
#include "include/buffer.h"
#include "osd/scrubber/scrub_backend.h"
+#include "Coroutines.h"
/* This file is soon going to be replaced (before next release), so we are going
* to simply ignore all deprecated warnings.
);
int objects_read_sync(
- const hobject_t &hoid,
- uint64_t off,
- uint64_t len,
- uint32_t op_flags,
- ceph::buffer::list *bl
- );
+ const hobject_t &hoid,
+ uint64_t object_size,
+ const std::list<std::pair<ec_align_t,
+ std::pair<ceph::buffer::list*, Context*>>> &to_read,
+ CoroHandles coro
+ );
+
+ int objects_read_local(
+ const hobject_t &hoid,
+ uint64_t off,
+ uint64_t len,
+ uint32_t op_flags,
+ ceph::buffer::list *bl
+ );
std::pair<uint64_t, uint64_t> extent_to_shard_extent(uint64_t off, uint64_t len);
rmw_pipeline.start_rmw(std::move(op));
}
-int ECBackendL::objects_read_sync(
+int ECBackendL::objects_read_local(
const hobject_t &hoid,
uint64_t off,
uint64_t len,
OpRequestRef op
);
- int objects_read_sync(
+ int objects_read_local(
const hobject_t &hoid,
uint64_t off,
uint64_t len,
}
int objects_read_sync(const hobject_t &hoid, uint64_t off, uint64_t len,
- uint32_t op_flags, ceph::buffer::list *bl) override
+                        uint32_t op_flags, ceph::buffer::list *bl, uint64_t object_size,
+                        std::optional<CoroHandles> coro) override
+  {
+    // A sync read needs both the optimized (FastEC) backend and a coroutine
+    // to suspend in; with either missing the request is rejected with
+    // -EOPNOTSUPP.
+    if (is_optimized() && coro.has_value()) {
+      // Wrap the single extent in the list shape the optimized backend takes.
+      // No per-extent Context is supplied (nullptr).
+      std::list<std::pair<ec_align_t, std::pair<bufferlist*, Context*>>> to_read;
+      to_read.push_back({ ec_align_t{off, len, op_flags}, { bl, nullptr } });
+      return optimized.objects_read_sync(hoid, object_size, to_read, *coro);
+    }
+
+    return -EOPNOTSUPP;
+  }
+
+ int objects_read_local(const hobject_t &hoid, uint64_t off, uint64_t len,  // Forwards a local read to whichever backend is active.
+ uint32_t op_flags, ceph::buffer::list *bl) override
{
if (is_optimized()) {
- return optimized.objects_read_sync(hoid, off, len, op_flags, bl);
+ return optimized.objects_read_local(hoid, off, len, op_flags, bl);  // optimized backend when is_optimized() is true
}
- return legacy.objects_read_sync(hoid, off, len, op_flags, bl);
+ return legacy.objects_read_local(hoid, off, len, op_flags, bl);  // otherwise the legacy backend
}
int objects_readv_sync(const hobject_t &hoid,
#include "osd/osd_types.h"
#include "common/TrackedOp.h"
#include "common/tracer.h"
+#include "osd/Coroutines.h"
+
/**
* The OpRequest takes in a Message* and takes over a single reference
* to it, which it puts() when destroyed.
OpInfo op_info;
public:
+ std::optional<CoroHandles> coro_handles = std::nullopt;
+
int maybe_init_op_info(const OSDMap &osdmap);
auto get_flags() const { return op_info.get_flags(); }
bool allows_returnvec() const { return op_info.allows_returnvec(); }
bool ec_direct_read() const { return op_info.ec_direct_read(); }
void set_ec_direct_read() { return op_info.set_ec_direct_read(); }
+ bool ec_sync_read() const { return op_info.ec_sync_read(); }  // forwards to OpInfo::ec_sync_read()
+ void set_ec_sync_read() { return op_info.set_ec_sync_read(); }  // forwards to OpInfo::set_ec_sync_read()
std::vector<OpInfo::ClassInfo> classes() const {
return op_info.get_classes();
#include "ECListener.h"
#include "ECTypes.h"
#include "PGTransaction.h"
+#include "Coroutines.h"
#include "osd_types.h"
#include "pg_features.h"
uint64_t off,
uint64_t len,
uint32_t op_flags,
- ceph::buffer::list *bl) = 0;
+ ceph::buffer::list *bl,
+ uint64_t object_size,
+ std::optional<CoroHandles> coro
+ ) = 0;
+
+ virtual int objects_read_local(
+ const hobject_t &hoid,
+ uint64_t off,
+ uint64_t len,
+ uint32_t op_flags,
+ ceph::buffer::list *bl) = 0;
virtual int objects_readv_sync(
const hobject_t &hoid,
* to get the second.
*/
if (write_ordered && ctx->op->may_read()) {
- if (ctx->op->may_read_data()) {
+ // In EC, reads can overtake writes unless the RWEXCL lock is held
+ if (ctx->op->may_read_data() || pool.info.is_erasure()) {
ctx->lock_type = RWState::RWEXCL;
} else {
ctx->lock_type = RWState::RWWRITE;
}
}
-/** do_op - do an op
- * pg lock will be held (if multithreaded)
- * osd_lock NOT held.
- */
-void PrimaryLogPG::do_op(OpRequestRef& op)
+bool PrimaryLogPG::should_use_coroutine(MOSDOp* m)  // True when message m should be handled inside a coroutine.
{
- FUNCTRACE(cct);
- // NOTE: take a non-const pointer here; we must be careful not to
- // change anything that will break other reads on m (operator<<).
- MOSDOp *m = static_cast<MOSDOp*>(op->get_nonconst_req());
- ceph_assert(m->get_type() == CEPH_MSG_OSD_OP);
- if (m->finish_decode()) {
- op->reset_desc(); // for TrackedOp
- m->clear_payload();
+ if (!pool.info.allows_ecoptimizations()) {  // coroutines are only used for pools that allow EC optimizations
+ return false;
}
- dout(20) << __func__ << ": op " << *m << dendl;
+ for (const auto& osd_op : m->ops) {  // scan every sub-op carried by the message
+ if (osd_op.op.op == CEPH_OSD_OP_CALL) {  // a single CALL op is enough
+ return true;
+ }
+ }
+
+ return false;  // no CALL op found
+}
+void PrimaryLogPG::do_op_impl(OpRequestRef op)
+{
+ MOSDOp *m = static_cast<MOSDOp*>(op->get_nonconst_req());
const hobject_t head = m->get_hobj().get_head();
if (!info.pgid.pgid.contains(
OpContext *ctx = new OpContext(op, m->get_reqid(), &m->ops, obc, this);
+ if (coro_op_in_flight && op == active_coro_op) {
+ active_coro_ctx = ctx;
+ }
+
if (m->has_flag(CEPH_OSD_FLAG_SKIPRWLOCKS)) {
dout(20) << __func__ << ": skipping rw locks" << dendl;
} else if (m->get_flags() & CEPH_OSD_FLAG_FLUSH) {
maybe_force_recovery();
}
+/** do_op - do an op
+ * pg lock will be held (if multithreaded)
+ * osd_lock NOT held.
+ *
+ * While a coroutine op is in flight, every other op arriving here is parked
+ * on waiting_for_coro_op (on_coroutine_complete() requeues them). A message
+ * for which should_use_coroutine() returns true is processed by do_op_impl()
+ * inside a newly spawned coroutine; any other message is processed by
+ * do_op_impl() directly on the calling thread.
+ */
+void PrimaryLogPG::do_op(OpRequestRef& op)
+{
+  FUNCTRACE(cct);
+  // NOTE: take a non-const pointer here; we must be careful not to
+  // change anything that will break other reads on m (operator<<).
+  MOSDOp *m = static_cast<MOSDOp*>(op->get_nonconst_req());
+  ceph_assert(m->get_type() == CEPH_MSG_OSD_OP);
+  if (m->finish_decode()) {
+    op->reset_desc();   // for TrackedOp
+    m->clear_payload();
+  }
+
+  if (coro_op_in_flight) {
+    dout(20) << __func__ << ": coroutine op in flight, queuing " << op << dendl;
+    waiting_for_coro_op.push_back(op);
+    return;
+  }
+
+  // Reap a coroutine that ran to completion after being resumed from a
+  // completion callback. The coroutine never destroys its own resume token
+  // (see below), so a finished one can still be parked here.
+  if (coro_resumer && !*coro_resumer) {
+    coro_resumer.reset();
+  }
+
+  dout(20) << __func__ << ": op " << *m << dendl;
+
+  if (should_use_coroutine(m)) {
+    dout(20) << __func__ << ": spawning a coroutine for EC optimized CALL op" << dendl;
+    coro_op_in_flight = true;
+    active_coro_op = op;
+    // active_coro_op keeps the request alive; the coroutine only needs a raw
+    // pointer and takes its own reference for the duration of do_op_impl().
+    OpRequest* op_raw = op.get();
+
+    // Spawn a coroutine to handle the message
+    auto resumer = std::make_unique<resume_token_t>(
+      [this, op_raw](yield_token_t& yield) {
+        // coro_resumer has been assigned by the time this body first runs,
+        // because the body is only entered by the (*coro_resumer)() below.
+        op_raw->coro_handles.emplace(CoroHandles{ yield, *coro_resumer });
+        {
+          const OpRequestRef op_ref(op_raw);
+          do_op_impl(op_ref);
+        }
+
+        // Cleanup. coro_resumer is deliberately NOT reset here: that would
+        // destroy the coroutine from code still executing on its own stack,
+        // and free the push_type whose operator() the resuming caller has
+        // not returned from yet. Destruction happens in do_op(), outside
+        // the coroutine.
+        on_coroutine_complete();
+      });
+
+    coro_resumer = std::move(resumer);
+
+    // Startup the coroutine
+    (*coro_resumer)();
+
+    // Back on the caller's stack: the coroutine has either suspended in a
+    // yield or finished. A finished coroutine can be destroyed safely now.
+    if (coro_resumer && !*coro_resumer) {
+      coro_resumer.reset();
+    }
+  } else {
+    // Handle the message directly in the current thread
+    do_op_impl(op);
+  }
+}
+
+void PrimaryLogPG::on_coroutine_complete()
+{
+  // Bookkeeping for the end of a coroutine op: clear the in-flight state and
+  // hand every op that was parked behind it back to the queue.
+  ceph_assert(coro_op_in_flight);
+
+  // Stop do_op() from parking new arrivals and drop our reference to the op.
+  coro_op_in_flight = false;
+  active_coro_op = nullptr;
+
+  // The OpContext teardown path clears active_coro_ctx itself; if it is
+  // still set here the context did not go through that path, so just forget
+  // the pointer.
+  if (active_coro_ctx != nullptr) {
+    dout(20) << __func__ << ": Warning - OpContext not cleaned up normally" << dendl;
+    active_coro_ctx = nullptr;
+  }
+
+  if (waiting_for_coro_op.empty()) {
+    return;
+  }
+
+  dout(20) << __func__ << ": requeuing " << waiting_for_coro_op.size() << " ops" << dendl;
+  requeue_ops(waiting_for_coro_op);
+}
+
PrimaryLogPG::cache_result_t PrimaryLogPG::maybe_handle_manifest_detail(
OpRequestRef op,
bool write_ordered,
ctx->on_finish.erase(p++)) {
(*p)();
}
+
+ if (ctx == active_coro_ctx) {
+ active_coro_ctx = nullptr;
+ }
+
delete ctx;
}
maybe_crc = oi.data_digest;
if (ctx->op->ec_direct_read()) {
- result = pgbackend->objects_read_sync(
+ result = pgbackend->objects_read_local(
soid, op.extent.offset, op.extent.length, op.flags, &osd_op.outdata);
-
- dout(20) << " EC sync read for " << soid << " result=" << result << dendl;
+ dout(20) << " EC local read for " << soid << " result=" << result << dendl;
+ } else if (ctx->op->ec_sync_read()) {
+ result = pgbackend->objects_read_sync(
+ soid, op.extent.offset, op.extent.length, op.flags, &osd_op.outdata,
+ oi.size, ctx->op->coro_handles);
+ dout(20) << " EC sync read for " << soid << " result=" << result << dendl;
} else {
ctx->pending_async_reads.push_back(
make_pair(
}
} else {
int r = pgbackend->objects_read_sync(
- soid, op.extent.offset, op.extent.length, op.flags, &osd_op.outdata);
+ soid, op.extent.offset, op.extent.length, op.flags, &osd_op.outdata,
+ oi.size, ctx->op->coro_handles);
// whole object? can we verify the checksum?
if (r >= 0 && op.extent.offset == 0 &&
(uint64_t)r == oi.size && oi.is_data_digest()) {
break;
case CEPH_OSD_OP_SYNC_READ:
- if (pool.info.is_erasure()) {
+ if (pool.info.is_erasure() && !pool.info.allows_ecoptimizations()) {
result = -EOPNOTSUPP;
break;
+ } else if (pool.info.is_erasure() && pool.info.allows_ecoptimizations()) {
+ ctx->op->set_ec_sync_read();
}
// fall through
case CEPH_OSD_OP_READ:
dout(10) << __func__ << ": async_read noted for " << soid << dendl;
} else {
- result = pgbackend->objects_read_sync(
- oi.soid, cursor.data_offset, max_read, osd_op.op.flags, &bl);
+ result = pgbackend->objects_read_sync(
+ oi.soid, cursor.data_offset, max_read, osd_op.op.flags, &bl,
+ oi.size, ctx->op->coro_handles);
if (result < 0)
return result;
}
* As s result, we leave this as a future work.
*/
int r = pgbackend->objects_read_sync(
- oi.soid, 0, oi.size, 0, &bl);
+ oi.soid, 0, oi.size, 0, &bl, oi.size, std::nullopt);
if (r < 0) {
dout(0) << __func__ << " read fail " << oi.soid
<< " len: " << oi.size << " r: " << r << dendl;
{
dout(10) << __func__ << dendl;
+ if (coro_resumer != nullptr) {
+ dout(20) << __func__ << ": Stopping active coroutine" << dendl;
+ if (active_coro_ctx) {
+ dout(20) << __func__ << ": Cleaning up orphaned OpContext from coroutine" << dendl;
+ // Remove from in_progress_async_reads if present
+ for (auto it = in_progress_async_reads.begin();
+ it != in_progress_async_reads.end(); ++it) {
+ if (it->second == active_coro_ctx) {
+ in_progress_async_reads.erase(it);
+ break;
+ }
+ }
+ // Close the context to release all resources
+ close_op_ctx(active_coro_ctx);
+ active_coro_ctx = nullptr;
+ }
+ coro_resumer = nullptr;
+ coro_op_in_flight = false;
+ }
+
if (hit_set && hit_set->insert_count() == 0) {
dout(20) << " discarding empty hit_set" << dendl;
hit_set_clear();
requeue_ops(waiting_for_flush);
requeue_ops(waiting_for_active);
requeue_ops(waiting_for_readable);
+ requeue_ops(waiting_for_coro_op);
+ if (active_coro_op) {
+ requeue_op(active_coro_op);
+ active_coro_op = nullptr;
+ }
vector<ceph_tid_t> tids;
cancel_copy_ops(is_primary(), &tids);
#include "ReplicatedBackend.h"
#include "PGTransaction.h"
#include "cls/cas/cls_cas_ops.h"
+#include "Coroutines.h"
class CopyFromCallback;
class PromoteCallback;
protected:
+ OpRequestRef active_coro_op = nullptr;
+ std::unique_ptr<resume_token_t> coro_resumer = nullptr;
+ bool coro_op_in_flight = false;
+ std::list<OpRequestRef> waiting_for_coro_op;
+ OpContext* active_coro_ctx = nullptr;
+
/**
* Grabs locks for OpContext, should be cleaned up in close_op_ctx
*
void do_request(
OpRequestRef& op,
ThreadPool::TPHandle &handle) override;
+ bool should_use_coroutine(MOSDOp* m);
+ void do_op_impl(OpRequestRef op);
void do_op(OpRequestRef& op);
+ void on_coroutine_complete();
void record_write_error(OpRequestRef op, const hobject_t &soid,
MOSDOpReply *orig_reply, int r,
OpContext *ctx_for_op_returns=nullptr);
}
int ReplicatedBackend::objects_read_sync(
+  const hobject_t &hoid,
+  uint64_t off,
+  uint64_t len,
+  uint32_t op_flags,
+  bufferlist *bl,
+  uint64_t object_size,
+  std::optional<CoroHandles> coro)
+{
+  // Reads [off, off + len) of the object straight from the object store
+  // into *bl and returns the store's result. object_size and coro are part
+  // of the shared backend signature but are not used by this
+  // implementation.
+  const ghobject_t oid(hoid);
+  return store->read(ch, oid, off, len, *bl, op_flags);
+}
+
+int ReplicatedBackend::objects_read_local(
const hobject_t &hoid,
uint64_t off,
uint64_t len,
uint64_t off,
uint64_t len,
uint32_t op_flags,
- ceph::buffer::list *bl) override;
+ ceph::buffer::list *bl,
+ uint64_t object_size,
+ std::optional<CoroHandles> coro
+ ) override;
+
+ int objects_read_local(
+ const hobject_t &hoid,
+ uint64_t off,
+ uint64_t len,
+ uint32_t op_flags,
+ ceph::buffer::list *bl) override;
int objects_readv_sync(
const hobject_t &hoid,
bool OpInfo::ec_direct_read() const {
return check_rmw(CEPH_OSD_RMW_FLAG_EC_DIRECT_READ);
}
+bool OpInfo::ec_sync_read() const {  // Reports whether CEPH_OSD_RMW_FLAG_EC_SYNC_READ is set on this op.
+ return check_rmw(CEPH_OSD_RMW_FLAG_EC_SYNC_READ);
+}
/**
* may_read_data()
*
void OpInfo::set_returnvec() { set_rmw_flags(CEPH_OSD_RMW_FLAG_RETURNVEC); }
void OpInfo::set_read_data() { set_rmw_flags(CEPH_OSD_RMW_FLAG_READ_DATA); }
void OpInfo::set_ec_direct_read() { set_rmw_flags(CEPH_OSD_RMW_FLAG_EC_DIRECT_READ); }
+void OpInfo::set_ec_sync_read() { set_rmw_flags(CEPH_OSD_RMW_FLAG_EC_SYNC_READ); }  // sets CEPH_OSD_RMW_FLAG_EC_SYNC_READ on this op
int OpInfo::set_from_op(
bool need_skip_promote() const;
bool allows_returnvec() const;
bool ec_direct_read() const;
+ bool ec_sync_read() const;
void set_read();
void set_write();
void set_returnvec();
void set_read_data();
void set_ec_direct_read();
+ void set_ec_sync_read();
int set_from_op(
const MOSDOp *m,
CEPH_OSD_RMW_FLAG_RETURNVEC = (1 << 11),
CEPH_OSD_RMW_FLAG_READ_DATA = (1 << 12),
CEPH_OSD_RMW_FLAG_EC_DIRECT_READ = (1 << 13),
+ CEPH_OSD_RMW_FLAG_EC_SYNC_READ = (1 << 14),
};
uint64_t off,
uint64_t len,
uint32_t op_flags,
- ceph::buffer::list *bl) override {
+ ceph::buffer::list *bl,
+ uint64_t object_size,
+ std::optional<CoroHandles> coro
+ ) override {
+ return 0;
+ }
+
+ int objects_read_local(  // Stub override: ignores every argument and returns 0.
+ const hobject_t &hoid,
+ uint64_t off,
+ uint64_t len,
+ uint32_t op_flags,
+ ceph::buffer::list *bl) override {
return 0;
}