--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "librbd/AsioEngine.h"
+#include "common/dout.h"
+#include <boost/system/error_code.hpp>
+
+#define dout_subsys ceph_subsys_rbd
+#undef dout_prefix
+#define dout_prefix *_dout << "librbd::AsioEngine: " \
+                           << this << " " << __func__ << ": "
+
+namespace librbd {
+
+AsioEngine::AsioEngine(CephContext* cct)
+  : m_cct(cct) {
+  init();
+}
+
+AsioEngine::~AsioEngine() {
+  shut_down();
+}
+
+void AsioEngine::init() {
+  auto thread_count = m_cct->_conf.get_val<uint64_t>("rbd_op_threads");
+  m_threads.reserve(thread_count);
+
+  // prevent IO context from exiting if no work is currently scheduled
+  m_work_guard.emplace(boost::asio::make_work_guard(m_io_context));
+
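+  // each worker thread runs the shared event loop; run() only returns once
+  // the work guard is dropped in shut_down() and the pending handlers drain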
+ ldout(m_cct, 5) << "spawning " << thread_count << " threads" << dendl;
+ for (auto i = 0U; i < thread_count; i++) {
+ m_threads.emplace_back([=] {
+ boost::system::error_code ec;
+ m_io_context.run(ec);
+ });
+ }
+}
+
+void AsioEngine::shut_down() {
+ ldout(m_cct, 5) << "joining threads" << dendl;
+
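+  // releasing the work guard lets io_context::run() return once the queue is empty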
+  m_work_guard.reset();
+  for (auto& thread : m_threads) {
+    thread.join();
+  }
+  m_threads.clear();
+
+ ldout(m_cct, 5) << "done" << dendl;
+}
+
+} // namespace librbd
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_LIBRBD_ASIO_ENGINE_H
+#define CEPH_LIBRBD_ASIO_ENGINE_H
+
+#include "include/common_fwd.h"
+#include <optional>
+#include <thread>
+#include <vector>
+#include <boost/asio/executor_work_guard.hpp>
+#include <boost/asio/io_context.hpp>
+
+namespace librbd {
+
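+// Wraps a boost::asio::io_context plus a pool of worker threads (sized by the
+// "rbd_op_threads" option) that run its event loop.  A minimal usage sketch,
+// where the engine instance and the handler are purely illustrative:
+//
+//   #include <boost/asio/post.hpp>
+//   boost::asio::post(engine.get_io_context(), []() {
+//     // handler runs on one of the worker threads
+//   });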
+class AsioEngine {
+public:
+  explicit AsioEngine(CephContext* cct);
+  ~AsioEngine();
+
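+  // accessor for the shared event loop driven by the worker threads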
+  inline boost::asio::io_context& get_io_context() {
+    return m_io_context;
+  }
+
+private:
+  typedef std::vector<std::thread> Threads;
+
+  typedef boost::asio::executor_work_guard<
+    boost::asio::io_context::executor_type> WorkGuard;
+
+  CephContext* m_cct;
+  Threads m_threads;
+
+  boost::asio::io_context m_io_context;
+  std::optional<WorkGuard> m_work_guard;
+
+  void init();
+  void shut_down();
+
+};
+
+} // namespace librbd
+
+#endif // CEPH_LIBRBD_ASIO_ENGINE_H
WatchNotifyTypes.cc)
set(librbd_internal_srcs
+ AsioEngine.cc
AsyncObjectThrottle.cc
AsyncRequest.cc
DeepCopyRequest.cc
#include "common/WorkQueue.h"
#include "common/Timer.h"
+#include "librbd/AsioEngine.h"
#include "librbd/AsyncRequest.h"
#include "librbd/ExclusiveLock.h"
#include "librbd/internal.h"
}
};
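+// helper for the ImageCtx constructor: fetches the per-CephContext AsioEngine
+// singleton and hands back its io_context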
+boost::asio::io_context& get_asio_engine_io_context(CephContext* cct) {
+  auto asio_engine_singleton = ImageCtx::get_asio_engine(cct);
+  return asio_engine_singleton->get_io_context();
+}
+
} // anonymous namespace
const string ImageCtx::METADATA_CONF_PREFIX = "conf_";
state(new ImageState<>(this)),
operations(new Operations<>(*this)),
exclusive_lock(nullptr), object_map(nullptr),
+ io_context(get_asio_engine_io_context(cct)),
op_work_queue(nullptr),
plugin_registry(new PluginRegistry<ImageCtx>(this)),
external_callback_completions(32),
journal_policy = policy;
}
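+// a single AsioEngine is shared per CephContext; it is created lazily on
+// first lookup and owned by the context's singleton registry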
+AsioEngine* ImageCtx::get_asio_engine(CephContext* cct) {
+  return &cct->lookup_or_create_singleton_object<AsioEngine>(
+    "librbd::AsioEngine", false, cct);
+}
+
void ImageCtx::get_thread_pool_instance(CephContext *cct,
ThreadPool **thread_pool,
ContextWQ **op_work_queue) {
#include "librbd/AsyncRequest.h"
#include "librbd/Types.h"
+#include <boost/asio/io_context.hpp>
#include <boost/lockfree/policies.hpp>
#include <boost/lockfree/queue.hpp>
namespace librbd {
+ class AsioEngine;
template <typename> class ExclusiveLock;
template <typename> class ImageState;
template <typename> class ImageWatcher;
xlist<operation::ResizeRequest<ImageCtx>*> resize_reqs;
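+ // event loop shared by every image on this CephContext (see AsioEngine)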
+ boost::asio::io_context& io_context;
+
io::ImageDispatcherInterface *io_image_dispatcher = nullptr;
io::ObjectDispatcherInterface *io_object_dispatcher = nullptr;
journal::Policy *get_journal_policy() const;
void set_journal_policy(journal::Policy *policy);
+ static AsioEngine* get_asio_engine(CephContext* cct);
static void get_thread_pool_instance(CephContext *cct,
ThreadPool **thread_pool,
ContextWQ **op_work_queue);