// vim: ts=8 sw=2 smarttab
#include "librbd/AsioEngine.h"
+#include "include/stringify.h"
+#include "include/neorados/RADOS.hpp"
+#include "include/rados/librados.hpp"
#include "common/dout.h"
#include "librbd/asio/ContextWQ.h"
-#include <boost/system/error_code.hpp>
#define dout_subsys ceph_subsys_rbd
#undef dout_prefix
namespace librbd {
-AsioEngine::AsioEngine(CephContext* cct)
- : m_cct(cct) {
- init();
-}
-
-AsioEngine::~AsioEngine() {
- shut_down();
-}
-
-void AsioEngine::init() {
- auto thread_count = m_cct->_conf.get_val<uint64_t>("rbd_op_threads");
- m_threads.reserve(thread_count);
-
- // prevent IO context from exiting if no work is currently scheduled
- m_work_guard.emplace(boost::asio::make_work_guard(m_io_context));
-
- ldout(m_cct, 5) << "spawning " << thread_count << " threads" << dendl;
- for (auto i = 0U; i < thread_count; i++) {
- m_threads.emplace_back([=] {
- boost::system::error_code ec;
- m_io_context.run(ec);
- });
+AsioEngine::AsioEngine(std::shared_ptr<librados::Rados> rados)
+ : m_rados_api(std::make_shared<neorados::RADOS>(
+ neorados::RADOS::make_with_librados(*rados))),
+ m_cct(m_rados_api->cct()),
+ m_io_context(m_rados_api->get_io_context()),
+ m_context_wq(std::make_unique<asio::ContextWQ>(m_io_context)) {
+ ldout(m_cct, 20) << dendl;
+
+ auto rados_threads = m_cct->_conf.get_val<uint64_t>("librados_thread_count");
+ auto rbd_threads = m_cct->_conf.get_val<uint64_t>("rbd_op_threads");
+ if (rbd_threads > rados_threads) {
+ // inherit the librados thread count -- but increase it if librbd wants to
+ // utilize more threads
+ m_cct->_conf.set_val("librados_thread_count", stringify(rbd_threads));
}
-
- m_work_queue = std::make_unique<asio::ContextWQ>(m_io_context);
}
-void AsioEngine::shut_down() {
- ldout(m_cct, 5) << "joining threads" << dendl;
-
- m_work_guard.reset();
- for (auto& thread : m_threads) {
- thread.join();
- }
- m_threads.clear();
+AsioEngine::AsioEngine(librados::IoCtx& io_ctx)
+ : AsioEngine(std::make_shared<librados::Rados>(io_ctx)) {
+}
- ldout(m_cct, 5) << "done" << dendl;
+AsioEngine::~AsioEngine() {
+ ldout(m_cct, 20) << dendl;
}
} // namespace librbd
#define CEPH_LIBRBD_ASIO_ENGINE_H
#include "include/common_fwd.h"
+#include "include/rados/librados_fwd.hpp"
#include <memory>
-#include <optional>
-#include <thread>
-#include <vector>
-#include <boost/asio/executor_work_guard.hpp>
#include <boost/asio/io_context.hpp>
+namespace neorados { struct RADOS; }
+
namespace librbd {
namespace asio { struct ContextWQ; }
class AsioEngine {
public:
- explicit AsioEngine(CephContext* cct);
+ explicit AsioEngine(std::shared_ptr<librados::Rados> rados);
+ explicit AsioEngine(librados::IoCtx& io_ctx);
~AsioEngine();
+ AsioEngine(AsioEngine&&) = delete;
+ AsioEngine(const AsioEngine&) = delete;
+ AsioEngine& operator=(const AsioEngine&) = delete;
+
+ inline neorados::RADOS& get_rados_api() {
+ return *m_rados_api;
+ }
+
inline boost::asio::io_context& get_io_context() {
return m_io_context;
}
+ inline operator boost::asio::io_context&() {
+ return m_io_context;
+ }
+ inline boost::asio::io_context::executor_type get_executor() {
+ return m_io_context.get_executor();
+ }
inline asio::ContextWQ* get_work_queue() {
- return m_work_queue.get();
+ return m_context_wq.get();
}
private:
- typedef std::vector<std::thread> Threads;
-
- typedef boost::asio::executor_work_guard<
- boost::asio::io_context::executor_type> WorkGuard;
-
+ std::shared_ptr<neorados::RADOS> m_rados_api;
CephContext* m_cct;
- Threads m_threads;
-
- boost::asio::io_context m_io_context;
- std::optional<WorkGuard> m_work_guard;
-
- std::unique_ptr<asio::ContextWQ> m_work_queue;
-
- void init();
- void shut_down();
+ boost::asio::io_context& m_io_context;
+ std::unique_ptr<asio::ContextWQ> m_context_wq;
};
} // namespace librbd
rbd_internal
rbd_types
journal
+ libneorados
librados
- cls_rbd_client
- cls_lock_client
- cls_journal_client
+ cls_rbd_client
+ cls_lock_client
+ cls_journal_client
ceph-common
pthread
${CMAKE_DL_LIBS}
}
};
-boost::asio::io_context& get_asio_engine_io_context(CephContext* cct) {
- auto asio_engine_singleton = ImageCtx::get_asio_engine(cct);
- return asio_engine_singleton->get_io_context();
-}
-
} // anonymous namespace
const string ImageCtx::METADATA_CONF_PREFIX = "conf_";
read_only_flags(ro ? IMAGE_READ_ONLY_FLAG_USER : 0U),
exclusive_locked(false),
name(image_name),
+ asio_engine(std::make_shared<AsioEngine>(p)),
image_watcher(NULL),
journal(NULL),
owner_lock(ceph::make_shared_mutex(util::unique_lock_name("librbd::ImageCtx::owner_lock", this))),
state(new ImageState<>(this)),
operations(new Operations<>(*this)),
exclusive_lock(nullptr), object_map(nullptr),
- io_context(get_asio_engine_io_context(cct)),
- op_work_queue(nullptr),
+ op_work_queue(asio_engine->get_work_queue()),
plugin_registry(new PluginRegistry<ImageCtx>(this)),
external_callback_completions(32),
event_socket_completions(32),
// FIPS zeroization audit 20191117: this memset is not security related.
memset(&header, 0, sizeof(header));
- get_work_queue(cct, &op_work_queue);
io_image_dispatcher = new io::ImageDispatcher<ImageCtx>(this);
io_object_dispatcher = new io::ObjectDispatcher<ImageCtx>(this);
journal_policy = policy;
}
- AsioEngine* ImageCtx::get_asio_engine(CephContext* cct) {
- return &cct->lookup_or_create_singleton_object<AsioEngine>(
- "librbd::AsioEngine", false, cct);
- }
-
- void ImageCtx::get_work_queue(CephContext *cct,
- asio::ContextWQ **op_work_queue) {
- *op_work_queue = get_asio_engine(cct)->get_work_queue();
- }
-
void ImageCtx::get_timer_instance(CephContext *cct, SafeTimer **timer,
ceph::mutex **timer_lock) {
auto safe_timer_singleton =
#include <atomic>
#include <list>
#include <map>
+#include <memory>
#include <set>
#include <string>
#include <vector>
#include "librbd/AsyncRequest.h"
#include "librbd/Types.h"
-#include <boost/asio/io_context.hpp>
#include <boost/lockfree/policies.hpp>
#include <boost/lockfree/queue.hpp>
-class Finisher;
-class ThreadPool;
class SafeTimer;
+namespace neorados {
+class RADOS;
+} // namespace neorados
+
namespace librbd {
struct AsioEngine;
std::string name;
cls::rbd::SnapshotNamespace snap_namespace;
std::string snap_name;
+
+ std::shared_ptr<AsioEngine> asio_engine;
+
IoCtx data_ctx, md_ctx;
ImageWatcher<ImageCtx> *image_watcher;
Journal<ImageCtx> *journal;
xlist<operation::ResizeRequest<ImageCtx>*> resize_reqs;
- boost::asio::io_context& io_context;
-
io::ImageDispatcherInterface *io_image_dispatcher = nullptr;
io::ObjectDispatcherInterface *io_object_dispatcher = nullptr;
journal::Policy *get_journal_policy() const;
void set_journal_policy(journal::Policy *policy);
- static AsioEngine* get_asio_engine(CephContext* cct);
- static void get_work_queue(CephContext *cct,
- asio::ContextWQ **op_work_queue);
static void get_timer_instance(CephContext *cct, SafeTimer **timer,
ceph::mutex **timer_lock);
};
#include "common/errno.h"
#include "common/Cond.h"
#include "common/WorkQueue.h"
+#include "librbd/AsioEngine.h"
#include "librbd/ImageCtx.h"
#include "librbd/Utils.h"
#include "librbd/asio/ContextWQ.h"
class QuiesceWatchers {
public:
- explicit QuiesceWatchers(CephContext *cct)
+ explicit QuiesceWatchers(CephContext *cct, asio::ContextWQ* work_queue)
: m_cct(cct),
+ m_work_queue(work_queue),
m_lock(ceph::make_mutex(util::unique_lock_name(
"librbd::QuiesceWatchers::m_lock", this))) {
- ImageCtx::get_work_queue(m_cct, &m_work_queue);
}
~QuiesceWatchers() {
m_lock(ceph::make_mutex(util::unique_lock_name("librbd::ImageState::m_lock", this))),
m_last_refresh(0), m_refresh_seq(0),
m_update_watchers(new ImageUpdateWatchers(image_ctx->cct)),
- m_quiesce_watchers(new QuiesceWatchers(image_ctx->cct)) {
+ m_quiesce_watchers(new QuiesceWatchers(
+ image_ctx->cct, image_ctx->asio_engine->get_work_queue())) {
}
template <typename I>
#include "common/errno.h"
#include "common/Cond.h"
#include "cls/rbd/cls_rbd_client.h"
+#include "librbd/AsioEngine.h"
#include "librbd/DeepCopyRequest.h"
#include "librbd/ExclusiveLock.h"
#include "librbd/ImageCtx.h"
template <typename I>
int Image<I>::deep_copy(I *src, I *dest, bool flatten,
ProgressContext &prog_ctx) {
- CephContext *cct = src->cct;
librados::snap_t snap_id_start = 0;
librados::snap_t snap_id_end;
{
snap_id_end = src->snap_id;
}
- asio::ContextWQ *op_work_queue;
- ImageCtx::get_work_queue(cct, &op_work_queue);
+ AsioEngine asio_engine(src->md_ctx);
C_SaferCond cond;
SnapSeqs snap_seqs;
deep_copy::ProgressHandler progress_handler{&prog_ctx};
auto req = DeepCopyRequest<I>::create(
src, dest, snap_id_start, snap_id_end, 0U, flatten, boost::none,
- op_work_queue, &snap_seqs, &progress_handler, &cond);
+ asio_engine.get_work_queue(), &snap_seqs, &progress_handler, &cond);
req->send();
int r = cond.wait();
if (r < 0) {
// fall-through if trash isn't supported
}
- asio::ContextWQ *op_work_queue;
- ImageCtx::get_work_queue(cct, &op_work_queue);
+ AsioEngine asio_engine(io_ctx);
// might be a V1 image format that cannot be moved to the trash
// and would not have been listed in the V2 directory -- or the OSDs
// are too old and don't support the trash feature
C_SaferCond cond;
auto req = librbd::image::RemoveRequest<I>::create(
- io_ctx, image_name, "", false, false, prog_ctx, op_work_queue, &cond);
+ io_ctx, image_name, "", false, false, prog_ctx,
+ asio_engine.get_work_queue(), &cond);
req->send();
return cond.wait();
#include "common/errno.h"
#include "common/Cond.h"
#include "cls/rbd/cls_rbd_client.h"
+#include "librbd/AsioEngine.h"
#include "librbd/ExclusiveLock.h"
#include "librbd/ImageCtx.h"
#include "librbd/ImageState.h"
ceph_assert(dst_image_ctx->ignore_migrating);
- asio::ContextWQ *op_work_queue;
- ImageCtx::get_work_queue(m_cct, &op_work_queue);
+ auto asio_engine = dst_image_ctx->asio_engine;
+
C_SaferCond on_remove;
auto req = librbd::image::RemoveRequest<>::create(
- m_dst_io_ctx, dst_image_ctx, false, false, *m_prog_ctx, op_work_queue,
- &on_remove);
+ m_dst_io_ctx, dst_image_ctx, false, false, *m_prog_ctx,
+ asio_engine->get_work_queue(), &on_remove);
req->send();
r = on_remove.wait();
}
}
- asio::ContextWQ *op_work_queue;
- ImageCtx::get_work_queue(m_cct, &op_work_queue);
-
ConfigProxy config{m_cct->_conf};
api::Config<I>::apply_pool_overrides(m_dst_io_ctx, &config);
auto *req = image::CreateRequest<I>::create(
config, m_dst_io_ctx, m_dst_image_name, m_dst_image_id, size,
m_image_options, image::CREATE_FLAG_SKIP_MIRROR_ENABLE,
- cls::rbd::MIRROR_IMAGE_MODE_JOURNAL, "", "", op_work_queue, &on_create);
+ cls::rbd::MIRROR_IMAGE_MODE_JOURNAL, "", "",
+ m_src_image_ctx->op_work_queue, &on_create);
req->send();
} else {
r = util::create_ioctx(m_src_image_ctx->md_ctx, "destination image",
auto *req = image::CloneRequest<I>::create(
config, parent_io_ctx, parent_spec.image_id, "", {}, parent_spec.snap_id,
m_dst_io_ctx, m_dst_image_name, m_dst_image_id, m_image_options,
- cls::rbd::MIRROR_IMAGE_MODE_JOURNAL, "", "", op_work_queue, &on_create);
+ cls::rbd::MIRROR_IMAGE_MODE_JOURNAL, "", "",
+ m_src_image_ctx->op_work_queue, &on_create);
req->send();
}
ceph_assert(m_src_image_ctx->ignore_migrating);
- asio::ContextWQ *op_work_queue;
- ImageCtx::get_work_queue(m_cct, &op_work_queue);
+ auto asio_engine = m_src_image_ctx->asio_engine;
+
C_SaferCond on_remove;
auto req = librbd::image::RemoveRequest<I>::create(
- m_src_io_ctx, m_src_image_ctx, false, true, *m_prog_ctx, op_work_queue,
- &on_remove);
+ m_src_io_ctx, m_src_image_ctx, false, true, *m_prog_ctx,
+ asio_engine->get_work_queue(), &on_remove);
req->send();
r = on_remove.wait();
#include "common/dout.h"
#include "common/errno.h"
#include "cls/rbd/cls_rbd_client.h"
+#include "librbd/AsioEngine.h"
#include "librbd/ImageCtx.h"
#include "librbd/ImageState.h"
#include "librbd/Journal.h"
break;
}
- asio::ContextWQ *op_work_queue;
- ImageCtx::get_work_queue(cct, &op_work_queue);
+ AsioEngine asio_engine(io_ctx);
for (auto &it : images) {
auto &image_id = it.first;
// need to call get_info for every image to retrieve promotion state
mirror_image_info_t info;
- r = image_get_info(io_ctx, op_work_queue, image_id, &info);
+ r = image_get_info(io_ctx, asio_engine.get_work_queue(), image_id, &info);
if (r >= 0) {
(*entries)[image_id] = std::make_pair(mode, info);
}
#include "common/Throttle.h"
#include "cls/rbd/cls_rbd_client.h"
#include "osd/osd_types.h"
+#include "librbd/AsioEngine.h"
#include "librbd/ImageCtx.h"
#include "librbd/Utils.h"
#include "librbd/api/Config.h"
return 0;
}
- asio::ContextWQ *op_work_queue;
- ImageCtx::get_work_queue(cct, &op_work_queue);
+ AsioEngine asio_engine(io_ctx);
C_SaferCond ctx;
- auto req = image::ValidatePoolRequest<I>::create(io_ctx, op_work_queue, &ctx);
+ auto req = image::ValidatePoolRequest<I>::create(
+ io_ctx, asio_engine.get_work_queue(), &ctx);
req->send();
return ctx.wait();
#include "common/errno.h"
#include "common/Cond.h"
#include "cls/rbd/cls_rbd_client.h"
+#include "librbd/AsioEngine.h"
#include "librbd/ExclusiveLock.h"
#include "librbd/ImageCtx.h"
#include "librbd/ImageState.h"
ldout(cct, 10) << dendl;
- asio::ContextWQ *op_work_queue;
- ImageCtx::get_work_queue(cct, &op_work_queue);
+ AsioEngine asio_engine(io_ctx);
+
C_SaferCond ctx;
auto req = mirror::EnableRequest<I>::create(
io_ctx, image_id, cls::rbd::MIRROR_IMAGE_MODE_JOURNAL, "", false,
- op_work_queue, &ctx);
+ asio_engine.get_work_queue(), &ctx);
req->send();
r = ctx.wait();
if (r < 0) {
return -EBUSY;
}
- asio::ContextWQ *op_work_queue;
- ImageCtx::get_work_queue(cct, &op_work_queue);
+ AsioEngine asio_engine(io_ctx);
C_SaferCond cond;
auto req = librbd::trash::RemoveRequest<I>::create(
- io_ctx, image_id, op_work_queue, force, prog_ctx, &cond);
+ io_ctx, image_id, asio_engine.get_work_queue(), force, prog_ctx, &cond);
req->send();
r = cond.wait();
#include "cls/journal/cls_journal_types.h"
#include "cls/journal/cls_journal_client.h"
+#include "librbd/AsioEngine.h"
#include "librbd/ExclusiveLock.h"
#include "librbd/ImageCtx.h"
#include "librbd/ImageState.h"
lderr(cct) << "Forced V1 image creation. " << dendl;
r = create_v1(io_ctx, image_name.c_str(), size, order);
} else {
- asio::ContextWQ *op_work_queue;
- ImageCtx::get_work_queue(cct, &op_work_queue);
+ AsioEngine asio_engine(io_ctx);
ConfigProxy config{cct->_conf};
api::Config<>::apply_pool_overrides(io_ctx, &config);
image::CreateRequest<> *req = image::CreateRequest<>::create(
config, io_ctx, image_name, id, size, opts, create_flags,
static_cast<cls::rbd::MirrorImageMode>(mirror_image_mode),
- non_primary_global_image_id, primary_mirror_uuid, op_work_queue, &cond);
+ non_primary_global_image_id, primary_mirror_uuid,
+ asio_engine.get_work_queue(), &cond);
req->send();
r = cond.wait();
ConfigProxy config{reinterpret_cast<CephContext *>(c_ioctx.cct())->_conf};
api::Config<>::apply_pool_overrides(c_ioctx, &config);
- asio::ContextWQ *op_work_queue;
- ImageCtx::get_work_queue(cct, &op_work_queue);
+ AsioEngine asio_engine(p_ioctx);
C_SaferCond cond;
auto *req = image::CloneRequest<>::create(
config, p_ioctx, parent_id, p_snap_name,
{cls::rbd::UserSnapshotNamespace{}}, CEPH_NOSNAP, c_ioctx, c_name,
clone_id, c_opts, cls::rbd::MIRROR_IMAGE_MODE_JOURNAL,
- non_primary_global_image_id, primary_mirror_uuid, op_work_queue, &cond);
+ non_primary_global_image_id, primary_mirror_uuid,
+ asio_engine.get_work_queue(), &cond);
req->send();
r = cond.wait();
${unittest_librbd_srcs}
$<TARGET_OBJECTS:common_texttable_obj>)
target_compile_definitions(unittest_librbd PRIVATE "TEST_LIBRBD_INTERNALS")
-target_link_libraries(unittest_librbd
- cls_rbd
- cls_rbd_client
+add_dependencies(unittest_librbd
+ cls_journal
cls_lock
- cls_lock_client
+ cls_rbd)
+target_link_libraries(unittest_librbd
+ rbd_test
+ rbd_api
+ rbd_internal
+ rbd_test_mock
journal
journal_test_mock
- cls_journal
+ cls_rbd_client
+ cls_lock_client
cls_journal_client
+ rbd_types
rados_test_stub
librados
- rbd_test
- rbd_test_mock
- rbd_api
- rbd_internal
- rbd_types
ceph_immutable_object_cache_lib
osdc
ceph-common
journal
cls_journal_client
cls_rbd_client
+ libneorados
librados
${UNITTEST_LIBS}
radostest)
#include "test/librbd/test_mock_fixture.h"
#include "include/rbd/librbd.hpp"
+#include "librbd/AsioEngine.h"
#include "librbd/ImageCtx.h"
#include "librbd/ImageState.h"
#include "librbd/internal.h"
librbd::ImageCtx *m_src_image_ctx;
librbd::ImageCtx *m_dst_image_ctx;
+
+ std::shared_ptr<librbd::AsioEngine> m_asio_engine;
asio::ContextWQ *m_work_queue;
+
librbd::SnapSeqs m_snap_seqs;
SnapMap m_snap_map;
ASSERT_EQ(0, create_image_pp(rbd, m_ioctx, dst_image_name, m_image_size));
ASSERT_EQ(0, open_image(dst_image_name, &m_dst_image_ctx));
- librbd::ImageCtx::get_work_queue(m_src_image_ctx->cct, &m_work_queue);
+ m_asio_engine = std::make_shared<librbd::AsioEngine>(
+ m_src_image_ctx->md_ctx);
+ m_work_queue = m_asio_engine->get_work_queue();
}
void expect_get_image_size(librbd::MockTestImageCtx &mock_image_ctx,
#include "test/librbd/test_mock_fixture.h"
#include "include/rbd/librbd.hpp"
#include "include/stringify.h"
+#include "librbd/AsioEngine.h"
#include "librbd/ImageCtx.h"
#include "librbd/deep_copy/MetadataCopyRequest.h"
#include "librbd/image/GetMetadataRequest.h"
librbd::ImageCtx *m_src_image_ctx;
librbd::ImageCtx *m_dst_image_ctx;
+
+ std::shared_ptr<librbd::AsioEngine> m_asio_engine;
asio::ContextWQ *m_work_queue;
void SetUp() override {
ASSERT_EQ(0, create_image_pp(rbd, m_ioctx, dst_image_name, m_image_size));
ASSERT_EQ(0, open_image(dst_image_name, &m_dst_image_ctx));
- librbd::ImageCtx::get_work_queue(m_src_image_ctx->cct, &m_work_queue);
+ m_asio_engine = std::make_shared<librbd::AsioEngine>(
+ m_src_image_ctx->md_ctx);
+ m_work_queue = m_asio_engine->get_work_queue();
}
void expect_get_metadata(MockGetMetadataRequest& mock_request,
#include "include/interval_set.h"
#include "include/rbd/librbd.hpp"
#include "include/rbd/object_map_types.h"
+#include "librbd/AsioEngine.h"
#include "librbd/ImageCtx.h"
#include "librbd/ImageState.h"
#include "librbd/internal.h"
librbd::ImageCtx *m_src_image_ctx;
librbd::ImageCtx *m_dst_image_ctx;
+
+ std::shared_ptr<librbd::AsioEngine> m_asio_engine;
asio::ContextWQ *m_work_queue;
SnapMap m_snap_map;
ASSERT_EQ(0, create_image_pp(rbd, m_ioctx, dst_image_name, m_image_size));
ASSERT_EQ(0, open_image(dst_image_name, &m_dst_image_ctx));
- librbd::ImageCtx::get_work_queue(m_src_image_ctx->cct, &m_work_queue);
+ m_asio_engine = std::make_shared<librbd::AsioEngine>(
+ m_src_image_ctx->md_ctx);
+ m_work_queue = m_asio_engine->get_work_queue();
}
bool is_fast_diff(librbd::MockImageCtx &mock_image_ctx) {
#include "test/librbd/test_mock_fixture.h"
#include "test/librados_test_stub/LibradosTestStub.h"
#include "include/rbd/librbd.hpp"
+#include "librbd/AsioEngine.h"
#include "librbd/ImageCtx.h"
#include "librbd/ImageState.h"
#include "osdc/Striper.h"
typedef image::DetachParentRequest<MockTestImageCtx> MockDetachParentRequest;
librbd::ImageCtx *m_image_ctx;
+
+ std::shared_ptr<librbd::AsioEngine> m_asio_engine;
asio::ContextWQ *m_work_queue;
void SetUp() override {
ASSERT_EQ(0, open_image(m_image_name, &m_image_ctx));
- librbd::ImageCtx::get_work_queue(m_image_ctx->cct, &m_work_queue);
+ m_asio_engine = std::make_shared<librbd::AsioEngine>(
+ m_image_ctx->md_ctx);
+ m_work_queue = m_asio_engine->get_work_queue();
}
void expect_start_op(librbd::MockExclusiveLock &mock_exclusive_lock) {
#include "test/librbd/test_mock_fixture.h"
#include "include/rbd/librbd.hpp"
+#include "librbd/AsioEngine.h"
#include "librbd/ImageCtx.h"
#include "librbd/ImageState.h"
#include "librbd/Operations.h"
librbd::ImageCtx *m_src_image_ctx;
librbd::ImageCtx *m_dst_image_ctx;
+
+ std::shared_ptr<librbd::AsioEngine> m_asio_engine;
asio::ContextWQ *m_work_queue;
librbd::SnapSeqs m_snap_seqs;
ASSERT_EQ(0, create_image_pp(rbd, m_ioctx, dst_image_name, m_image_size));
ASSERT_EQ(0, open_image(dst_image_name, &m_dst_image_ctx));
- librbd::ImageCtx::get_work_queue(m_src_image_ctx->cct, &m_work_queue);
+ m_asio_engine = std::make_shared<librbd::AsioEngine>(
+ m_src_image_ctx->md_ctx);
+ m_work_queue = m_asio_engine->get_work_queue();
}
void prepare_exclusive_lock(librbd::MockImageCtx &mock_image_ctx,
#include "test/librbd/test_mock_fixture.h"
#include "test/librados_test_stub/LibradosTestStub.h"
#include "include/rbd/librbd.hpp"
+#include "librbd/AsioEngine.h"
#include "librbd/ImageCtx.h"
#include "librbd/ImageState.h"
#include "librbd/Operations.h"
typedef SnapshotCreateRequest<librbd::MockTestImageCtx> MockSnapshotCreateRequest;
librbd::ImageCtx *m_image_ctx;
+
+ std::shared_ptr<librbd::AsioEngine> m_asio_engine;
asio::ContextWQ *m_work_queue;
void SetUp() override {
ASSERT_EQ(0, open_image(m_image_name, &m_image_ctx));
- librbd::ImageCtx::get_work_queue(m_image_ctx->cct, &m_work_queue);
+ m_asio_engine = std::make_shared<librbd::AsioEngine>(
+ m_image_ctx->md_ctx);
+ m_work_queue = m_asio_engine->get_work_queue();
}
void expect_start_op(librbd::MockExclusiveLock &mock_exclusive_lock) {
lockers(image_ctx.lockers),
exclusive_locked(image_ctx.exclusive_locked),
lock_tag(image_ctx.lock_tag),
+ asio_engine(image_ctx.asio_engine),
owner_lock(image_ctx.owner_lock),
image_lock(image_ctx.image_lock),
timestamp_lock(image_ctx.timestamp_lock),
bool exclusive_locked;
std::string lock_tag;
+ std::shared_ptr<AsioEngine> asio_engine;
+
librados::IoCtx md_ctx;
librados::IoCtx data_ctx;
#include "test/librbd/test_mock_fixture.h"
#include "include/rbd/librbd.hpp"
+#include "librbd/AsioEngine.h"
#include "librbd/DeepCopyRequest.h"
#include "librbd/ImageState.h"
#include "librbd/Operations.h"
librbd::ImageCtx *m_src_image_ctx;
librbd::ImageCtx *m_dst_image_ctx;
+
+ std::shared_ptr<librbd::AsioEngine> m_asio_engine;
librbd::asio::ContextWQ *m_work_queue;
void SetUp() override {
ASSERT_EQ(0, open_image(m_image_name, &m_dst_image_ctx));
- librbd::ImageCtx::get_work_queue(m_src_image_ctx->cct, &m_work_queue);
+ m_asio_engine = std::make_shared<librbd::AsioEngine>(
+ m_src_image_ctx->md_ctx);
+ m_work_queue = m_asio_engine->get_work_queue();
}
void TearDown() override {
cls_rbd)
target_link_libraries(unittest_rbd_mirror
rbd_mirror_test
- rados_test_stub
rbd_mirror_internal
rbd_mirror_types
rbd_api
cls_lock_client
cls_journal_client
rbd_types
+ rados_test_stub
librados
osdc
global
cls_rbd_client
cls_journal_client
rbd_types
+ libneorados
librados
radostest-cxx
${UNITTEST_LIBS}
m_global_image_id = get_global_image_id(m_remote_ioctx, m_remote_image_id);
auto cct = reinterpret_cast<CephContext*>(m_local_ioctx.cct());
- m_threads.reset(new Threads<>(cct));
+ m_threads.reset(new Threads<>(m_local_cluster));
m_image_sync_throttler.reset(new Throttler<>(
cct, "rbd_mirror_concurrent_image_syncs"));
ASSERT_EQ(0, _rados->ioctx_create(_remote_pool_name.c_str(), m_remote_io_ctx));
m_image_name = get_temp_image_name();
- m_threads = new rbd::mirror::Threads<>(reinterpret_cast<CephContext*>(
- m_local_io_ctx.cct()));
+ m_threads = new rbd::mirror::Threads<>(_rados);
}
void TestFixture::TearDown() {
rbd_internal
rbd_types
journal
+ libneorados
librados
osdc
cls_rbd_client
m_local(new librados::Rados()),
m_cache_manager_handler(new CacheManagerHandler(cct)),
m_pool_meta_cache(new PoolMetaCache(cct)),
- m_asok_hook(new MirrorAdminSocketHook(cct, this))
-{
- m_threads =
- &(cct->lookup_or_create_singleton_object<Threads<librbd::ImageCtx>>(
- "rbd_mirror::threads", false, cct));
- m_service_daemon.reset(new ServiceDaemon<>(m_cct, m_local, m_threads));
+ m_asok_hook(new MirrorAdminSocketHook(cct, this)) {
}
Mirror::~Mirror()
return r;
}
+ m_threads = &(m_cct->lookup_or_create_singleton_object<
+ Threads<librbd::ImageCtx>>("rbd_mirror::threads", false, m_local));
+ m_service_daemon.reset(new ServiceDaemon<>(m_cct, m_local, m_threads));
+
r = m_service_daemon->init();
if (r < 0) {
derr << "error registering service daemon: " << cpp_strerror(r) << dendl;
namespace mirror {
template <typename I>
-Threads<I>::Threads(CephContext *cct) {
- asio_engine = new librbd::AsioEngine(cct);
+Threads<I>::Threads(std::shared_ptr<librados::Rados>& rados) {
+ auto cct = static_cast<CephContext*>(rados->cct());
+ asio_engine = new librbd::AsioEngine(rados);
work_queue = asio_engine->get_work_queue();
timer = new SafeTimer(cct, timer_lock, true);
#define CEPH_RBD_MIRROR_THREADS_H
#include "include/common_fwd.h"
+#include "include/rados/librados_fwd.hpp"
#include "common/ceph_mutex.h"
+#include <memory>
class SafeTimer;
class ThreadPool;
SafeTimer *timer = nullptr;
ceph::mutex timer_lock = ceph::make_mutex("Threads::timer_lock");
- explicit Threads(CephContext *cct);
+ explicit Threads(std::shared_ptr<librados::Rados>& rados);
Threads(const Threads&) = delete;
Threads& operator=(const Threads&) = delete;