This will facilitate create mock test cases.
Signed-off-by: Jason Dillaman <dillaman@redhat.com>
unittest_rbd_mirror_SOURCES = \
test/rbd_mirror/test_main.cc \
test/rbd_mirror/test_mock_fixture.cc \
+ test/rbd_mirror/test_mock_ImageReplayer.cc \
test/rbd_mirror/test_mock_ImageSync.cc \
test/rbd_mirror/image_sync/test_mock_ImageCopyRequest.cc \
test/rbd_mirror/image_sync/test_mock_ObjectCopyRequest.cc \
object_prefix(image_ctx.object_prefix),
header_oid(image_ctx.header_oid),
id(image_ctx.id),
+ name(image_ctx.name),
parent_md(image_ctx.parent_md),
layout(image_ctx.layout),
aio_work_queue(new MockAioImageRequestWQ()),
std::string object_prefix;
std::string header_oid;
std::string id;
+ std::string name;
parent_info parent_md;
file_layout_t layout;
struct MockImageState {
MOCK_CONST_METHOD0(is_refresh_required, bool());
MOCK_METHOD1(refresh, void(Context*));
+
+ MOCK_METHOD0(close, int());
+ MOCK_METHOD1(close, void(Context*));
};
} // namespace librbd
#undef dout_prefix
#define dout_prefix *_dout << "rbd-mirror-image-replay: "
-rbd::mirror::ImageReplayer *replayer = nullptr;
+rbd::mirror::ImageReplayer<> *replayer = nullptr;
void usage() {
std::cout << "usage: ceph_test_rbd_mirror_image_replay [options...] \\" << std::endl;
<< local_pool_name << ", remote_pool_name=" << remote_pool_name
<< ", image_name=" << image_name << dendl;
- rbd::mirror::ImageReplayer::BootstrapParams bootstap_params(local_pool_name,
- image_name);
+ rbd::mirror::ImageReplayer<>::BootstrapParams bootstap_params(local_pool_name,
+ image_name);
int64_t local_pool_id;
int64_t remote_pool_id;
std::string remote_image_id;
threads = new rbd::mirror::Threads(reinterpret_cast<CephContext*>(
local->cct()));
- replayer = new rbd::mirror::ImageReplayer(threads, local, remote, client_id,
- local_pool_id, remote_pool_id,
- remote_image_id);
+ replayer = new rbd::mirror::ImageReplayer<>(threads, local, remote, client_id,
+ local_pool_id, remote_pool_id,
+ remote_image_id);
replayer->start(&start_cond, &bootstap_params);
r = start_cond.wait();
namespace journal {
+MockReplayEntry *MockReplayEntry::s_instance = nullptr;
MockJournaler *MockJournaler::s_instance = nullptr;
+std::ostream &operator<<(std::ostream &os, const MockJournalerProxy &) {
+ return os;
+}
+
} // namespace journal
#define TEST_RBD_MIRROR_MOCK_JOURNALER_H
#include <gmock/gmock.h>
+#include "include/int_types.h"
+#include "include/rados/librados.hpp"
+#include "cls/journal/cls_journal_types.h"
#include "librbd/Journal.h"
#include "librbd/journal/TypeTraits.h"
+#include <iosfwd>
+#include <string>
+
+class Context;
+class ContextWQ;
+class Mutex;
+class SafeTimer;
namespace journal {
+struct ReplayHandler;
+
+struct MockReplayEntry {
+ static MockReplayEntry *s_instance;
+ static MockReplayEntry &get_instance() {
+ assert(s_instance != nullptr);
+ return *s_instance;
+ }
+
+ MockReplayEntry() {
+ s_instance = this;
+ }
+
+ MOCK_CONST_METHOD0(get_commit_tid, uint64_t());
+ MOCK_METHOD0(get_data, bufferlist());
+};
+
+struct MockReplayEntryProxy {
+ uint64_t get_commit_tid() const {
+ return MockReplayEntry::get_instance().get_commit_tid();
+ }
+
+ bufferlist get_data() {
+ return MockReplayEntry::get_instance().get_data();
+ }
+};
+
struct MockJournaler {
static MockJournaler *s_instance;
static MockJournaler &get_instance() {
s_instance = this;
}
+ MOCK_METHOD1(init, void(Context *));
+ MOCK_METHOD0(shut_down, void());
+ MOCK_CONST_METHOD0(is_initialized, bool());
+
+ MOCK_METHOD4(get_mutable_metadata, void(uint64_t*, uint64_t*,
+ std::set<cls::journal::Client> *,
+ Context*));
+
+ MOCK_METHOD1(try_pop_front, bool(MockReplayEntryProxy *));
+ MOCK_METHOD2(start_live_replay, void(ReplayHandler *, double));
+ MOCK_METHOD0(stop_replay, void());
+
+ MOCK_METHOD1(committed, void(const MockReplayEntryProxy &));
+ MOCK_METHOD1(flush_commit_position, void(Context*));
+
MOCK_METHOD2(update_client, void(const bufferlist&, Context *on_safe));
};
+struct MockJournalerProxy {
+ MockJournalerProxy(ContextWQ *work_queue, SafeTimer *timer, Mutex *timer_lock,
+ librados::IoCtx &header_ioctx, const std::string &journal_id,
+ const std::string &client_id, double commit_interval) {
+ MockJournaler::get_instance();
+ }
+
+ void init(Context *on_finish) {
+ MockJournaler::get_instance().init(on_finish);
+ }
+ void shut_down() {
+ MockJournaler::get_instance().shut_down();
+ }
+ bool is_initialized() const {
+ return MockJournaler::get_instance().is_initialized();
+ }
+
+ void get_mutable_metadata(uint64_t *min, uint64_t *active,
+ std::set<cls::journal::Client> *clients,
+ Context *on_finish) {
+ MockJournaler::get_instance().get_mutable_metadata(min, active, clients,
+ on_finish);
+ }
+
+ bool try_pop_front(MockReplayEntryProxy *entry) {
+ return MockJournaler::get_instance().try_pop_front(entry);
+ }
+ void start_live_replay(ReplayHandler *handler, double interval) {
+ MockJournaler::get_instance().start_live_replay(handler, interval);
+ }
+ void stop_replay() {
+ MockJournaler::get_instance().stop_replay();
+ }
+
+ void committed(const MockReplayEntryProxy &entry) {
+ MockJournaler::get_instance().committed(entry);
+ }
+ void flush_commit_position(Context *on_finish) {
+ MockJournaler::get_instance().flush_commit_position(on_finish);
+ }
+
+ void update_client(const bufferlist& data, Context *on_safe) {
+ MockJournaler::get_instance().update_client(data, on_safe);
+ }
+};
+
+std::ostream &operator<<(std::ostream &os, const MockJournalerProxy &);
+
} // namespace journal
namespace librbd {
EXPECT_EQ(0, m_local_cluster.pool_delete(m_local_pool_name.c_str()));
}
- template <typename ImageReplayerT = rbd::mirror::ImageReplayer>
+ template <typename ImageReplayerT = rbd::mirror::ImageReplayer<> >
void create_replayer() {
m_replayer = new ImageReplayerT(m_threads,
rbd::mirror::RadosRef(new librados::Rados(m_local_ioctx)),
m_client_id, m_local_ioctx.get_id(), m_remote_pool_id, m_remote_image_id);
}
- void start(rbd::mirror::ImageReplayer::BootstrapParams *bootstap_params =
+ void start(rbd::mirror::ImageReplayer<>::BootstrapParams *bootstap_params =
nullptr)
{
C_SaferCond cond;
{
create_replayer<>();
- rbd::mirror::ImageReplayer::BootstrapParams
+ rbd::mirror::ImageReplayer<>::BootstrapParams
bootstap_params(m_local_pool_name, m_image_name);
start(&bootstap_params);
wait_for_replay_complete();
std::string m_image_name;
int64_t m_remote_pool_id;
std::string m_remote_image_id;
- rbd::mirror::ImageReplayer *m_replayer;
+ rbd::mirror::ImageReplayer<> *m_replayer;
C_WatchCtx *m_watch_ctx;
uint64_t m_watch_handle;
char m_test_data[TEST_IO_SIZE + 1];
{
create_replayer<>();
- rbd::mirror::ImageReplayer::BootstrapParams
+ rbd::mirror::ImageReplayer<>::BootstrapParams
bootstap_params("INVALID_LOCAL_POOL_NAME", m_image_name);
C_SaferCond cond;
m_replayer->start(&cond, &bootstap_params);
false, 0, &order, 0, 0));
create_replayer<>();
- rbd::mirror::ImageReplayer::BootstrapParams
+ rbd::mirror::ImageReplayer<>::BootstrapParams
bootstap_params(m_local_pool_name, m_image_name);
C_SaferCond cond;
m_replayer->start(&cond, &bootstap_params);
close_image(ictx);
create_replayer<>();
- rbd::mirror::ImageReplayer::BootstrapParams
+ rbd::mirror::ImageReplayer<>::BootstrapParams
bootstap_params(m_local_pool_name, m_image_name);
C_SaferCond cond;
m_replayer->start(&cond, &bootstap_params);
TEST_F(TestImageReplayer, StartInterrupted)
{
create_replayer<>();
- rbd::mirror::ImageReplayer::BootstrapParams
+ rbd::mirror::ImageReplayer<>::BootstrapParams
bootstap_params(m_local_pool_name, m_image_name);
C_SaferCond start_cond, stop_cond;
m_replayer->start(&start_cond, &bootstap_params);
ASSERT_EQ(0, librbd::update_features(ictx, RBD_FEATURE_JOURNALING, false));
close_image(ictx);
- rbd::mirror::ImageReplayer::BootstrapParams
+ rbd::mirror::ImageReplayer<>::BootstrapParams
bootstap_params(m_local_pool_name, m_image_name);
C_SaferCond cond;
m_replayer->start(&cond, &bootstap_params);
stop();
}
-class ImageReplayer : public rbd::mirror::ImageReplayer {
+class ImageReplayer : public rbd::mirror::ImageReplayer<> {
public:
ImageReplayer(rbd::mirror::Threads *threads,
rbd::mirror::RadosRef local, rbd::mirror::RadosRef remote,
const std::string &client_id, int64_t local_pool_id,
int64_t remote_pool_id, const std::string &remote_image_id)
- : rbd::mirror::ImageReplayer(threads, local, remote, client_id,
- local_pool_id, remote_pool_id, remote_image_id)
+ : rbd::mirror::ImageReplayer<>(threads, local, remote, client_id,
+ local_pool_id, remote_pool_id,
+ remote_image_id)
{}
void set_error(const std::string &state, int r) {
virtual void on_start_get_registered_client_status_finish(int r,
const std::set<cls::journal::Client> ®istered_clients,
const BootstrapParams &bootstrap_params) {
- rbd::mirror::ImageReplayer::on_start_get_registered_client_status_finish(
+ rbd::mirror::ImageReplayer<>::on_start_get_registered_client_status_finish(
get_error("on_start_get_registered_client_status"), registered_clients,
bootstrap_params);
}
virtual void on_start_remote_journaler_init_finish(int r) {
ASSERT_EQ(0, r);
- rbd::mirror::ImageReplayer::on_start_remote_journaler_init_finish(
+ rbd::mirror::ImageReplayer<>::on_start_remote_journaler_init_finish(
get_error("on_start_remote_journaler_init"));
}
virtual void on_start_local_image_open_finish(int r) {
int test_r = get_error("on_start_local_image_open");
if (!test_r) {
- rbd::mirror::ImageReplayer::on_start_local_image_open_finish(r);
+ rbd::mirror::ImageReplayer<>::on_start_local_image_open_finish(r);
return;
}
virtual void on_start_wait_for_local_journal_ready_finish(int r) {
ASSERT_EQ(0, r);
- rbd::mirror::ImageReplayer::on_start_wait_for_local_journal_ready_finish(
+ rbd::mirror::ImageReplayer<>::on_start_wait_for_local_journal_ready_finish(
get_error("on_start_wait_for_local_journal_ready"));
}
virtual void on_stop_journal_replay_shut_down_finish(int r) {
ASSERT_EQ(0, r);
- rbd::mirror::ImageReplayer::on_stop_journal_replay_shut_down_finish(
+ rbd::mirror::ImageReplayer<>::on_stop_journal_replay_shut_down_finish(
get_error("on_stop_journal_replay_shut_down"));
}
virtual void on_stop_local_image_close_finish(int r) {
ASSERT_EQ(0, r);
- rbd::mirror::ImageReplayer::on_stop_local_image_close_finish(
+ rbd::mirror::ImageReplayer<>::on_stop_local_image_close_finish(
get_error("on_stop_local_image_close"));
}
create_replayer<ImageReplayer>(); \
reinterpret_cast<ImageReplayer *>(m_replayer)-> \
set_error("on_start_" #state, -1); \
- rbd::mirror::ImageReplayer::BootstrapParams \
+ rbd::mirror::ImageReplayer<>::BootstrapParams \
bootstap_params(m_local_pool_name, m_image_name); \
C_SaferCond cond; \
m_replayer->start(&cond, &bootstap_params); \
create_replayer<ImageReplayer>(); \
reinterpret_cast<ImageReplayer *>(m_replayer)-> \
set_error("on_stop_" #state, -1); \
- rbd::mirror::ImageReplayer::BootstrapParams \
+ rbd::mirror::ImageReplayer<>::BootstrapParams \
bootstap_params(m_local_pool_name, m_image_name); \
start(&bootstap_params); \
/* TODO: investigate: without wait below I observe: */ \
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "test/rbd_mirror/test_mock_fixture.h"
+#include "librbd/journal/Replay.h"
+#include "tools/rbd_mirror/ImageReplayer.h"
+#include "tools/rbd_mirror/image_replayer/BootstrapRequest.h"
+#include "tools/rbd_mirror/image_replayer/CloseImageRequest.h"
+#include "tools/rbd_mirror/image_replayer/OpenLocalImageRequest.h"
+#include "test/librbd/mock/MockImageCtx.h"
+#include "test/librbd/mock/MockJournal.h"
+#include "test/rbd_mirror/mock/MockJournaler.h"
+
+namespace librbd {
+
+struct MockImageReplayerJournal;
+
+struct MockImageReplayerImageCtx : public MockImageCtx {
+ MockImageReplayerJournal *journal = nullptr;
+};
+
+struct MockImageReplayerJournal : public MockJournal {
+ MOCK_METHOD1(start_external_replay, int(journal::Replay<MockImageReplayerImageCtx> **));
+ MOCK_METHOD0(stop_external_replay, void());
+};
+
+namespace journal {
+
+template<>
+struct Replay<MockImageReplayerImageCtx> {
+ MOCK_METHOD3(process, void(bufferlist::iterator *, Context *, Context *));
+ MOCK_METHOD1(flush, void(Context*));
+ MOCK_METHOD2(shut_down, void(bool, Context*));
+};
+
+template <>
+struct TypeTraits<MockImageReplayerImageCtx> {
+ typedef ::journal::MockJournalerProxy Journaler;
+ typedef ::journal::MockReplayEntryProxy ReplayEntry;
+};
+
+struct MirrorPeerClientMeta;
+
+} // namespace journal
+} // namespace librbd
+
+namespace rbd {
+namespace mirror {
+namespace image_replayer {
+
+template<>
+struct BootstrapRequest<librbd::MockImageReplayerImageCtx> {
+ static BootstrapRequest* s_instance;
+ Context *on_finish = nullptr;
+
+ static BootstrapRequest* create(librados::IoCtx &local_io_ctx,
+ librados::IoCtx &remote_io_ctx,
+ librbd::MockImageReplayerImageCtx **local_image_ctx,
+ const std::string &local_image_name,
+ const std::string &remote_image_id,
+ ContextWQ *work_queue, SafeTimer *timer,
+ Mutex *timer_lock,
+ const std::string &mirror_uuid,
+ ::journal::MockJournalerProxy *journaler,
+ librbd::journal::MirrorPeerClientMeta *client_meta,
+ Context *on_finish) {
+ assert(s_instance != nullptr);
+ s_instance->on_finish = on_finish;
+ return s_instance;
+ }
+
+ BootstrapRequest() {
+ assert(s_instance == nullptr);
+ s_instance = this;
+ }
+
+ MOCK_METHOD0(send, void());
+};
+
+template<>
+struct CloseImageRequest<librbd::MockImageReplayerImageCtx> {
+ static CloseImageRequest* s_instance;
+ Context *on_finish = nullptr;
+
+ static CloseImageRequest* create(librbd::MockImageReplayerImageCtx **image_ctx,
+ ContextWQ *work_queue,
+ Context *on_finish) {
+ assert(s_instance != nullptr);
+ s_instance->on_finish = on_finish;
+ return s_instance;
+ }
+
+ CloseImageRequest() {
+ assert(s_instance == nullptr);
+ s_instance = this;
+ }
+
+ MOCK_METHOD0(send, void());
+};
+
+template<>
+struct OpenLocalImageRequest<librbd::MockImageReplayerImageCtx> {
+ static OpenLocalImageRequest* s_instance;
+ Context *on_finish = nullptr;
+
+ static OpenLocalImageRequest* create(librados::IoCtx &local_io_ctx,
+ librbd::MockImageReplayerImageCtx **local_image_ctx,
+ const std::string &local_image_name,
+ const std::string &local_image_id,
+ ContextWQ *work_queue,
+ Context *on_finish) {
+ assert(s_instance != nullptr);
+ s_instance->on_finish = on_finish;
+ return s_instance;
+ }
+
+ OpenLocalImageRequest() {
+ assert(s_instance == nullptr);
+ s_instance = this;
+ }
+
+ MOCK_METHOD0(send, void());
+};
+
+BootstrapRequest<librbd::MockImageReplayerImageCtx>* BootstrapRequest<librbd::MockImageReplayerImageCtx>::s_instance = nullptr;
+CloseImageRequest<librbd::MockImageReplayerImageCtx>* CloseImageRequest<librbd::MockImageReplayerImageCtx>::s_instance = nullptr;
+OpenLocalImageRequest<librbd::MockImageReplayerImageCtx>* OpenLocalImageRequest<librbd::MockImageReplayerImageCtx>::s_instance = nullptr;
+
+} // namespace image_replayer
+} // namespace mirror
+} // namespace rbd
+
+// template definitions
+#include "tools/rbd_mirror/ImageReplayer.cc"
+template class rbd::mirror::ImageReplayer<librbd::MockImageReplayerImageCtx>;
+
+namespace rbd {
+namespace mirror {
+
+class TestMockImageReplayer : public TestMockFixture {
+public:
+ typedef ImageReplayer<librbd::MockImageReplayerImageCtx> MockImageReplayer;
+
+ virtual void SetUp() {
+ TestMockFixture::SetUp();
+
+ librbd::RBD rbd;
+ ASSERT_EQ(0, create_image(rbd, m_remote_io_ctx, m_image_name, m_image_size));
+ ASSERT_EQ(0, open_image(m_remote_io_ctx, m_image_name, &m_remote_image_ctx));
+ }
+
+ librbd::ImageCtx *m_remote_image_ctx;
+};
+
+TEST_F(TestMockImageReplayer, Blah) {
+}
+
+} // namespace mirror
+} // namespace rbd
#include "librbd/Journal.h"
#include "librbd/Operations.h"
#include "librbd/Utils.h"
-#include "librbd/internal.h"
#include "librbd/journal/Replay.h"
#include "ImageReplayer.h"
#include "ImageSync.h"
using librbd::util::create_context_callback;
using namespace rbd::mirror::image_replayer;
-std::ostream &operator<<(std::ostream &os, const ImageReplayer::State &state);
+template <typename I>
+std::ostream &operator<<(std::ostream &os,
+ const typename ImageReplayer<I>::State &state);
namespace {
+template <typename I>
struct ReplayHandler : public ::journal::ReplayHandler {
- ImageReplayer *replayer;
- ReplayHandler(ImageReplayer *replayer) : replayer(replayer) {}
+ ImageReplayer<I> *replayer;
+ ReplayHandler(ImageReplayer<I> *replayer) : replayer(replayer) {}
virtual void get() {}
virtual void put() {}
}
};
+template <typename I>
struct C_ReplayCommitted : public Context {
- ImageReplayer *replayer;
- ::journal::ReplayEntry replay_entry;
+ typedef typename librbd::journal::TypeTraits<I>::ReplayEntry ReplayEntry;
- C_ReplayCommitted(ImageReplayer *replayer, ::journal::ReplayEntry &&replay_entry) :
- replayer(replayer), replay_entry(std::move(replay_entry)) {
+ ImageReplayer<I> *replayer;
+ ReplayEntry replay_entry;
+
+ C_ReplayCommitted(ImageReplayer<I> *replayer,
+ ReplayEntry &&replay_entry)
+ : replayer(replayer), replay_entry(std::move(replay_entry)) {
}
virtual void finish(int r) {
replayer->handle_replay_committed(&replay_entry, r);
virtual bool call(Formatter *f, stringstream *ss) = 0;
};
+template <typename I>
class StatusCommand : public ImageReplayerAdminSocketCommand {
public:
- explicit StatusCommand(ImageReplayer *replayer) : replayer(replayer) {}
+ explicit StatusCommand(ImageReplayer<I> *replayer) : replayer(replayer) {}
bool call(Formatter *f, stringstream *ss) {
replayer->print_status(f, ss);
}
private:
- ImageReplayer *replayer;
+ ImageReplayer<I> *replayer;
};
+template <typename I>
class FlushCommand : public ImageReplayerAdminSocketCommand {
public:
- explicit FlushCommand(ImageReplayer *replayer) : replayer(replayer) {}
+ explicit FlushCommand(ImageReplayer<I> *replayer) : replayer(replayer) {}
bool call(Formatter *f, stringstream *ss) {
C_SaferCond cond;
}
private:
- ImageReplayer *replayer;
+ ImageReplayer<I> *replayer;
};
-} // anonymous namespace
-
+template <typename I>
class ImageReplayerAdminSocketHook : public AdminSocketHook {
public:
ImageReplayerAdminSocketHook(CephContext *cct, const std::string &name,
- ImageReplayer *replayer) :
+ ImageReplayer<I> *replayer) :
admin_socket(cct->get_admin_socket()) {
std::string command;
int r;
r = admin_socket->register_command(command, command, this,
"get status for rbd mirror " + name);
if (r == 0) {
- commands[command] = new StatusCommand(replayer);
+ commands[command] = new StatusCommand<I>(replayer);
}
command = "rbd mirror flush " + name;
r = admin_socket->register_command(command, command, this,
"flush rbd mirror " + name);
if (r == 0) {
- commands[command] = new FlushCommand(replayer);
+ commands[command] = new FlushCommand<I>(replayer);
}
}
Commands commands;
};
-ImageReplayer::ImageReplayer(Threads *threads, RadosRef local, RadosRef remote,
+} // anonymous namespace
+
+template <typename I>
+ImageReplayer<I>::ImageReplayer(Threads *threads, RadosRef local, RadosRef remote,
const std::string &client_id,
int64_t local_pool_id,
int64_t remote_pool_id,
{
}
-ImageReplayer::~ImageReplayer()
+template <typename I>
+ImageReplayer<I>::~ImageReplayer()
{
assert(m_local_image_ctx == nullptr);
assert(m_local_replay == nullptr);
delete m_asok_hook;
}
-void ImageReplayer::start(Context *on_finish,
+template <typename I>
+void ImageReplayer<I>::start(Context *on_finish,
const BootstrapParams *bootstrap_params)
{
dout(20) << "on_finish=" << on_finish << ", m_on_finish=" << m_on_finish
CephContext *cct = static_cast<CephContext *>(m_local->cct());
double commit_interval = cct->_conf->rbd_journal_commit_age;
- m_remote_journaler = new ::journal::Journaler(m_threads->work_queue,
- m_threads->timer,
- &m_threads->timer_lock,
- m_remote_ioctx,
- m_remote_image_id, m_client_id,
- commit_interval);
+ m_remote_journaler = new Journaler(m_threads->work_queue,
+ m_threads->timer,
+ &m_threads->timer_lock, m_remote_ioctx,
+ m_remote_image_id, m_client_id,
+ commit_interval);
on_start_get_registered_client_status_start(bootstrap_params);
}
-void ImageReplayer::on_start_get_registered_client_status_start(
+template <typename I>
+void ImageReplayer<I>::on_start_get_registered_client_status_start(
const BootstrapParams *bootstrap_params)
{
dout(20) << "enter" << dendl;
&m->registered_clients, ctx);
}
-void ImageReplayer::on_start_get_registered_client_status_finish(int r,
+template <typename I>
+void ImageReplayer<I>::on_start_get_registered_client_status_finish(int r,
const std::set<cls::journal::Client> ®istered_clients,
const BootstrapParams &bootstrap_params)
{
bootstrap(bootstrap_params);
}
-void ImageReplayer::bootstrap(const BootstrapParams &bootstrap_params) {
+template <typename I>
+void ImageReplayer<I>::bootstrap(const BootstrapParams &bootstrap_params) {
int r;
BootstrapParams params;
// TODO: add a new bootstrap state and support canceling
Context *ctx = create_context_callback<
- ImageReplayer, &ImageReplayer::handle_bootstrap>(this);
- BootstrapRequest<> *request = BootstrapRequest<>::create(
+ ImageReplayer, &ImageReplayer<I>::handle_bootstrap>(this);
+ BootstrapRequest<I> *request = BootstrapRequest<I>::create(
m_local_ioctx, m_remote_ioctx, &m_local_image_ctx,
params.local_image_name, m_remote_image_id, m_threads->work_queue,
m_threads->timer, &m_threads->timer_lock, m_client_id, m_remote_journaler,
request->send();
}
-void ImageReplayer::handle_bootstrap(int r) {
+template <typename I>
+void ImageReplayer<I>::handle_bootstrap(int r) {
dout(20) << "r=" << r << dendl;
if (r < 0) {
on_start_remote_journaler_init_start();
}
-void ImageReplayer::on_start_remote_journaler_init_start()
+template <typename I>
+void ImageReplayer<I>::on_start_remote_journaler_init_start()
{
if (on_start_interrupted()) {
return;
m_remote_journaler->init(ctx);
}
-void ImageReplayer::on_start_remote_journaler_init_finish(int r)
+template <typename I>
+void ImageReplayer<I>::on_start_remote_journaler_init_finish(int r)
{
dout(20) << "r=" << r << dendl;
on_start_local_image_open_start();
}
-void ImageReplayer::on_start_local_image_open_start()
+template <typename I>
+void ImageReplayer<I>::on_start_local_image_open_start()
{
dout(20) << "enter" << dendl;
if (m_local_image_ctx != nullptr) {
// open and lock the local image
Context *ctx = create_context_callback<
- ImageReplayer, &ImageReplayer::on_start_local_image_open_finish>(this);
- OpenLocalImageRequest<> *request = OpenLocalImageRequest<>::create(
+ ImageReplayer, &ImageReplayer<I>::on_start_local_image_open_finish>(this);
+ OpenLocalImageRequest<I> *request = OpenLocalImageRequest<I>::create(
m_local_ioctx, &m_local_image_ctx, "", m_local_image_id,
m_threads->work_queue, ctx);
request->send();
}
-void ImageReplayer::on_start_local_image_open_finish(int r)
+template <typename I>
+void ImageReplayer<I>::on_start_local_image_open_finish(int r)
{
dout(20) << "r=" << r << dendl;
on_start_wait_for_local_journal_ready_start();
}
-void ImageReplayer::on_start_wait_for_local_journal_ready_start()
+template <typename I>
+void ImageReplayer<I>::on_start_wait_for_local_journal_ready_start()
{
dout(20) << "enter" << dendl;
CephContext *cct = static_cast<CephContext *>(m_local->cct());
- m_asok_hook = new ImageReplayerAdminSocketHook(cct, m_name, this);
+ m_asok_hook = new ImageReplayerAdminSocketHook<I>(cct, m_name, this);
}
FunctionContext *ctx = new FunctionContext(
m_local_image_ctx->journal->wait_for_journal_ready(ctx);
}
-void ImageReplayer::on_start_wait_for_local_journal_ready_finish(int r)
+template <typename I>
+void ImageReplayer<I>::on_start_wait_for_local_journal_ready_finish(int r)
{
dout(20) << "r=" << r << dendl;
return;
}
- m_replay_handler = new ReplayHandler(this);
+ m_replay_handler = new ReplayHandler<I>(this);
m_remote_journaler->start_live_replay(m_replay_handler,
1 /* TODO: configurable */);
}
}
-void ImageReplayer::on_start_fail_start(int r)
+template <typename I>
+void ImageReplayer<I>::on_start_fail_start(int r)
{
dout(20) << "r=" << r << dendl;
m_threads->work_queue->queue(ctx, 0);
}
-void ImageReplayer::on_start_fail_finish(int r)
+template <typename I>
+void ImageReplayer<I>::on_start_fail_finish(int r)
{
dout(20) << "r=" << r << dendl;
}
if (m_local_image_ctx) {
- bool owner;
- if (librbd::is_exclusive_lock_owner(m_local_image_ctx, &owner) == 0 &&
- owner) {
- librbd::unlock(m_local_image_ctx, "");
- }
+ // TODO: switch to async close via CloseImageRequest
m_local_image_ctx->state->close();
m_local_image_ctx = nullptr;
}
}
}
-bool ImageReplayer::on_start_interrupted()
+template <typename I>
+bool ImageReplayer<I>::on_start_interrupted()
{
Mutex::Locker locker(m_lock);
return true;
}
-void ImageReplayer::stop(Context *on_finish)
+template <typename I>
+void ImageReplayer<I>::stop(Context *on_finish)
{
dout(20) << "on_finish=" << on_finish << ", m_on_finish=" << m_on_finish
<< dendl;
m_state = STATE_STOPPING;
}
-void ImageReplayer::on_stop_journal_replay_shut_down_start()
+template <typename I>
+void ImageReplayer<I>::on_stop_journal_replay_shut_down_start()
{
dout(20) << "enter" << dendl;
m_local_replay->shut_down(false, ctx);
}
-void ImageReplayer::on_stop_journal_replay_shut_down_finish(int r)
+template <typename I>
+void ImageReplayer<I>::on_stop_journal_replay_shut_down_finish(int r)
{
dout(20) << "r=" << r << dendl;
on_stop_local_image_close_start();
}
-void ImageReplayer::on_stop_local_image_close_start()
+template <typename I>
+void ImageReplayer<I>::on_stop_local_image_close_start()
{
dout(20) << "enter" << dendl;
// close and delete the image (from outside the image's thread context)
Context *ctx = create_context_callback<
- ImageReplayer, &ImageReplayer::on_stop_local_image_close_finish>(this);
- CloseImageRequest<> *request = CloseImageRequest<>::create(
+ ImageReplayer, &ImageReplayer<I>::on_stop_local_image_close_finish>(this);
+ CloseImageRequest<I> *request = CloseImageRequest<I>::create(
&m_local_image_ctx, m_threads->work_queue, ctx);
request->send();
}
-void ImageReplayer::on_stop_local_image_close_finish(int r)
+template <typename I>
+void ImageReplayer<I>::on_stop_local_image_close_finish(int r)
{
dout(20) << "r=" << r << dendl;
}
}
-void ImageReplayer::close_local_image(Context *on_finish)
+template <typename I>
+void ImageReplayer<I>::close_local_image(Context *on_finish)
{
m_local_image_ctx->state->close(on_finish);
}
-void ImageReplayer::handle_replay_ready()
+template <typename I>
+void ImageReplayer<I>::handle_replay_ready()
{
dout(20) << "enter" << dendl;
- ::journal::ReplayEntry replay_entry;
+ ReplayEntry replay_entry;
if (!m_remote_journaler->try_pop_front(&replay_entry)) {
return;
}
bufferlist data = replay_entry.get_data();
bufferlist::iterator it = data.begin();
Context *on_ready = create_context_callback<
- ImageReplayer, &ImageReplayer::handle_replay_process_ready>(this);
- Context *on_commit = new C_ReplayCommitted(this, std::move(replay_entry));
+ ImageReplayer, &ImageReplayer<I>::handle_replay_process_ready>(this);
+ Context *on_commit = new C_ReplayCommitted<I>(this, std::move(replay_entry));
m_local_replay->process(&it, on_ready, on_commit);
}
-void ImageReplayer::flush(Context *on_finish)
+template <typename I>
+void ImageReplayer<I>::flush(Context *on_finish)
{
dout(20) << "enter" << dendl;
}
}
-void ImageReplayer::on_flush_local_replay_flush_start()
+template <typename I>
+void ImageReplayer<I>::on_flush_local_replay_flush_start()
{
dout(20) << "enter" << dendl;
m_local_replay->flush(ctx);
}
-void ImageReplayer::on_flush_local_replay_flush_finish(int r)
+template <typename I>
+void ImageReplayer<I>::on_flush_local_replay_flush_finish(int r)
{
dout(20) << "r=" << r << dendl;
on_flush_flush_commit_position_start(r);
}
-void ImageReplayer::on_flush_flush_commit_position_start(int last_r)
+template <typename I>
+void ImageReplayer<I>::on_flush_flush_commit_position_start(int last_r)
{
FunctionContext *ctx = new FunctionContext(
m_remote_journaler->flush_commit_position(ctx);
}
-void ImageReplayer::on_flush_flush_commit_position_finish(int last_r, int r)
+template <typename I>
+void ImageReplayer<I>::on_flush_flush_commit_position_finish(int last_r, int r)
{
if (r < 0) {
derr << "error flushing remote journal commit position: "
}
}
-bool ImageReplayer::on_flush_interrupted()
+template <typename I>
+bool ImageReplayer<I>::on_flush_interrupted()
{
Context *on_finish(nullptr);
return true;
}
-void ImageReplayer::print_status(Formatter *f, stringstream *ss)
+template <typename I>
+void ImageReplayer<I>::print_status(Formatter *f, stringstream *ss)
{
dout(20) << "enter" << dendl;
}
}
-void ImageReplayer::handle_replay_process_ready(int r)
+template <typename I>
+void ImageReplayer<I>::handle_replay_process_ready(int r)
{
// journal::Replay is ready for more events -- attempt to pop another
handle_replay_ready();
}
-void ImageReplayer::handle_replay_complete(int r)
+template <typename I>
+void ImageReplayer<I>::handle_replay_complete(int r)
{
dout(20) "r=" << r << dendl;
//m_remote_journaler->stop_replay();
}
-void ImageReplayer::handle_replay_committed(
- ::journal::ReplayEntry *replay_entry, int r)
+template <typename I>
+void ImageReplayer<I>::handle_replay_committed(ReplayEntry *replay_entry, int r)
{
dout(20) << "commit_tid=" << replay_entry->get_commit_tid() << ", r=" << r
<< dendl;
m_remote_journaler->committed(*replay_entry);
}
-int ImageReplayer::get_bootstrap_params(BootstrapParams *params)
+template <typename I>
+int ImageReplayer<I>::get_bootstrap_params(BootstrapParams *params)
{
int r = librbd::cls_client::dir_get_name(&m_remote_ioctx, RBD_DIRECTORY,
m_remote_image_id,
return 0;
}
-void ImageReplayer::shut_down_journal_replay(bool cancel_ops)
+template <typename I>
+void ImageReplayer<I>::shut_down_journal_replay(bool cancel_ops)
{
C_SaferCond cond;
m_local_replay->shut_down(cancel_ops, &cond);
}
}
-std::ostream &operator<<(std::ostream &os, const ImageReplayer::State &state)
+template <typename I>
+std::ostream &operator<<(std::ostream &os,
+ const typename ImageReplayer<I>::State &state)
{
switch (state) {
- case ImageReplayer::STATE_UNINITIALIZED:
+ case ImageReplayer<I>::STATE_UNINITIALIZED:
os << "Uninitialized";
break;
- case ImageReplayer::STATE_STARTING:
+ case ImageReplayer<I>::STATE_STARTING:
os << "Starting";
break;
- case ImageReplayer::STATE_REPLAYING:
+ case ImageReplayer<I>::STATE_REPLAYING:
os << "Replaying";
break;
- case ImageReplayer::STATE_FLUSHING_REPLAY:
+ case ImageReplayer<I>::STATE_FLUSHING_REPLAY:
os << "FlushingReplay";
break;
- case ImageReplayer::STATE_STOPPING:
+ case ImageReplayer<I>::STATE_STOPPING:
os << "Stopping";
break;
- case ImageReplayer::STATE_STOPPED:
+ case ImageReplayer<I>::STATE_STOPPED:
os << "Stopped";
break;
default:
return os;
}
-std::ostream &operator<<(std::ostream &os, const ImageReplayer &replayer)
+template <typename I>
+std::ostream &operator<<(std::ostream &os, const ImageReplayer<I> &replayer)
{
- os << "ImageReplayer[" << replayer.m_remote_pool_id << "/"
- << replayer.m_remote_image_id << "]";
+ os << "ImageReplayer[" << replayer.get_remote_pool_id() << "/"
+ << replayer.get_remote_image_id() << "]";
return os;
}
} // namespace mirror
} // namespace rbd
+
+template class rbd::mirror::ImageReplayer<librbd::ImageCtx>;
#include "include/rados/librados.hpp"
#include "cls/journal/cls_journal_types.h"
#include "librbd/journal/Types.h"
+#include "librbd/journal/TypeTraits.h"
#include "types.h"
+class AdminSocketHook;
+
namespace journal {
class Journaler;
class ReplayHandler;
-class ReplayEntry;
}
namespace librbd {
class ImageCtx;
-
-namespace journal {
-
-template <typename> class Replay;
-
-}
+namespace journal { template <typename> class Replay; }
}
namespace rbd {
namespace mirror {
-class ImageReplayerAdminSocketHook;
struct Threads;
/**
* Replays changes from a remote cluster for a single image.
*/
+template <typename ImageCtxT = librbd::ImageCtx>
class ImageReplayer {
public:
+ typedef typename librbd::journal::TypeTraits<ImageCtxT>::ReplayEntry ReplayEntry;
+
enum State {
STATE_UNINITIALIZED,
STATE_STARTING,
virtual void handle_replay_process_ready(int r);
virtual void handle_replay_complete(int r);
- virtual void handle_replay_committed(::journal::ReplayEntry* replay_entry, int r);
+ virtual void handle_replay_committed(ReplayEntry* replay_entry, int r);
+ inline int64_t get_remote_pool_id() const {
+ return m_remote_pool_id;
+ }
+ inline const std::string get_remote_image_id() const {
+ return m_remote_image_id;
+ }
protected:
/**
* @verbatim
void close_local_image(Context *on_finish); // for tests
private:
+ typedef typename librbd::journal::TypeTraits<ImageCtxT>::Journaler Journaler;
+
State get_state_() const { return m_state; }
bool is_stopped_() const { return m_state == STATE_UNINITIALIZED ||
m_state == STATE_STOPPED; }
void shut_down_journal_replay(bool cancel_ops);
- friend std::ostream &operator<<(std::ostream &os,
- const ImageReplayer &replayer);
-
Threads *m_threads;
RadosRef m_local, m_remote;
std::string m_client_id;
State m_state;
std::string m_local_pool_name, m_remote_pool_name;
librados::IoCtx m_local_ioctx, m_remote_ioctx;
- librbd::ImageCtx *m_local_image_ctx;
- librbd::journal::Replay<librbd::ImageCtx> *m_local_replay;
- ::journal::Journaler *m_remote_journaler;
+ ImageCtxT *m_local_image_ctx;
+ librbd::journal::Replay<ImageCtxT> *m_local_replay;
+ Journaler* m_remote_journaler;
::journal::ReplayHandler *m_replay_handler;
Context *m_on_finish;
- ImageReplayerAdminSocketHook *m_asok_hook;
+ AdminSocketHook *m_asok_hook;
librbd::journal::MirrorPeerClientMeta m_client_meta;
};
} // namespace mirror
} // namespace rbd
+extern template class rbd::mirror::ImageReplayer<librbd::ImageCtx>;
+
#endif // CEPH_RBD_MIRROR_IMAGE_REPLAYER_H
for (const auto &image_id : kv.second) {
auto it = pool_replayers.find(image_id);
if (it == pool_replayers.end()) {
- unique_ptr<ImageReplayer> image_replayer(new ImageReplayer(m_threads,
- m_local,
- m_remote,
- mirror_uuid,
- local_ioctx.get_id(),
- pool_id,
- image_id));
+ unique_ptr<ImageReplayer<> > image_replayer(new ImageReplayer<>(
+ m_threads, m_local, m_remote, mirror_uuid, local_ioctx.get_id(),
+ pool_id, image_id));
it = pool_replayers.insert(
std::make_pair(image_id, std::move(image_replayer))).first;
}
}
}
-void Replayer::start_image_replayer(unique_ptr<ImageReplayer> &image_replayer)
+void Replayer::start_image_replayer(unique_ptr<ImageReplayer<> > &image_replayer)
{
if (!image_replayer->is_stopped()) {
return;
image_replayer->start();
}
-bool Replayer::stop_image_replayer(unique_ptr<ImageReplayer> &image_replayer)
+bool Replayer::stop_image_replayer(unique_ptr<ImageReplayer<> > &image_replayer)
{
if (image_replayer->is_stopped()) {
return true;
private:
void set_sources(const std::map<int64_t, std::set<std::string> > &images);
- void start_image_replayer(unique_ptr<ImageReplayer> &image_replayer);
- bool stop_image_replayer(unique_ptr<ImageReplayer> &image_replayer);
+ void start_image_replayer(unique_ptr<ImageReplayer<> > &image_replayer);
+ bool stop_image_replayer(unique_ptr<ImageReplayer<> > &image_replayer);
Threads *m_threads;
Mutex m_lock;
// index by pool so it's easy to tell what is affected
// when a pool's configuration changes
std::map<int64_t, std::map<std::string,
- std::unique_ptr<ImageReplayer> > > m_images;
+ std::unique_ptr<ImageReplayer<> > > > m_images;
ReplayerAdminSocketHook *m_asok_hook;
class ReplayerThread : public Thread {