From: Jason Dillaman Date: Thu, 11 Jun 2020 19:45:25 +0000 (-0400) Subject: librbd: wrapper class for ContextWQ X-Git-Tag: v17.0.0~2093^2~2 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=fdad0b77f40c69832d6cd59294fd228c301ac408;p=ceph.git librbd: wrapper class for ContextWQ Numerous state machines in librbd utilize ContextWQ for handling deferred completion work. Now that we have an ASIO thread pool, we want to remove the older ThreadPool/WorkQueue implementation. To avoid massive refactoring, this ContextWQ will mimic the interface of the old ContextWQ class but utilize ASIO for dispatching. Signed-off-by: Jason Dillaman --- diff --git a/src/librbd/AsioEngine.cc b/src/librbd/AsioEngine.cc index ef1ec785409bd..99e381f0bed2c 100644 --- a/src/librbd/AsioEngine.cc +++ b/src/librbd/AsioEngine.cc @@ -3,6 +3,7 @@ #include "librbd/AsioEngine.h" #include "common/dout.h" +#include "librbd/asio/ContextWQ.h" #include #define dout_subsys ceph_subsys_rbd @@ -35,6 +36,8 @@ void AsioEngine::init() { m_io_context.run(ec); }); } + + m_work_queue = std::make_unique(m_io_context); } void AsioEngine::shut_down() { diff --git a/src/librbd/AsioEngine.h b/src/librbd/AsioEngine.h index 5d530a9e37a66..6c7f1a7b17109 100644 --- a/src/librbd/AsioEngine.h +++ b/src/librbd/AsioEngine.h @@ -5,6 +5,7 @@ #define CEPH_LIBRBD_ASIO_ENGINE_H #include "include/common_fwd.h" +#include #include #include #include @@ -13,6 +14,8 @@ namespace librbd { +namespace asio { struct ContextWQ; } + class AsioEngine { public: explicit AsioEngine(CephContext* cct); @@ -22,6 +25,10 @@ public: return m_io_context; } + inline asio::ContextWQ* get_work_queue() { + return m_work_queue.get(); + } + private: typedef std::vector Threads; @@ -34,6 +41,8 @@ private: boost::asio::io_context m_io_context; std::optional m_work_guard; + std::unique_ptr m_work_queue; + void init(); void shut_down(); diff --git a/src/librbd/CMakeLists.txt b/src/librbd/CMakeLists.txt index 0bbd73396ebfb..f4c48c6181730 100644 --- a/src/librbd/CMakeLists.txt +++ b/src/librbd/CMakeLists.txt @@ -37,6 +37,7 @@ set(librbd_internal_srcs api/PoolMetadata.cc api/Snapshot.cc api/Trash.cc + asio/ContextWQ.cc cache/ImageWriteback.cc cache/ObjectCacherObjectDispatch.cc cache/ObjectCacherWriteback.cc diff --git a/src/librbd/asio/ContextWQ.cc b/src/librbd/asio/ContextWQ.cc new file mode 100644 index 0000000000000..a81fe1ada1b9d --- /dev/null +++ b/src/librbd/asio/ContextWQ.cc @@ -0,0 +1,52 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "librbd/asio/ContextWQ.h" +#include "include/Context.h" +#include "common/Cond.h" +#include +#include + +namespace librbd { +namespace asio { + +ContextWQ::ContextWQ(boost::asio::io_context& io_context) + : m_io_context(io_context), m_strand(io_context), + m_queued_ops(0) { +} + +void ContextWQ::drain() { + C_SaferCond ctx; + drain_handler(&ctx); + ctx.wait(); +} + +void ContextWQ::drain_handler(Context* ctx) { + if (m_queued_ops == 0) { + ctx->complete(0); + return; + } + + // new items might be queued while we are trying to drain, so we + // might need to post the handler multiple times + boost::asio::post(m_io_context, boost::asio::bind_executor( + m_strand, [this, ctx]() { drain_handler(ctx); })); +} + +void ContextWQ::queue(Context *ctx, int r) { + ++m_queued_ops; + + // ensure all legacy ContextWQ users are dispatched sequentially for backwards + // compatibility (i.e. might not be concurrent thread-safe) + boost::asio::post(m_io_context, boost::asio::bind_executor( + m_strand, + [this, ctx, r]() { + ctx->complete(r); + + ceph_assert(m_queued_ops > 0); + --m_queued_ops; + })); +} + +} // namespace asio +} // namespace librbd diff --git a/src/librbd/asio/ContextWQ.h b/src/librbd/asio/ContextWQ.h new file mode 100644 index 0000000000000..be906d9ee3b72 --- /dev/null +++ b/src/librbd/asio/ContextWQ.h @@ -0,0 +1,36 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_LIBRBD_ASIO_CONTEXT_WQ_H +#define CEPH_LIBRBD_ASIO_CONTEXT_WQ_H + +#include +#include +#include + +struct Context; + +namespace librbd { +namespace asio { + +class ContextWQ { +public: + explicit ContextWQ(boost::asio::io_context& io_context); + + void drain(); + void queue(Context *ctx, int r = 0); + +private: + boost::asio::io_context& m_io_context; + boost::asio::io_context::strand m_strand; + + std::atomic m_queued_ops; + + void drain_handler(Context* ctx); + +}; + +} // namespace asio +} // namespace librbd + +#endif // CEPH_LIBRBD_ASIO_CONTEXT_WQ_H