From 418b4746a1745439f47a9ae48cff89fd413b8202 Mon Sep 17 00:00:00 2001 From: Jason Dillaman Date: Tue, 22 Mar 2016 11:28:53 -0400 Subject: [PATCH] rbd-mirror: convert ImageReplayer into templated class This will facilitate create mock test cases. Signed-off-by: Jason Dillaman --- src/test/Makefile-client.am | 1 + src/test/librbd/mock/MockImageCtx.h | 2 + src/test/librbd/mock/MockImageState.h | 3 + src/test/rbd_mirror/image_replay.cc | 12 +- src/test/rbd_mirror/mock/MockJournaler.cc | 5 + src/test/rbd_mirror/mock/MockJournaler.h | 100 ++++++++ src/test/rbd_mirror/test_ImageReplayer.cc | 41 ++-- .../rbd_mirror/test_mock_ImageReplayer.cc | 159 +++++++++++++ src/tools/rbd_mirror/ImageReplayer.cc | 213 +++++++++++------- src/tools/rbd_mirror/ImageReplayer.h | 38 ++-- src/tools/rbd_mirror/Replayer.cc | 14 +- src/tools/rbd_mirror/Replayer.h | 6 +- 12 files changed, 455 insertions(+), 139 deletions(-) create mode 100644 src/test/rbd_mirror/test_mock_ImageReplayer.cc diff --git a/src/test/Makefile-client.am b/src/test/Makefile-client.am index 82d57a4802c0a..90d1875eeb691 100644 --- a/src/test/Makefile-client.am +++ b/src/test/Makefile-client.am @@ -458,6 +458,7 @@ noinst_LTLIBRARIES += librbd_mirror_test.la unittest_rbd_mirror_SOURCES = \ test/rbd_mirror/test_main.cc \ test/rbd_mirror/test_mock_fixture.cc \ + test/rbd_mirror/test_mock_ImageReplayer.cc \ test/rbd_mirror/test_mock_ImageSync.cc \ test/rbd_mirror/image_sync/test_mock_ImageCopyRequest.cc \ test/rbd_mirror/image_sync/test_mock_ObjectCopyRequest.cc \ diff --git a/src/test/librbd/mock/MockImageCtx.h b/src/test/librbd/mock/MockImageCtx.h index e48618c483ebd..06168965c60bb 100644 --- a/src/test/librbd/mock/MockImageCtx.h +++ b/src/test/librbd/mock/MockImageCtx.h @@ -55,6 +55,7 @@ struct MockImageCtx { object_prefix(image_ctx.object_prefix), header_oid(image_ctx.header_oid), id(image_ctx.id), + name(image_ctx.name), parent_md(image_ctx.parent_md), layout(image_ctx.layout), aio_work_queue(new MockAioImageRequestWQ()), @@ -190,6 +191,7 @@ struct MockImageCtx { std::string object_prefix; std::string header_oid; std::string id; + std::string name; parent_info parent_md; file_layout_t layout; diff --git a/src/test/librbd/mock/MockImageState.h b/src/test/librbd/mock/MockImageState.h index 6f75ecd100aa7..8f5f206fb6222 100644 --- a/src/test/librbd/mock/MockImageState.h +++ b/src/test/librbd/mock/MockImageState.h @@ -13,6 +13,9 @@ namespace librbd { struct MockImageState { MOCK_CONST_METHOD0(is_refresh_required, bool()); MOCK_METHOD1(refresh, void(Context*)); + + MOCK_METHOD0(close, int()); + MOCK_METHOD1(close, void(Context*)); }; } // namespace librbd diff --git a/src/test/rbd_mirror/image_replay.cc b/src/test/rbd_mirror/image_replay.cc index 87d449717a96e..3abe40cb7a57b 100644 --- a/src/test/rbd_mirror/image_replay.cc +++ b/src/test/rbd_mirror/image_replay.cc @@ -19,7 +19,7 @@ #undef dout_prefix #define dout_prefix *_dout << "rbd-mirror-image-replay: " -rbd::mirror::ImageReplayer *replayer = nullptr; +rbd::mirror::ImageReplayer<> *replayer = nullptr; void usage() { std::cout << "usage: ceph_test_rbd_mirror_image_replay [options...] \\" << std::endl; @@ -103,8 +103,8 @@ int main(int argc, const char **argv) << local_pool_name << ", remote_pool_name=" << remote_pool_name << ", image_name=" << image_name << dendl; - rbd::mirror::ImageReplayer::BootstrapParams bootstap_params(local_pool_name, - image_name); + rbd::mirror::ImageReplayer<>::BootstrapParams bootstap_params(local_pool_name, + image_name); int64_t local_pool_id; int64_t remote_pool_id; std::string remote_image_id; @@ -185,9 +185,9 @@ int main(int argc, const char **argv) threads = new rbd::mirror::Threads(reinterpret_cast( local->cct())); - replayer = new rbd::mirror::ImageReplayer(threads, local, remote, client_id, - local_pool_id, remote_pool_id, - remote_image_id); + replayer = new rbd::mirror::ImageReplayer<>(threads, local, remote, client_id, + local_pool_id, remote_pool_id, + remote_image_id); replayer->start(&start_cond, &bootstap_params); r = start_cond.wait(); diff --git a/src/test/rbd_mirror/mock/MockJournaler.cc b/src/test/rbd_mirror/mock/MockJournaler.cc index 43f23e824a1cb..047dd2f410f2d 100644 --- a/src/test/rbd_mirror/mock/MockJournaler.cc +++ b/src/test/rbd_mirror/mock/MockJournaler.cc @@ -5,6 +5,11 @@ namespace journal { +MockReplayEntry *MockReplayEntry::s_instance = nullptr; MockJournaler *MockJournaler::s_instance = nullptr; +std::ostream &operator<<(std::ostream &os, const MockJournalerProxy &) { + return os; +} + } // namespace journal diff --git a/src/test/rbd_mirror/mock/MockJournaler.h b/src/test/rbd_mirror/mock/MockJournaler.h index 3b09d9a362b5d..7279d26586dfa 100644 --- a/src/test/rbd_mirror/mock/MockJournaler.h +++ b/src/test/rbd_mirror/mock/MockJournaler.h @@ -5,11 +5,48 @@ #define TEST_RBD_MIRROR_MOCK_JOURNALER_H #include +#include "include/int_types.h" +#include "include/rados/librados.hpp" +#include "cls/journal/cls_journal_types.h" #include "librbd/Journal.h" #include "librbd/journal/TypeTraits.h" +#include +#include + +class Context; +class ContextWQ; +class Mutex; +class SafeTimer; namespace journal { +struct ReplayHandler; + +struct MockReplayEntry { + static MockReplayEntry *s_instance; + static MockReplayEntry &get_instance() { + assert(s_instance != nullptr); + return *s_instance; + } + + MockReplayEntry() { + s_instance = this; + } + + MOCK_CONST_METHOD0(get_commit_tid, uint64_t()); + MOCK_METHOD0(get_data, bufferlist()); +}; + +struct MockReplayEntryProxy { + uint64_t get_commit_tid() const { + return MockReplayEntry::get_instance().get_commit_tid(); + } + + bufferlist get_data() { + return MockReplayEntry::get_instance().get_data(); + } +}; + struct MockJournaler { static MockJournaler *s_instance; static MockJournaler &get_instance() { @@ -21,9 +58,72 @@ struct MockJournaler { s_instance = this; } + MOCK_METHOD1(init, void(Context *)); + MOCK_METHOD0(shut_down, void()); + MOCK_CONST_METHOD0(is_initialized, bool()); + + MOCK_METHOD4(get_mutable_metadata, void(uint64_t*, uint64_t*, + std::set *, + Context*)); + + MOCK_METHOD1(try_pop_front, bool(MockReplayEntryProxy *)); + MOCK_METHOD2(start_live_replay, void(ReplayHandler *, double)); + MOCK_METHOD0(stop_replay, void()); + + MOCK_METHOD1(committed, void(const MockReplayEntryProxy &)); + MOCK_METHOD1(flush_commit_position, void(Context*)); + MOCK_METHOD2(update_client, void(const bufferlist&, Context *on_safe)); }; +struct MockJournalerProxy { + MockJournalerProxy(ContextWQ *work_queue, SafeTimer *timer, Mutex *timer_lock, + librados::IoCtx &header_ioctx, const std::string &journal_id, + const std::string &client_id, double commit_interval) { + MockJournaler::get_instance(); + } + + void init(Context *on_finish) { + MockJournaler::get_instance().init(on_finish); + } + void shut_down() { + MockJournaler::get_instance().shut_down(); + } + bool is_initialized() const { + return MockJournaler::get_instance().is_initialized(); + } + + void get_mutable_metadata(uint64_t *min, uint64_t *active, + std::set *clients, + Context *on_finish) { + MockJournaler::get_instance().get_mutable_metadata(min, active, clients, + on_finish); + } + + bool try_pop_front(MockReplayEntryProxy *entry) { + return MockJournaler::get_instance().try_pop_front(entry); + } + void start_live_replay(ReplayHandler *handler, double interval) { + MockJournaler::get_instance().start_live_replay(handler, interval); + } + void stop_replay() { + MockJournaler::get_instance().stop_replay(); + } + + void committed(const MockReplayEntryProxy &entry) { + MockJournaler::get_instance().committed(entry); + } + void flush_commit_position(Context *on_finish) { + MockJournaler::get_instance().flush_commit_position(on_finish); + } + + void update_client(const bufferlist& data, Context *on_safe) { + MockJournaler::get_instance().update_client(data, on_safe); + } +}; + +std::ostream &operator<<(std::ostream &os, const MockJournalerProxy &); + } // namespace journal namespace librbd { diff --git a/src/test/rbd_mirror/test_ImageReplayer.cc b/src/test/rbd_mirror/test_ImageReplayer.cc index d31d9317af209..35c4c8e3f13d3 100644 --- a/src/test/rbd_mirror/test_ImageReplayer.cc +++ b/src/test/rbd_mirror/test_ImageReplayer.cc @@ -111,7 +111,7 @@ public: EXPECT_EQ(0, m_local_cluster.pool_delete(m_local_pool_name.c_str())); } - template + template > void create_replayer() { m_replayer = new ImageReplayerT(m_threads, rbd::mirror::RadosRef(new librados::Rados(m_local_ioctx)), @@ -119,7 +119,7 @@ public: m_client_id, m_local_ioctx.get_id(), m_remote_pool_id, m_remote_image_id); } - void start(rbd::mirror::ImageReplayer::BootstrapParams *bootstap_params = + void start(rbd::mirror::ImageReplayer<>::BootstrapParams *bootstap_params = nullptr) { C_SaferCond cond; @@ -150,7 +150,7 @@ public: { create_replayer<>(); - rbd::mirror::ImageReplayer::BootstrapParams + rbd::mirror::ImageReplayer<>::BootstrapParams bootstap_params(m_local_pool_name, m_image_name); start(&bootstap_params); wait_for_replay_complete(); @@ -329,7 +329,7 @@ public: std::string m_image_name; int64_t m_remote_pool_id; std::string m_remote_image_id; - rbd::mirror::ImageReplayer *m_replayer; + rbd::mirror::ImageReplayer<> *m_replayer; C_WatchCtx *m_watch_ctx; uint64_t m_watch_handle; char m_test_data[TEST_IO_SIZE + 1]; @@ -346,7 +346,7 @@ TEST_F(TestImageReplayer, BootstrapErrorInvalidPool) { create_replayer<>(); - rbd::mirror::ImageReplayer::BootstrapParams + rbd::mirror::ImageReplayer<>::BootstrapParams bootstap_params("INVALID_LOCAL_POOL_NAME", m_image_name); C_SaferCond cond; m_replayer->start(&cond, &bootstap_params); @@ -360,7 +360,7 @@ TEST_F(TestImageReplayer, BootstrapErrorLocalImageExists) false, 0, &order, 0, 0)); create_replayer<>(); - rbd::mirror::ImageReplayer::BootstrapParams + rbd::mirror::ImageReplayer<>::BootstrapParams bootstap_params(m_local_pool_name, m_image_name); C_SaferCond cond; m_replayer->start(&cond, &bootstap_params); @@ -378,7 +378,7 @@ TEST_F(TestImageReplayer, BootstrapErrorNoJournal) close_image(ictx); create_replayer<>(); - rbd::mirror::ImageReplayer::BootstrapParams + rbd::mirror::ImageReplayer<>::BootstrapParams bootstap_params(m_local_pool_name, m_image_name); C_SaferCond cond; m_replayer->start(&cond, &bootstap_params); @@ -388,7 +388,7 @@ TEST_F(TestImageReplayer, BootstrapErrorNoJournal) TEST_F(TestImageReplayer, StartInterrupted) { create_replayer<>(); - rbd::mirror::ImageReplayer::BootstrapParams + rbd::mirror::ImageReplayer<>::BootstrapParams bootstap_params(m_local_pool_name, m_image_name); C_SaferCond start_cond, stop_cond; m_replayer->start(&start_cond, &bootstap_params); @@ -425,7 +425,7 @@ TEST_F(TestImageReplayer, ErrorNoJournal) ASSERT_EQ(0, librbd::update_features(ictx, RBD_FEATURE_JOURNALING, false)); close_image(ictx); - rbd::mirror::ImageReplayer::BootstrapParams + rbd::mirror::ImageReplayer<>::BootstrapParams bootstap_params(m_local_pool_name, m_image_name); C_SaferCond cond; m_replayer->start(&cond, &bootstap_params); @@ -537,14 +537,15 @@ TEST_F(TestImageReplayer, NextTag) stop(); } -class ImageReplayer : public rbd::mirror::ImageReplayer { +class ImageReplayer : public rbd::mirror::ImageReplayer<> { public: ImageReplayer(rbd::mirror::Threads *threads, rbd::mirror::RadosRef local, rbd::mirror::RadosRef remote, const std::string &client_id, int64_t local_pool_id, int64_t remote_pool_id, const std::string &remote_image_id) - : rbd::mirror::ImageReplayer(threads, local, remote, client_id, - local_pool_id, remote_pool_id, remote_image_id) + : rbd::mirror::ImageReplayer<>(threads, local, remote, client_id, + local_pool_id, remote_pool_id, + remote_image_id) {} void set_error(const std::string &state, int r) { @@ -560,21 +561,21 @@ protected: virtual void on_start_get_registered_client_status_finish(int r, const std::set ®istered_clients, const BootstrapParams &bootstrap_params) { - rbd::mirror::ImageReplayer::on_start_get_registered_client_status_finish( + rbd::mirror::ImageReplayer<>::on_start_get_registered_client_status_finish( get_error("on_start_get_registered_client_status"), registered_clients, bootstrap_params); } virtual void on_start_remote_journaler_init_finish(int r) { ASSERT_EQ(0, r); - rbd::mirror::ImageReplayer::on_start_remote_journaler_init_finish( + rbd::mirror::ImageReplayer<>::on_start_remote_journaler_init_finish( get_error("on_start_remote_journaler_init")); } virtual void on_start_local_image_open_finish(int r) { int test_r = get_error("on_start_local_image_open"); if (!test_r) { - rbd::mirror::ImageReplayer::on_start_local_image_open_finish(r); + rbd::mirror::ImageReplayer<>::on_start_local_image_open_finish(r); return; } @@ -591,19 +592,19 @@ protected: virtual void on_start_wait_for_local_journal_ready_finish(int r) { ASSERT_EQ(0, r); - rbd::mirror::ImageReplayer::on_start_wait_for_local_journal_ready_finish( + rbd::mirror::ImageReplayer<>::on_start_wait_for_local_journal_ready_finish( get_error("on_start_wait_for_local_journal_ready")); } virtual void on_stop_journal_replay_shut_down_finish(int r) { ASSERT_EQ(0, r); - rbd::mirror::ImageReplayer::on_stop_journal_replay_shut_down_finish( + rbd::mirror::ImageReplayer<>::on_stop_journal_replay_shut_down_finish( get_error("on_stop_journal_replay_shut_down")); } virtual void on_stop_local_image_close_finish(int r) { ASSERT_EQ(0, r); - rbd::mirror::ImageReplayer::on_stop_local_image_close_finish( + rbd::mirror::ImageReplayer<>::on_stop_local_image_close_finish( get_error("on_stop_local_image_close")); } @@ -617,7 +618,7 @@ TEST_F(TestImageReplayer, Error_on_start_##state) \ create_replayer(); \ reinterpret_cast(m_replayer)-> \ set_error("on_start_" #state, -1); \ - rbd::mirror::ImageReplayer::BootstrapParams \ + rbd::mirror::ImageReplayer<>::BootstrapParams \ bootstap_params(m_local_pool_name, m_image_name); \ C_SaferCond cond; \ m_replayer->start(&cond, &bootstap_params); \ @@ -630,7 +631,7 @@ TEST_F(TestImageReplayer, Error_on_stop_##state) \ create_replayer(); \ reinterpret_cast(m_replayer)-> \ set_error("on_stop_" #state, -1); \ - rbd::mirror::ImageReplayer::BootstrapParams \ + rbd::mirror::ImageReplayer<>::BootstrapParams \ bootstap_params(m_local_pool_name, m_image_name); \ start(&bootstap_params); \ /* TODO: investigate: without wait below I observe: */ \ diff --git a/src/test/rbd_mirror/test_mock_ImageReplayer.cc b/src/test/rbd_mirror/test_mock_ImageReplayer.cc new file mode 100644 index 0000000000000..36ad1733e8af8 --- /dev/null +++ b/src/test/rbd_mirror/test_mock_ImageReplayer.cc @@ -0,0 +1,159 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "test/rbd_mirror/test_mock_fixture.h" +#include "librbd/journal/Replay.h" +#include "tools/rbd_mirror/ImageReplayer.h" +#include "tools/rbd_mirror/image_replayer/BootstrapRequest.h" +#include "tools/rbd_mirror/image_replayer/CloseImageRequest.h" +#include "tools/rbd_mirror/image_replayer/OpenLocalImageRequest.h" +#include "test/librbd/mock/MockImageCtx.h" +#include "test/librbd/mock/MockJournal.h" +#include "test/rbd_mirror/mock/MockJournaler.h" + +namespace librbd { + +struct MockImageReplayerJournal; + +struct MockImageReplayerImageCtx : public MockImageCtx { + MockImageReplayerJournal *journal = nullptr; +}; + +struct MockImageReplayerJournal : public MockJournal { + MOCK_METHOD1(start_external_replay, int(journal::Replay **)); + MOCK_METHOD0(stop_external_replay, void()); +}; + +namespace journal { + +template<> +struct Replay { + MOCK_METHOD3(process, void(bufferlist::iterator *, Context *, Context *)); + MOCK_METHOD1(flush, void(Context*)); + MOCK_METHOD2(shut_down, void(bool, Context*)); +}; + +template <> +struct TypeTraits { + typedef ::journal::MockJournalerProxy Journaler; + typedef ::journal::MockReplayEntryProxy ReplayEntry; +}; + +struct MirrorPeerClientMeta; + +} // namespace journal +} // namespace librbd + +namespace rbd { +namespace mirror { +namespace image_replayer { + +template<> +struct BootstrapRequest { + static BootstrapRequest* s_instance; + Context *on_finish = nullptr; + + static BootstrapRequest* create(librados::IoCtx &local_io_ctx, + librados::IoCtx &remote_io_ctx, + librbd::MockImageReplayerImageCtx **local_image_ctx, + const std::string &local_image_name, + const std::string &remote_image_id, + ContextWQ *work_queue, SafeTimer *timer, + Mutex *timer_lock, + const std::string &mirror_uuid, + ::journal::MockJournalerProxy *journaler, + librbd::journal::MirrorPeerClientMeta *client_meta, + Context *on_finish) { + assert(s_instance != nullptr); + s_instance->on_finish = on_finish; + return s_instance; + } + + BootstrapRequest() { + assert(s_instance == nullptr); + s_instance = this; + } + + MOCK_METHOD0(send, void()); +}; + +template<> +struct CloseImageRequest { + static CloseImageRequest* s_instance; + Context *on_finish = nullptr; + + static CloseImageRequest* create(librbd::MockImageReplayerImageCtx **image_ctx, + ContextWQ *work_queue, + Context *on_finish) { + assert(s_instance != nullptr); + s_instance->on_finish = on_finish; + return s_instance; + } + + CloseImageRequest() { + assert(s_instance == nullptr); + s_instance = this; + } + + MOCK_METHOD0(send, void()); +}; + +template<> +struct OpenLocalImageRequest { + static OpenLocalImageRequest* s_instance; + Context *on_finish = nullptr; + + static OpenLocalImageRequest* create(librados::IoCtx &local_io_ctx, + librbd::MockImageReplayerImageCtx **local_image_ctx, + const std::string &local_image_name, + const std::string &local_image_id, + ContextWQ *work_queue, + Context *on_finish) { + assert(s_instance != nullptr); + s_instance->on_finish = on_finish; + return s_instance; + } + + OpenLocalImageRequest() { + assert(s_instance == nullptr); + s_instance = this; + } + + MOCK_METHOD0(send, void()); +}; + +BootstrapRequest* BootstrapRequest::s_instance = nullptr; +CloseImageRequest* CloseImageRequest::s_instance = nullptr; +OpenLocalImageRequest* OpenLocalImageRequest::s_instance = nullptr; + +} // namespace image_replayer +} // namespace mirror +} // namespace rbd + +// template definitions +#include "tools/rbd_mirror/ImageReplayer.cc" +template class rbd::mirror::ImageReplayer; + +namespace rbd { +namespace mirror { + +class TestMockImageReplayer : public TestMockFixture { +public: + typedef ImageReplayer MockImageReplayer; + + virtual void SetUp() { + TestMockFixture::SetUp(); + + librbd::RBD rbd; + ASSERT_EQ(0, create_image(rbd, m_remote_io_ctx, m_image_name, m_image_size)); + ASSERT_EQ(0, open_image(m_remote_io_ctx, m_image_name, &m_remote_image_ctx)); + } + + librbd::ImageCtx *m_remote_image_ctx; +}; + +TEST_F(TestMockImageReplayer, Blah) { +} + +} // namespace mirror +} // namespace rbd diff --git a/src/tools/rbd_mirror/ImageReplayer.cc b/src/tools/rbd_mirror/ImageReplayer.cc index 11a7b22ab0473..fa87a0d67dfb9 100644 --- a/src/tools/rbd_mirror/ImageReplayer.cc +++ b/src/tools/rbd_mirror/ImageReplayer.cc @@ -17,7 +17,6 @@ #include "librbd/Journal.h" #include "librbd/Operations.h" #include "librbd/Utils.h" -#include "librbd/internal.h" #include "librbd/journal/Replay.h" #include "ImageReplayer.h" #include "ImageSync.h" @@ -41,13 +40,16 @@ namespace mirror { using librbd::util::create_context_callback; using namespace rbd::mirror::image_replayer; -std::ostream &operator<<(std::ostream &os, const ImageReplayer::State &state); +template +std::ostream &operator<<(std::ostream &os, + const typename ImageReplayer::State &state); namespace { +template struct ReplayHandler : public ::journal::ReplayHandler { - ImageReplayer *replayer; - ReplayHandler(ImageReplayer *replayer) : replayer(replayer) {} + ImageReplayer *replayer; + ReplayHandler(ImageReplayer *replayer) : replayer(replayer) {} virtual void get() {} virtual void put() {} @@ -60,12 +62,16 @@ struct ReplayHandler : public ::journal::ReplayHandler { } }; +template struct C_ReplayCommitted : public Context { - ImageReplayer *replayer; - ::journal::ReplayEntry replay_entry; + typedef typename librbd::journal::TypeTraits::ReplayEntry ReplayEntry; - C_ReplayCommitted(ImageReplayer *replayer, ::journal::ReplayEntry &&replay_entry) : - replayer(replayer), replay_entry(std::move(replay_entry)) { + ImageReplayer *replayer; + ReplayEntry replay_entry; + + C_ReplayCommitted(ImageReplayer *replayer, + ReplayEntry &&replay_entry) + : replayer(replayer), replay_entry(std::move(replay_entry)) { } virtual void finish(int r) { replayer->handle_replay_committed(&replay_entry, r); @@ -78,9 +84,10 @@ public: virtual bool call(Formatter *f, stringstream *ss) = 0; }; +template class StatusCommand : public ImageReplayerAdminSocketCommand { public: - explicit StatusCommand(ImageReplayer *replayer) : replayer(replayer) {} + explicit StatusCommand(ImageReplayer *replayer) : replayer(replayer) {} bool call(Formatter *f, stringstream *ss) { replayer->print_status(f, ss); @@ -88,12 +95,13 @@ public: } private: - ImageReplayer *replayer; + ImageReplayer *replayer; }; +template class FlushCommand : public ImageReplayerAdminSocketCommand { public: - explicit FlushCommand(ImageReplayer *replayer) : replayer(replayer) {} + explicit FlushCommand(ImageReplayer *replayer) : replayer(replayer) {} bool call(Formatter *f, stringstream *ss) { C_SaferCond cond; @@ -107,15 +115,14 @@ public: } private: - ImageReplayer *replayer; + ImageReplayer *replayer; }; -} // anonymous namespace - +template class ImageReplayerAdminSocketHook : public AdminSocketHook { public: ImageReplayerAdminSocketHook(CephContext *cct, const std::string &name, - ImageReplayer *replayer) : + ImageReplayer *replayer) : admin_socket(cct->get_admin_socket()) { std::string command; int r; @@ -124,14 +131,14 @@ public: r = admin_socket->register_command(command, command, this, "get status for rbd mirror " + name); if (r == 0) { - commands[command] = new StatusCommand(replayer); + commands[command] = new StatusCommand(replayer); } command = "rbd mirror flush " + name; r = admin_socket->register_command(command, command, this, "flush rbd mirror " + name); if (r == 0) { - commands[command] = new FlushCommand(replayer); + commands[command] = new FlushCommand(replayer); } } @@ -162,7 +169,10 @@ private: Commands commands; }; -ImageReplayer::ImageReplayer(Threads *threads, RadosRef local, RadosRef remote, +} // anonymous namespace + +template +ImageReplayer::ImageReplayer(Threads *threads, RadosRef local, RadosRef remote, const std::string &client_id, int64_t local_pool_id, int64_t remote_pool_id, @@ -187,7 +197,8 @@ ImageReplayer::ImageReplayer(Threads *threads, RadosRef local, RadosRef remote, { } -ImageReplayer::~ImageReplayer() +template +ImageReplayer::~ImageReplayer() { assert(m_local_image_ctx == nullptr); assert(m_local_replay == nullptr); @@ -197,7 +208,8 @@ ImageReplayer::~ImageReplayer() delete m_asok_hook; } -void ImageReplayer::start(Context *on_finish, +template +void ImageReplayer::start(Context *on_finish, const BootstrapParams *bootstrap_params) { dout(20) << "on_finish=" << on_finish << ", m_on_finish=" << m_on_finish @@ -243,17 +255,17 @@ void ImageReplayer::start(Context *on_finish, CephContext *cct = static_cast(m_local->cct()); double commit_interval = cct->_conf->rbd_journal_commit_age; - m_remote_journaler = new ::journal::Journaler(m_threads->work_queue, - m_threads->timer, - &m_threads->timer_lock, - m_remote_ioctx, - m_remote_image_id, m_client_id, - commit_interval); + m_remote_journaler = new Journaler(m_threads->work_queue, + m_threads->timer, + &m_threads->timer_lock, m_remote_ioctx, + m_remote_image_id, m_client_id, + commit_interval); on_start_get_registered_client_status_start(bootstrap_params); } -void ImageReplayer::on_start_get_registered_client_status_start( +template +void ImageReplayer::on_start_get_registered_client_status_start( const BootstrapParams *bootstrap_params) { dout(20) << "enter" << dendl; @@ -280,7 +292,8 @@ void ImageReplayer::on_start_get_registered_client_status_start( &m->registered_clients, ctx); } -void ImageReplayer::on_start_get_registered_client_status_finish(int r, +template +void ImageReplayer::on_start_get_registered_client_status_finish(int r, const std::set ®istered_clients, const BootstrapParams &bootstrap_params) { @@ -329,7 +342,8 @@ void ImageReplayer::on_start_get_registered_client_status_finish(int r, bootstrap(bootstrap_params); } -void ImageReplayer::bootstrap(const BootstrapParams &bootstrap_params) { +template +void ImageReplayer::bootstrap(const BootstrapParams &bootstrap_params) { int r; BootstrapParams params; @@ -352,8 +366,8 @@ void ImageReplayer::bootstrap(const BootstrapParams &bootstrap_params) { // TODO: add a new bootstrap state and support canceling Context *ctx = create_context_callback< - ImageReplayer, &ImageReplayer::handle_bootstrap>(this); - BootstrapRequest<> *request = BootstrapRequest<>::create( + ImageReplayer, &ImageReplayer::handle_bootstrap>(this); + BootstrapRequest *request = BootstrapRequest::create( m_local_ioctx, m_remote_ioctx, &m_local_image_ctx, params.local_image_name, m_remote_image_id, m_threads->work_queue, m_threads->timer, &m_threads->timer_lock, m_client_id, m_remote_journaler, @@ -361,7 +375,8 @@ void ImageReplayer::bootstrap(const BootstrapParams &bootstrap_params) { request->send(); } -void ImageReplayer::handle_bootstrap(int r) { +template +void ImageReplayer::handle_bootstrap(int r) { dout(20) << "r=" << r << dendl; if (r < 0) { @@ -375,7 +390,8 @@ void ImageReplayer::handle_bootstrap(int r) { on_start_remote_journaler_init_start(); } -void ImageReplayer::on_start_remote_journaler_init_start() +template +void ImageReplayer::on_start_remote_journaler_init_start() { if (on_start_interrupted()) { return; @@ -391,7 +407,8 @@ void ImageReplayer::on_start_remote_journaler_init_start() m_remote_journaler->init(ctx); } -void ImageReplayer::on_start_remote_journaler_init_finish(int r) +template +void ImageReplayer::on_start_remote_journaler_init_finish(int r) { dout(20) << "r=" << r << dendl; @@ -408,7 +425,8 @@ void ImageReplayer::on_start_remote_journaler_init_finish(int r) on_start_local_image_open_start(); } -void ImageReplayer::on_start_local_image_open_start() +template +void ImageReplayer::on_start_local_image_open_start() { dout(20) << "enter" << dendl; if (m_local_image_ctx != nullptr) { @@ -419,14 +437,15 @@ void ImageReplayer::on_start_local_image_open_start() // open and lock the local image Context *ctx = create_context_callback< - ImageReplayer, &ImageReplayer::on_start_local_image_open_finish>(this); - OpenLocalImageRequest<> *request = OpenLocalImageRequest<>::create( + ImageReplayer, &ImageReplayer::on_start_local_image_open_finish>(this); + OpenLocalImageRequest *request = OpenLocalImageRequest::create( m_local_ioctx, &m_local_image_ctx, "", m_local_image_id, m_threads->work_queue, ctx); request->send(); } -void ImageReplayer::on_start_local_image_open_finish(int r) +template +void ImageReplayer::on_start_local_image_open_finish(int r) { dout(20) << "r=" << r << dendl; @@ -443,7 +462,8 @@ void ImageReplayer::on_start_local_image_open_finish(int r) on_start_wait_for_local_journal_ready_start(); } -void ImageReplayer::on_start_wait_for_local_journal_ready_start() +template +void ImageReplayer::on_start_wait_for_local_journal_ready_start() { dout(20) << "enter" << dendl; @@ -454,7 +474,7 @@ void ImageReplayer::on_start_wait_for_local_journal_ready_start() CephContext *cct = static_cast(m_local->cct()); - m_asok_hook = new ImageReplayerAdminSocketHook(cct, m_name, this); + m_asok_hook = new ImageReplayerAdminSocketHook(cct, m_name, this); } FunctionContext *ctx = new FunctionContext( @@ -464,7 +484,8 @@ void ImageReplayer::on_start_wait_for_local_journal_ready_start() m_local_image_ctx->journal->wait_for_journal_ready(ctx); } -void ImageReplayer::on_start_wait_for_local_journal_ready_finish(int r) +template +void ImageReplayer::on_start_wait_for_local_journal_ready_finish(int r) { dout(20) << "r=" << r << dendl; @@ -486,7 +507,7 @@ void ImageReplayer::on_start_wait_for_local_journal_ready_finish(int r) return; } - m_replay_handler = new ReplayHandler(this); + m_replay_handler = new ReplayHandler(this); m_remote_journaler->start_live_replay(m_replay_handler, 1 /* TODO: configurable */); @@ -519,7 +540,8 @@ void ImageReplayer::on_start_wait_for_local_journal_ready_finish(int r) } } -void ImageReplayer::on_start_fail_start(int r) +template +void ImageReplayer::on_start_fail_start(int r) { dout(20) << "r=" << r << dendl; @@ -532,7 +554,8 @@ void ImageReplayer::on_start_fail_start(int r) m_threads->work_queue->queue(ctx, 0); } -void ImageReplayer::on_start_fail_finish(int r) +template +void ImageReplayer::on_start_fail_finish(int r) { dout(20) << "r=" << r << dendl; @@ -556,11 +579,7 @@ void ImageReplayer::on_start_fail_finish(int r) } if (m_local_image_ctx) { - bool owner; - if (librbd::is_exclusive_lock_owner(m_local_image_ctx, &owner) == 0 && - owner) { - librbd::unlock(m_local_image_ctx, ""); - } + // TODO: switch to async close via CloseImageRequest m_local_image_ctx->state->close(); m_local_image_ctx = nullptr; } @@ -590,7 +609,8 @@ void ImageReplayer::on_start_fail_finish(int r) } } -bool ImageReplayer::on_start_interrupted() +template +bool ImageReplayer::on_start_interrupted() { Mutex::Locker locker(m_lock); @@ -604,7 +624,8 @@ bool ImageReplayer::on_start_interrupted() return true; } -void ImageReplayer::stop(Context *on_finish) +template +void ImageReplayer::stop(Context *on_finish) { dout(20) << "on_finish=" << on_finish << ", m_on_finish=" << m_on_finish << dendl; @@ -650,7 +671,8 @@ void ImageReplayer::stop(Context *on_finish) m_state = STATE_STOPPING; } -void ImageReplayer::on_stop_journal_replay_shut_down_start() +template +void ImageReplayer::on_stop_journal_replay_shut_down_start() { dout(20) << "enter" << dendl; @@ -662,7 +684,8 @@ void ImageReplayer::on_stop_journal_replay_shut_down_start() m_local_replay->shut_down(false, ctx); } -void ImageReplayer::on_stop_journal_replay_shut_down_finish(int r) +template +void ImageReplayer::on_stop_journal_replay_shut_down_finish(int r) { dout(20) << "r=" << r << dendl; @@ -676,19 +699,21 @@ void ImageReplayer::on_stop_journal_replay_shut_down_finish(int r) on_stop_local_image_close_start(); } -void ImageReplayer::on_stop_local_image_close_start() +template +void ImageReplayer::on_stop_local_image_close_start() { dout(20) << "enter" << dendl; // close and delete the image (from outside the image's thread context) Context *ctx = create_context_callback< - ImageReplayer, &ImageReplayer::on_stop_local_image_close_finish>(this); - CloseImageRequest<> *request = CloseImageRequest<>::create( + ImageReplayer, &ImageReplayer::on_stop_local_image_close_finish>(this); + CloseImageRequest *request = CloseImageRequest::create( &m_local_image_ctx, m_threads->work_queue, ctx); request->send(); } -void ImageReplayer::on_stop_local_image_close_finish(int r) +template +void ImageReplayer::on_stop_local_image_close_finish(int r) { dout(20) << "r=" << r << dendl; @@ -727,16 +752,18 @@ void ImageReplayer::on_stop_local_image_close_finish(int r) } } -void ImageReplayer::close_local_image(Context *on_finish) +template +void ImageReplayer::close_local_image(Context *on_finish) { m_local_image_ctx->state->close(on_finish); } -void ImageReplayer::handle_replay_ready() +template +void ImageReplayer::handle_replay_ready() { dout(20) << "enter" << dendl; - ::journal::ReplayEntry replay_entry; + ReplayEntry replay_entry; if (!m_remote_journaler->try_pop_front(&replay_entry)) { return; } @@ -746,12 +773,13 @@ void ImageReplayer::handle_replay_ready() bufferlist data = replay_entry.get_data(); bufferlist::iterator it = data.begin(); Context *on_ready = create_context_callback< - ImageReplayer, &ImageReplayer::handle_replay_process_ready>(this); - Context *on_commit = new C_ReplayCommitted(this, std::move(replay_entry)); + ImageReplayer, &ImageReplayer::handle_replay_process_ready>(this); + Context *on_commit = new C_ReplayCommitted(this, std::move(replay_entry)); m_local_replay->process(&it, on_ready, on_commit); } -void ImageReplayer::flush(Context *on_finish) +template +void ImageReplayer::flush(Context *on_finish) { dout(20) << "enter" << dendl; @@ -777,7 +805,8 @@ void ImageReplayer::flush(Context *on_finish) } } -void ImageReplayer::on_flush_local_replay_flush_start() +template +void ImageReplayer::on_flush_local_replay_flush_start() { dout(20) << "enter" << dendl; @@ -789,7 +818,8 @@ void ImageReplayer::on_flush_local_replay_flush_start() m_local_replay->flush(ctx); } -void ImageReplayer::on_flush_local_replay_flush_finish(int r) +template +void ImageReplayer::on_flush_local_replay_flush_finish(int r) { dout(20) << "r=" << r << dendl; @@ -804,7 +834,8 @@ void ImageReplayer::on_flush_local_replay_flush_finish(int r) on_flush_flush_commit_position_start(r); } -void ImageReplayer::on_flush_flush_commit_position_start(int last_r) +template +void ImageReplayer::on_flush_flush_commit_position_start(int last_r) { FunctionContext *ctx = new FunctionContext( @@ -815,7 +846,8 @@ void ImageReplayer::on_flush_flush_commit_position_start(int last_r) m_remote_journaler->flush_commit_position(ctx); } -void ImageReplayer::on_flush_flush_commit_position_finish(int last_r, int r) +template +void ImageReplayer::on_flush_flush_commit_position_finish(int last_r, int r) { if (r < 0) { derr << "error flushing remote journal commit position: " @@ -846,7 +878,8 @@ void ImageReplayer::on_flush_flush_commit_position_finish(int last_r, int r) } } -bool ImageReplayer::on_flush_interrupted() +template +bool ImageReplayer::on_flush_interrupted() { Context *on_finish(nullptr); @@ -873,7 +906,8 @@ bool ImageReplayer::on_flush_interrupted() return true; } -void ImageReplayer::print_status(Formatter *f, stringstream *ss) +template +void ImageReplayer::print_status(Formatter *f, stringstream *ss) { dout(20) << "enter" << dendl; @@ -890,7 +924,8 @@ void ImageReplayer::print_status(Formatter *f, stringstream *ss) } } -void ImageReplayer::handle_replay_process_ready(int r) +template +void ImageReplayer::handle_replay_process_ready(int r) { // journal::Replay is ready for more events -- attempt to pop another @@ -906,15 +941,16 @@ void ImageReplayer::handle_replay_process_ready(int r) handle_replay_ready(); } -void ImageReplayer::handle_replay_complete(int r) +template +void ImageReplayer::handle_replay_complete(int r) { dout(20) "r=" << r << dendl; //m_remote_journaler->stop_replay(); } -void ImageReplayer::handle_replay_committed( - ::journal::ReplayEntry *replay_entry, int r) +template +void ImageReplayer::handle_replay_committed(ReplayEntry *replay_entry, int r) { dout(20) << "commit_tid=" << replay_entry->get_commit_tid() << ", r=" << r << dendl; @@ -922,7 +958,8 @@ void ImageReplayer::handle_replay_committed( m_remote_journaler->committed(*replay_entry); } -int ImageReplayer::get_bootstrap_params(BootstrapParams *params) +template +int ImageReplayer::get_bootstrap_params(BootstrapParams *params) { int r = librbd::cls_client::dir_get_name(&m_remote_ioctx, RBD_DIRECTORY, m_remote_image_id, @@ -938,7 +975,8 @@ int ImageReplayer::get_bootstrap_params(BootstrapParams *params) return 0; } -void ImageReplayer::shut_down_journal_replay(bool cancel_ops) +template +void ImageReplayer::shut_down_journal_replay(bool cancel_ops) { C_SaferCond cond; m_local_replay->shut_down(cancel_ops, &cond); @@ -948,25 +986,27 @@ void ImageReplayer::shut_down_journal_replay(bool cancel_ops) } } -std::ostream &operator<<(std::ostream &os, const ImageReplayer::State &state) +template +std::ostream &operator<<(std::ostream &os, + const typename ImageReplayer::State &state) { switch (state) { - case ImageReplayer::STATE_UNINITIALIZED: + case ImageReplayer::STATE_UNINITIALIZED: os << "Uninitialized"; break; - case ImageReplayer::STATE_STARTING: + case ImageReplayer::STATE_STARTING: os << "Starting"; break; - case ImageReplayer::STATE_REPLAYING: + case ImageReplayer::STATE_REPLAYING: os << "Replaying"; break; - case ImageReplayer::STATE_FLUSHING_REPLAY: + case ImageReplayer::STATE_FLUSHING_REPLAY: os << "FlushingReplay"; break; - case ImageReplayer::STATE_STOPPING: + case ImageReplayer::STATE_STOPPING: os << "Stopping"; break; - case ImageReplayer::STATE_STOPPED: + case ImageReplayer::STATE_STOPPED: os << "Stopped"; break; default: @@ -976,12 +1016,15 @@ std::ostream &operator<<(std::ostream &os, const ImageReplayer::State &state) return os; } -std::ostream &operator<<(std::ostream &os, const ImageReplayer &replayer) +template +std::ostream &operator<<(std::ostream &os, const ImageReplayer &replayer) { - os << "ImageReplayer[" << replayer.m_remote_pool_id << "/" - << replayer.m_remote_image_id << "]"; + os << "ImageReplayer[" << replayer.get_remote_pool_id() << "/" + << replayer.get_remote_image_id() << "]"; return os; } } // namespace mirror } // namespace rbd + +template class rbd::mirror::ImageReplayer; diff --git a/src/tools/rbd_mirror/ImageReplayer.h b/src/tools/rbd_mirror/ImageReplayer.h index 9b6d03aa04339..261f570c5eb87 100644 --- a/src/tools/rbd_mirror/ImageReplayer.h +++ b/src/tools/rbd_mirror/ImageReplayer.h @@ -13,39 +13,38 @@ #include "include/rados/librados.hpp" #include "cls/journal/cls_journal_types.h" #include "librbd/journal/Types.h" +#include "librbd/journal/TypeTraits.h" #include "types.h" +class AdminSocketHook; + namespace journal { class Journaler; class ReplayHandler; -class ReplayEntry; } namespace librbd { class ImageCtx; - -namespace journal { - -template class Replay; - -} +namespace journal { template class Replay; } } namespace rbd { namespace mirror { -class ImageReplayerAdminSocketHook; struct Threads; /** * Replays changes from a remote cluster for a single image. */ +template class ImageReplayer { public: + typedef typename librbd::journal::TypeTraits::ReplayEntry ReplayEntry; + enum State { STATE_UNINITIALIZED, STATE_STARTING, @@ -94,8 +93,14 @@ public: virtual void handle_replay_process_ready(int r); virtual void handle_replay_complete(int r); - virtual void handle_replay_committed(::journal::ReplayEntry* replay_entry, int r); + virtual void handle_replay_committed(ReplayEntry* replay_entry, int r); + inline int64_t get_remote_pool_id() const { + return m_remote_pool_id; + } + inline const std::string get_remote_image_id() const { + return m_remote_image_id; + } protected: /** * @verbatim @@ -177,6 +182,8 @@ protected: void close_local_image(Context *on_finish); // for tests private: + typedef typename librbd::journal::TypeTraits::Journaler Journaler; + State get_state_() const { return m_state; } bool is_stopped_() const { return m_state == STATE_UNINITIALIZED || m_state == STATE_STOPPED; } @@ -186,9 +193,6 @@ private: void shut_down_journal_replay(bool cancel_ops); - friend std::ostream &operator<<(std::ostream &os, - const ImageReplayer &replayer); - Threads *m_threads; RadosRef m_local, m_remote; std::string m_client_id; @@ -199,12 +203,12 @@ private: State m_state; std::string m_local_pool_name, m_remote_pool_name; librados::IoCtx m_local_ioctx, m_remote_ioctx; - librbd::ImageCtx *m_local_image_ctx; - librbd::journal::Replay *m_local_replay; - ::journal::Journaler *m_remote_journaler; + ImageCtxT *m_local_image_ctx; + librbd::journal::Replay *m_local_replay; + Journaler* m_remote_journaler; ::journal::ReplayHandler *m_replay_handler; Context *m_on_finish; - ImageReplayerAdminSocketHook *m_asok_hook; + AdminSocketHook *m_asok_hook; librbd::journal::MirrorPeerClientMeta m_client_meta; }; @@ -212,4 +216,6 @@ private: } // namespace mirror } // namespace rbd +extern template class rbd::mirror::ImageReplayer; + #endif // CEPH_RBD_MIRROR_IMAGE_REPLAYER_H diff --git a/src/tools/rbd_mirror/Replayer.cc b/src/tools/rbd_mirror/Replayer.cc index b49ad45b5f72f..5aaf18da6ec47 100644 --- a/src/tools/rbd_mirror/Replayer.cc +++ b/src/tools/rbd_mirror/Replayer.cc @@ -327,13 +327,9 @@ void Replayer::set_sources(const map > &images) for (const auto &image_id : kv.second) { auto it = pool_replayers.find(image_id); if (it == pool_replayers.end()) { - unique_ptr image_replayer(new ImageReplayer(m_threads, - m_local, - m_remote, - mirror_uuid, - local_ioctx.get_id(), - pool_id, - image_id)); + unique_ptr > image_replayer(new ImageReplayer<>( + m_threads, m_local, m_remote, mirror_uuid, local_ioctx.get_id(), + pool_id, image_id)); it = pool_replayers.insert( std::make_pair(image_id, std::move(image_replayer))).first; } @@ -342,7 +338,7 @@ void Replayer::set_sources(const map > &images) } } -void Replayer::start_image_replayer(unique_ptr &image_replayer) +void Replayer::start_image_replayer(unique_ptr > &image_replayer) { if (!image_replayer->is_stopped()) { return; @@ -351,7 +347,7 @@ void Replayer::start_image_replayer(unique_ptr &image_replayer) image_replayer->start(); } -bool Replayer::stop_image_replayer(unique_ptr &image_replayer) +bool Replayer::stop_image_replayer(unique_ptr > &image_replayer) { if (image_replayer->is_stopped()) { return true; diff --git a/src/tools/rbd_mirror/Replayer.h b/src/tools/rbd_mirror/Replayer.h index e0509566a7977..d818f72983640 100644 --- a/src/tools/rbd_mirror/Replayer.h +++ b/src/tools/rbd_mirror/Replayer.h @@ -46,8 +46,8 @@ public: private: void set_sources(const std::map > &images); - void start_image_replayer(unique_ptr &image_replayer); - bool stop_image_replayer(unique_ptr &image_replayer); + void start_image_replayer(unique_ptr > &image_replayer); + bool stop_image_replayer(unique_ptr > &image_replayer); Threads *m_threads; Mutex m_lock; @@ -61,7 +61,7 @@ private: // index by pool so it's easy to tell what is affected // when a pool's configuration changes std::map > > m_images; + std::unique_ptr > > > m_images; ReplayerAdminSocketHook *m_asok_hook; class ReplayerThread : public Thread { -- 2.39.5