%{_includedir}/rbd/librbd.h
%{_includedir}/rbd/librbd.hpp
%{_includedir}/rbd/features.h
+%dir %{_includedir}/rbd/asio
+%{_includedir}/rbd/asio/ContextWQ.hpp
%{_libdir}/librbd.so
%if %{with lttng}
%{_libdir}/librbd_tp.so
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
+// vim: ts=8 sw=2 sts=2 expandtab
+
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2025 IBM Inc.
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef CEPH_LIBRBD_ASIO_CONTEXT_WQ_HPP
+#define CEPH_LIBRBD_ASIO_CONTEXT_WQ_HPP
+
+#include <atomic>
+#include <memory>
+
+class CephContext;
+struct Context;
+
+namespace librbd {
+namespace asio {
+
+/**
+ * ContextWQ - interface for work queue execution contexts.
+ *
+ * This class is part of the public librbd API to allow external modules
+ * to provide custom implementations that schedule work on different
+ * execution contexts.
+ *
+ * External implementations should inherit from this class and implement
+ * the pure virtual methods to schedule work on their preferred execution context.
+ */
+class ContextWQ {
+public:
+ virtual ~ContextWQ() = default;
+
+ /**
+ * Queue a context to be executed on the work queue.
+ *
+ * @param ctx Context to execute
+ * @param r Return value to pass to context
+ */
+ virtual void queue(Context *ctx, int r = 0) = 0;
+
+ /**
+ * Drain all pending operations.
+ * Blocks until all queued operations complete.
+ */
+ virtual void drain() = 0;
+
+protected:
+ // Protected constructor for derived classes
+ explicit ContextWQ(void* cct) : m_cct(cct), m_queued_ops(0) {}
+
+ void* m_cct; // Opaque pointer to CephContext (or nullptr if not needed)
+ std::atomic<uint64_t> m_queued_ops;
+};
+
+} // namespace asio
+} // namespace librbd
+
+#endif // CEPH_LIBRBD_ASIO_CONTEXT_WQ_HPP
CEPH_RBD_API int rbd_open(rados_ioctx_t io, const char *name,
rbd_image_t *image, const char *snap_name);
+CEPH_RBD_API int rbd_open_with_context_wq(rados_ioctx_t io, const char *name,
+ rbd_image_t *image, const char *snap_name,
+ void *context_wq);
CEPH_RBD_API int rbd_open_by_id(rados_ioctx_t io, const char *id,
rbd_image_t *image, const char *snap_name);
*/
CEPH_RBD_API int rbd_open_read_only(rados_ioctx_t io, const char *name,
rbd_image_t *image, const char *snap_name);
+CEPH_RBD_API int rbd_open_read_only_with_context_wq(rados_ioctx_t io, const char *name,
+ rbd_image_t *image, const char *snap_name,
+ void *context_wq);
CEPH_RBD_API int rbd_open_by_id_read_only(rados_ioctx_t io, const char *id,
rbd_image_t *image, const char *snap_name);
+
+/**
+ * Get the CephContext associated with an RBD image.
+ *
+ * @param image image handle
+ * @param cct Output parameter for CephContext pointer (opaque void*)
+ * @returns 0 on success, negative error code on failure
+ */
+CEPH_RBD_API int rbd_get_ceph_context(rbd_image_t image, void** cct);
+
+/**
+ * Complete a Context with a return value.
+ *
+ * This function allows external modules to complete Context objects.
+ *
+ * @param ctx Opaque pointer to Context (must be a valid Context*)
+ * @param r Return value to pass to the context
+ */
+CEPH_RBD_API void rbd_context_complete(void* ctx, int r);
+
CEPH_RBD_API int rbd_aio_open_read_only(rados_ioctx_t io, const char *name,
rbd_image_t *image, const char *snap_name,
rbd_completion_t c);
#include "include/neorados/RADOS.hpp"
#include "include/rados/librados.hpp"
#include "common/dout.h"
-#include "librbd/asio/ContextWQ.h"
+#include "librbd/asio/AsioContextWQ.h"
#define dout_subsys ceph_subsys_rbd
#undef dout_prefix
m_cct(m_rados_api->cct()),
m_io_context(m_rados_api->get_io_context()),
m_api_strand(std::make_unique<boost::asio::strand<executor_type>>(
- boost::asio::make_strand(m_io_context))),
- m_context_wq(std::make_unique<asio::ContextWQ>(m_cct, m_io_context)) {
- ldout(m_cct, 20) << dendl;
+ boost::asio::make_strand(m_io_context))) {
+
+ // ASIO mode: Create ASIO-based ContextWQ (default mode)
+ m_context_wq = std::make_shared<asio::AsioContextWQ>(m_cct, m_io_context);
+ ldout(m_cct, 20) << "ASIO mode" << dendl;
auto rados_threads = m_cct->_conf.get_val<uint64_t>("librados_thread_count");
auto rbd_threads = m_cct->_conf.get_val<uint64_t>("rbd_op_threads");
m_api_strand.reset();
}
+void AsioEngine::set_context_wq(std::shared_ptr<asio::ContextWQ> context_wq) {
+ ceph_assert(context_wq != nullptr);
+ ldout(m_cct, 20) << "Setting external ContextWQ" << dendl;
+ m_context_wq = context_wq;
+}
+
void AsioEngine::dispatch(Context* ctx, int r) {
dispatch([ctx, r]() { ctx->complete(r); });
}
return m_context_wq.get();
}
+ /**
+ * Set an external ContextWQ implementation, replaces the
+ * default ASIO-based ContextWQ.
+ *
+ * @param context_wq Shared pointer to external ContextWQ implementation
+ */
+ void set_context_wq(std::shared_ptr<asio::ContextWQ> context_wq);
+
template <typename T>
void dispatch(T&& t) {
boost::asio::dispatch(m_io_context, std::forward<T>(t));
boost::asio::io_context& m_io_context;
std::unique_ptr<boost::asio::strand<executor_type>> m_api_strand;
- std::unique_ptr<asio::ContextWQ> m_context_wq;
+ std::shared_ptr<asio::ContextWQ> m_context_wq;
};
} // namespace librbd
api/Snapshot.cc
api/Trash.cc
api/Utils.cc
- asio/ContextWQ.cc
+ asio/AsioContextWQ.cc
cache/ImageWriteback.cc
cache/ObjectCacherObjectDispatch.cc
cache/ObjectCacherWriteback.cc
endif()
endif(ENABLE_SHARED)
install(TARGETS librbd DESTINATION ${CMAKE_INSTALL_LIBDIR})
+# Install public headers for external ContextWQ implementations
+install(DIRECTORY
+ "${CMAKE_SOURCE_DIR}/src/include/rbd/asio"
+ DESTINATION ${CMAKE_INSTALL_INCLUDEDIR}/rbd
+ FILES_MATCHING PATTERN "*.hpp")
asok_hook = nullptr;
}
+ void ImageCtx::set_context_wq(std::shared_ptr<asio::ContextWQ> context_wq) {
+ ceph_assert(context_wq != nullptr);
+ ceph_assert(asio_engine != nullptr);
+
+ // Set the external ContextWQ on the AsioEngine
+ asio_engine->set_context_wq(context_wq);
+
+ // Update op_work_queue pointer to point to the new work queue
+ op_work_queue = asio_engine->get_work_queue();
+ }
+
void ImageCtx::init_layout(int64_t pool_id)
{
if (stripe_unit == 0 || stripe_count == 0) {
journal::Policy *get_journal_policy() const;
void set_journal_policy(journal::Policy *policy);
+ /**
+ * Set an external ContextWQ implementation.
+ * The ContextWQ must remain valid for the lifetime of the image or until replaced.
+ *
+ * @param context_wq Shared pointer to external ContextWQ implementation
+ */
+ void set_context_wq(std::shared_ptr<asio::ContextWQ> context_wq);
+
void rebuild_data_io_context();
IOContext get_data_io_context() const;
IOContext duplicate_data_io_context() const;
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
+// vim: ts=8 sw=2 sts=2 expandtab
+
+#include "librbd/asio/AsioContextWQ.h"
+#include "include/Context.h"
+#include "common/Cond.h"
+#include "common/dout.h"
+
+#define dout_subsys ceph_subsys_rbd
+#undef dout_prefix
+#define dout_prefix *_dout << "librbd::asio::AsioContextWQ: " \
+ << this << " " << __func__ << ": "
+
+namespace librbd {
+namespace asio {
+
+AsioContextWQ::AsioContextWQ(CephContext* cct, boost::asio::io_context& io_context)
+ : ContextWQ(cct),
+ m_io_context(&io_context),
+ m_strand(std::make_unique<boost::asio::strand<executor_type>>(
+ boost::asio::make_strand(io_context))) {
+ ldout(get_cct(), 20) << dendl;
+}
+
+AsioContextWQ::~AsioContextWQ() {
+ ldout(get_cct(), 20) << dendl;
+ drain();
+ m_strand.reset();
+}
+
+void AsioContextWQ::queue(Context *ctx, int r) {
+ ++m_queued_ops;
+
+ // ensure all legacy ContextWQ users are dispatched sequentially for
+ // backwards compatibility (i.e. might not be concurrent thread-safe)
+ boost::asio::post(*m_strand, [this, ctx, r]() {
+ ctx->complete(r);
+
+ ceph_assert(m_queued_ops > 0);
+ --m_queued_ops;
+ });
+}
+
+void AsioContextWQ::drain() {
+ ldout(get_cct(), 20) << dendl;
+ C_SaferCond ctx;
+ drain_handler(&ctx);
+ ctx.wait();
+}
+
+void AsioContextWQ::drain_handler(Context* ctx) {
+ if (m_queued_ops == 0) {
+ ctx->complete(0);
+ return;
+ }
+
+ // new items might be queued while we are trying to drain, so we
+ // might need to post the handler multiple times
+ boost::asio::post(*m_strand, [this, ctx]() { drain_handler(ctx); });
+}
+
+} // namespace asio
+} // namespace librbd
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
+// vim: ts=8 sw=2 sts=2 expandtab
+
+#ifndef CEPH_LIBRBD_ASIO_ASIO_CONTEXT_WQ_H
+#define CEPH_LIBRBD_ASIO_ASIO_CONTEXT_WQ_H
+
+#include "librbd/asio/ContextWQ.h"
+#include <boost/asio/io_context.hpp>
+#include <boost/asio/strand.hpp>
+#include <boost/asio/post.hpp>
+#include <memory>
+
+namespace librbd {
+namespace asio {
+
+/**
+ * ASIO-based implementation of ContextWQ.
+ *
+ * This implementation uses Boost.ASIO's io_context and strand to schedule
+ * work on ASIO thread pools, ensuring sequential execution for backwards
+ * compatibility with legacy code.
+ */
+class AsioContextWQ : public ContextWQ {
+public:
+ explicit AsioContextWQ(CephContext* cct, boost::asio::io_context& io_context);
+ ~AsioContextWQ() override;
+
+ void queue(Context *ctx, int r = 0) override;
+ void drain() override;
+
+private:
+ boost::asio::io_context* m_io_context;
+ using executor_type = boost::asio::io_context::executor_type;
+ std::unique_ptr<boost::asio::strand<executor_type>> m_strand;
+
+ void drain_handler(Context* ctx);
+
+ // Helper to get CephContext* from base class's void* m_cct
+ CephContext* get_cct() const {
+ return static_cast<CephContext*>(m_cct);
+ }
+};
+
+} // namespace asio
+} // namespace librbd
+
+#endif // CEPH_LIBRBD_ASIO_ASIO_CONTEXT_WQ_H
+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
-// vim: ts=8 sw=2 sts=2 expandtab
-
-#include "librbd/asio/ContextWQ.h"
-#include "include/Context.h"
-#include "common/Cond.h"
-#include "common/dout.h"
-
-#define dout_subsys ceph_subsys_rbd
-#undef dout_prefix
-#define dout_prefix *_dout << "librbd::asio::ContextWQ: " \
- << this << " " << __func__ << ": "
-
-namespace librbd {
-namespace asio {
-
-ContextWQ::ContextWQ(CephContext* cct, boost::asio::io_context& io_context)
- : m_cct(cct), m_io_context(io_context),
- m_strand(std::make_unique<boost::asio::strand<executor_type>>(
- boost::asio::make_strand(io_context))),
- m_queued_ops(0) {
- ldout(m_cct, 20) << dendl;
-}
-
-ContextWQ::~ContextWQ() {
- ldout(m_cct, 20) << dendl;
- drain();
- m_strand.reset();
-}
-
-void ContextWQ::drain() {
- ldout(m_cct, 20) << dendl;
- C_SaferCond ctx;
- drain_handler(&ctx);
- ctx.wait();
-}
-
-void ContextWQ::drain_handler(Context* ctx) {
- if (m_queued_ops == 0) {
- ctx->complete(0);
- return;
- }
-
- // new items might be queued while we are trying to drain, so we
- // might need to post the handler multiple times
- boost::asio::post(*m_strand, [this, ctx]() { drain_handler(ctx); });
-}
-
-} // namespace asio
-} // namespace librbd
#ifndef CEPH_LIBRBD_ASIO_CONTEXT_WQ_H
#define CEPH_LIBRBD_ASIO_CONTEXT_WQ_H
+// Include the public header - ContextWQ is now part of the public API
+// Internal code can use this wrapper for convenience
#include "include/common_fwd.h"
#include "include/Context.h"
-#include <atomic>
-#include <memory>
-#include <boost/asio/io_context.hpp>
-#include <boost/asio/strand.hpp>
-#include <boost/asio/post.hpp>
-
-namespace librbd {
-namespace asio {
-
-class ContextWQ {
-public:
- explicit ContextWQ(CephContext* cct, boost::asio::io_context& io_context);
- ~ContextWQ();
-
- void drain();
-
- void queue(Context *ctx, int r = 0) {
- ++m_queued_ops;
-
- // ensure all legacy ContextWQ users are dispatched sequentially for
- // backwards compatibility (i.e. might not be concurrent thread-safe)
- boost::asio::post(*m_strand, [this, ctx, r]() {
- ctx->complete(r);
-
- ceph_assert(m_queued_ops > 0);
- --m_queued_ops;
- });
- }
-
-private:
- CephContext* m_cct;
- boost::asio::io_context& m_io_context;
- using executor_type = boost::asio::io_context::executor_type;
- std::unique_ptr<boost::asio::strand<executor_type>> m_strand;
-
- std::atomic<uint64_t> m_queued_ops;
-
- void drain_handler(Context* ctx);
-
-};
-
-} // namespace asio
-} // namespace librbd
+#include "include/rbd/asio/ContextWQ.hpp"
#endif // CEPH_LIBRBD_ASIO_CONTEXT_WQ_H
#include "librbd/ImageCtx.h"
#include "librbd/ImageState.h"
#include "librbd/internal.h"
+#include "librbd/asio/ContextWQ.h"
#include "librbd/Operations.h"
#include "librbd/Utils.h"
#include "librbd/api/Config.h"
}
}
-extern "C" int rbd_open(rados_ioctx_t p, const char *name, rbd_image_t *image,
- const char *snap_name)
+extern "C" int rbd_open_with_context_wq(rados_ioctx_t p, const char *name,
+ rbd_image_t *image, const char *snap_name,
+ void *context_wq)
{
librados::IoCtx io_ctx;
librados::IoCtx::from_rados_ioctx_t(p, io_ctx);
false);
tracepoint(librbd, open_image_enter, ictx, ictx->name.c_str(), ictx->id.c_str(), ictx->snap_name.c_str(), ictx->read_only);
+ // set ContextWQ before opening
+ if (context_wq != NULL) {
+ auto wq = static_cast<librbd::asio::ContextWQ*>(context_wq);
+ std::shared_ptr<librbd::asio::ContextWQ> shared_wq(
+ wq, [](librbd::asio::ContextWQ*) { /* no-op deleter - caller owns it */ });
+ ictx->set_context_wq(shared_wq);
+ }
+
int r = ictx->state->open(0);
if (r >= 0) {
*image = (rbd_image_t)ictx;
return r;
}
+extern "C" int rbd_open(rados_ioctx_t p, const char *name, rbd_image_t *image,
+ const char *snap_name)
+{
+ return rbd_open_with_context_wq(p, name, image, snap_name, NULL);
+}
+
extern "C" int rbd_open_by_id(rados_ioctx_t p, const char *id,
rbd_image_t *image, const char *snap_name)
{
return 0;
}
-extern "C" int rbd_open_read_only(rados_ioctx_t p, const char *name,
- rbd_image_t *image, const char *snap_name)
+extern "C" int rbd_open_read_only_with_context_wq(rados_ioctx_t p, const char *name,
+ rbd_image_t *image, const char *snap_name,
+ void *context_wq)
{
librados::IoCtx io_ctx;
librados::IoCtx::from_rados_ioctx_t(p, io_ctx);
true);
tracepoint(librbd, open_image_enter, ictx, ictx->name.c_str(), ictx->id.c_str(), ictx->snap_name.c_str(), ictx->read_only);
+ // set ContextWQ before opening
+ if (context_wq != NULL) {
+ auto wq = static_cast<librbd::asio::ContextWQ*>(context_wq);
+ std::shared_ptr<librbd::asio::ContextWQ> shared_wq(
+ wq, [](librbd::asio::ContextWQ*) { /* no-op deleter - caller owns it */ });
+ ictx->set_context_wq(shared_wq);
+ }
+
int r = ictx->state->open(0);
if (r >= 0) {
*image = (rbd_image_t)ictx;
return r;
}
+extern "C" int rbd_open_read_only(rados_ioctx_t p, const char *name,
+ rbd_image_t *image, const char *snap_name)
+{
+ return rbd_open_read_only_with_context_wq(p, name, image, snap_name, NULL);
+}
+
+extern "C" int rbd_get_ceph_context(rbd_image_t image, void** cct)
+{
+ librbd::ImageCtx *ictx = (librbd::ImageCtx *)image;
+ if (ictx == NULL || cct == NULL) {
+ return -EINVAL;
+ }
+ *cct = static_cast<void*>(ictx->cct);
+ return 0;
+}
+
+extern "C" void rbd_context_complete(void* ctx, int r)
+{
+ if (ctx == NULL) {
+ return;
+ }
+ // Cast the opaque pointer to Context* and complete it
+ auto context = static_cast<Context*>(ctx);
+ context->complete(r);
+}
+
extern "C" int rbd_open_by_id_read_only(rados_ioctx_t p, const char *id,
rbd_image_t *image, const char *snap_name)
{
-Subproject commit 57806b95163dd5e49f16a218ceddc136151aabdc
+Subproject commit a8a4a3c589d8bf59f6dc2d987877b1c1e6de48a4