]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
librbd: wrapper class for ContextWQ
authorJason Dillaman <dillaman@redhat.com>
Thu, 11 Jun 2020 19:45:25 +0000 (15:45 -0400)
committerJason Dillaman <dillaman@redhat.com>
Sat, 13 Jun 2020 02:44:53 +0000 (22:44 -0400)
Numerous state machines in librbd utilize ContextWQ for handling deferred
completion work. Now that we have an ASIO thread pool, we want to remove
the older ThreadPool/WorkQueue implementation. To avoid massive refactoring,
this ContextWQ will mimic the interface of the old ContextWQ class but
utilize ASIO for dispatching.

Signed-off-by: Jason Dillaman <dillaman@redhat.com>
src/librbd/AsioEngine.cc
src/librbd/AsioEngine.h
src/librbd/CMakeLists.txt
src/librbd/asio/ContextWQ.cc [new file with mode: 0644]
src/librbd/asio/ContextWQ.h [new file with mode: 0644]

index ef1ec785409bd58e1c8d668bfb02c6fbd6d0c6de..99e381f0bed2ce8ee79afb839d184209341a79f2 100644 (file)
@@ -3,6 +3,7 @@
 
 #include "librbd/AsioEngine.h"
 #include "common/dout.h"
+#include "librbd/asio/ContextWQ.h"
 #include <boost/system/error_code.hpp>
 
 #define dout_subsys ceph_subsys_rbd
@@ -35,6 +36,8 @@ void AsioEngine::init() {
       m_io_context.run(ec);
     });
   }
+
+  m_work_queue = std::make_unique<asio::ContextWQ>(m_io_context);
 }
 
 void AsioEngine::shut_down() {
index 5d530a9e37a660bf02e42adf5da50415a805d13d..6c7f1a7b17109b123d334c59900284ce619e3c18 100644 (file)
@@ -5,6 +5,7 @@
 #define CEPH_LIBRBD_ASIO_ENGINE_H
 
 #include "include/common_fwd.h"
+#include <memory>
 #include <optional>
 #include <thread>
 #include <vector>
@@ -13,6 +14,8 @@
 
 namespace librbd {
 
+namespace asio { struct ContextWQ; }
+
 class AsioEngine {
 public:
   explicit AsioEngine(CephContext* cct);
@@ -22,6 +25,10 @@ public:
     return m_io_context;
   }
 
+  inline asio::ContextWQ* get_work_queue() {
+    return m_work_queue.get();
+  }
+
 private:
   typedef std::vector<std::thread> Threads;
 
@@ -34,6 +41,8 @@ private:
   boost::asio::io_context m_io_context;
   std::optional<WorkGuard> m_work_guard;
 
+  std::unique_ptr<asio::ContextWQ> m_work_queue;
+
   void init();
   void shut_down();
 
index 0bbd73396ebfb808bd38250015ec1ca70252296d..f4c48c61817300d2a1b66f2dd3be850f17d77c57 100644 (file)
@@ -37,6 +37,7 @@ set(librbd_internal_srcs
   api/PoolMetadata.cc
   api/Snapshot.cc
   api/Trash.cc
+  asio/ContextWQ.cc
   cache/ImageWriteback.cc
   cache/ObjectCacherObjectDispatch.cc
   cache/ObjectCacherWriteback.cc
diff --git a/src/librbd/asio/ContextWQ.cc b/src/librbd/asio/ContextWQ.cc
new file mode 100644 (file)
index 0000000..a81fe1a
--- /dev/null
@@ -0,0 +1,52 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "librbd/asio/ContextWQ.h"
+#include "include/Context.h"
+#include "common/Cond.h"
+#include <boost/asio/bind_executor.hpp>
+#include <boost/asio/post.hpp>
+
+namespace librbd {
+namespace asio {
+
+ContextWQ::ContextWQ(boost::asio::io_context& io_context)
+  : m_io_context(io_context), m_strand(io_context),
+    m_queued_ops(0) {
+}
+
+void ContextWQ::drain() {
+  C_SaferCond ctx;
+  drain_handler(&ctx);
+  ctx.wait();
+}
+
+void ContextWQ::drain_handler(Context* ctx) {
+  if (m_queued_ops == 0) {
+    ctx->complete(0);
+    return;
+  }
+
+  // new items might be queued while we are trying to drain, so we
+  // might need to post the handler multiple times
+  boost::asio::post(m_io_context, boost::asio::bind_executor(
+    m_strand, [this, ctx]() { drain_handler(ctx); }));
+}
+
+void ContextWQ::queue(Context *ctx, int r) {
+  ++m_queued_ops;
+
+  // ensure all legacy ContextWQ users are dispatched sequentially for backwards
+  // compatibility (i.e. might not be concurrent thread-safe)
+  boost::asio::post(m_io_context, boost::asio::bind_executor(
+    m_strand,
+    [this, ctx, r]() {
+      ctx->complete(r);
+
+      ceph_assert(m_queued_ops > 0);
+      --m_queued_ops;
+    }));
+}
+
+} // namespace asio
+} // namespace librbd
diff --git a/src/librbd/asio/ContextWQ.h b/src/librbd/asio/ContextWQ.h
new file mode 100644 (file)
index 0000000..be906d9
--- /dev/null
@@ -0,0 +1,36 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_LIBRBD_ASIO_CONTEXT_WQ_H
+#define CEPH_LIBRBD_ASIO_CONTEXT_WQ_H
+
+#include <atomic>
+#include <boost/asio/io_context.hpp>
+#include <boost/asio/io_context_strand.hpp>
+
+struct Context;
+
+namespace librbd {
+namespace asio {
+
+class ContextWQ {
+public:
+  explicit ContextWQ(boost::asio::io_context& io_context);
+
+  void drain();
+  void queue(Context *ctx, int r = 0);
+
+private:
+  boost::asio::io_context& m_io_context;
+  boost::asio::io_context::strand m_strand;
+
+  std::atomic<uint64_t> m_queued_ops;
+
+  void drain_handler(Context* ctx);
+
+};
+
+} // namespace asio
+} // namespace librbd
+
+#endif // CEPH_LIBRBD_ASIO_CONTEXT_WQ_H