#include "librbd/AsioEngine.h"
#include "common/dout.h"
+#include "librbd/asio/ContextWQ.h"
#include <boost/system/error_code.hpp>
#define dout_subsys ceph_subsys_rbd
m_io_context.run(ec);
});
}
+
+ m_work_queue = std::make_unique<asio::ContextWQ>(m_io_context);
}
void AsioEngine::shut_down() {
#define CEPH_LIBRBD_ASIO_ENGINE_H
#include "include/common_fwd.h"
+#include <memory>
#include <optional>
#include <thread>
#include <vector>
namespace librbd {
+namespace asio { struct ContextWQ; }
+
class AsioEngine {
public:
explicit AsioEngine(CephContext* cct);
return m_io_context;
}
+ inline asio::ContextWQ* get_work_queue() {
+ return m_work_queue.get();
+ }
+
private:
typedef std::vector<std::thread> Threads;
boost::asio::io_context m_io_context;
std::optional<WorkGuard> m_work_guard;
+ std::unique_ptr<asio::ContextWQ> m_work_queue;
+
void init();
void shut_down();
api/PoolMetadata.cc
api/Snapshot.cc
api/Trash.cc
+ asio/ContextWQ.cc
cache/ImageWriteback.cc
cache/ObjectCacherObjectDispatch.cc
cache/ObjectCacherWriteback.cc
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "librbd/asio/ContextWQ.h"
+#include "include/Context.h"
+#include "common/Cond.h"
+#include <boost/asio/bind_executor.hpp>
+#include <boost/asio/post.hpp>
+
+namespace librbd {
+namespace asio {
+
+ContextWQ::ContextWQ(boost::asio::io_context& io_context)
+ : m_io_context(io_context), m_strand(io_context),
+ m_queued_ops(0) {
+}
+
+void ContextWQ::drain() {
+ C_SaferCond ctx;
+ drain_handler(&ctx);
+ ctx.wait();
+}
+
+void ContextWQ::drain_handler(Context* ctx) {
+ if (m_queued_ops == 0) {
+ ctx->complete(0);
+ return;
+ }
+
+ // new items might be queued while we are trying to drain, so we
+ // might need to post the handler multiple times
+ boost::asio::post(m_io_context, boost::asio::bind_executor(
+ m_strand, [this, ctx]() { drain_handler(ctx); }));
+}
+
+void ContextWQ::queue(Context *ctx, int r) {
+ ++m_queued_ops;
+
+ // ensure all legacy ContextWQ users are dispatched sequentially for backwards
+ // compatibility (i.e. might not be concurrent thread-safe)
+ boost::asio::post(m_io_context, boost::asio::bind_executor(
+ m_strand,
+ [this, ctx, r]() {
+ ctx->complete(r);
+
+ ceph_assert(m_queued_ops > 0);
+ --m_queued_ops;
+ }));
+}
+
+} // namespace asio
+} // namespace librbd
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_LIBRBD_ASIO_CONTEXT_WQ_H
+#define CEPH_LIBRBD_ASIO_CONTEXT_WQ_H
+
+#include <atomic>
+#include <boost/asio/io_context.hpp>
+#include <boost/asio/io_context_strand.hpp>
+
+struct Context;
+
+namespace librbd {
+namespace asio {
+
+class ContextWQ {
+public:
+ explicit ContextWQ(boost::asio::io_context& io_context);
+
+ void drain();
+ void queue(Context *ctx, int r = 0);
+
+private:
+ boost::asio::io_context& m_io_context;
+ boost::asio::io_context::strand m_strand;
+
+ std::atomic<uint64_t> m_queued_ops;
+
+ void drain_handler(Context* ctx);
+
+};
+
+} // namespace asio
+} // namespace librbd
+
+#endif // CEPH_LIBRBD_ASIO_CONTEXT_WQ_H