false, features, &order, 0, 0));
m_remote_image_id = get_image_id(m_remote_ioctx, m_image_name);
- m_threads = new rbd::mirror::Threads(reinterpret_cast<CephContext*>(
+ m_threads = new rbd::mirror::Threads<>(reinterpret_cast<CephContext*>(
m_local_ioctx.cct()));
m_image_deleter.reset(new rbd::mirror::ImageDeleter(m_threads->work_queue,
static int _image_number;
- rbd::mirror::Threads *m_threads = nullptr;
+ rbd::mirror::Threads<> *m_threads = nullptr;
std::shared_ptr<rbd::mirror::ImageDeleter> m_image_deleter;
std::shared_ptr<librados::Rados> m_local_cluster;
librados::Rados m_remote_cluster;
ASSERT_EQ(0, _rados->ioctx_create(_remote_pool_name.c_str(), m_remote_io_ctx));
m_image_name = get_temp_image_name();
- m_threads = new rbd::mirror::Threads(reinterpret_cast<CephContext*>(
+ m_threads = new rbd::mirror::Threads<>(reinterpret_cast<CephContext*>(
m_local_io_ctx.cct()));
}
namespace rbd {
namespace mirror {
-class Threads;
+template <typename> class Threads;
class TestFixture : public ::testing::Test {
public:
std::set<librbd::ImageCtx *> m_image_ctxs;
- Threads *m_threads = nullptr;
+ Threads<librbd::ImageCtx> *m_threads = nullptr;
int create_image(librbd::RBD &rbd, librados::IoCtx &ioctx,
namespace rbd {
namespace mirror {
+template <>
+struct Threads<librbd::MockTestImageCtx> {
+ Mutex &timer_lock;
+ SafeTimer *timer;
+ ContextWQ *work_queue;
+
+ Threads(Threads<librbd::ImageCtx> *threads)
+ : timer_lock(threads->timer_lock), timer(threads->timer),
+ work_queue(threads->work_queue) {
+ }
+};
+
template <>
struct MirrorStatusWatcher<librbd::MockTestImageCtx> {
static MirrorStatusWatcher* s_instance;
struct Instances<librbd::MockTestImageCtx> {
static Instances* s_instance;
- static Instances *create(Threads *threads, librados::IoCtx &ioctx) {
+ static Instances *create(Threads<librbd::MockTestImageCtx> *threads,
+ librados::IoCtx &ioctx) {
assert(s_instance != nullptr);
return s_instance;
}
typedef MirrorStatusWatcher<librbd::MockTestImageCtx> MockMirrorStatusWatcher;
typedef Instances<librbd::MockTestImageCtx> MockInstances;
typedef LeaderWatcher<librbd::MockTestImageCtx> MockLeaderWatcher;
+ typedef Threads<librbd::MockTestImageCtx> MockThreads;
+
+ void SetUp() override {
+ TestMockFixture::SetUp();
+ m_mock_threads = new MockThreads(m_threads);
+ }
+
+ void TearDown() override {
+ delete m_mock_threads;
+ TestMockFixture::TearDown();
+ }
void expect_construct(MockManagedLock &mock_managed_lock) {
EXPECT_CALL(mock_managed_lock, construct());
void expect_init(MockMirrorStatusWatcher &mock_mirror_status_watcher, int r) {
EXPECT_CALL(mock_mirror_status_watcher, init(_))
- .WillOnce(CompleteContext(m_threads->work_queue, r));
+ .WillOnce(CompleteContext(m_mock_threads->work_queue, r));
}
void expect_shut_down(MockMirrorStatusWatcher &mock_mirror_status_watcher, int r) {
EXPECT_CALL(mock_mirror_status_watcher, shut_down(_))
- .WillOnce(CompleteContext(m_threads->work_queue, r));
+ .WillOnce(CompleteContext(m_mock_threads->work_queue, r));
expect_destroy(mock_mirror_status_watcher);
}
void expect_init(MockInstances &mock_instances, int r) {
EXPECT_CALL(mock_instances, init(_))
- .WillOnce(CompleteContext(m_threads->work_queue, r));
+ .WillOnce(CompleteContext(m_mock_threads->work_queue, r));
}
void expect_shut_down(MockInstances &mock_instances, int r) {
EXPECT_CALL(mock_instances, shut_down(_))
- .WillOnce(CompleteContext(m_threads->work_queue, r));
+ .WillOnce(CompleteContext(m_mock_threads->work_queue, r));
expect_destroy(mock_instances);
}
.WillOnce(CompleteContext(r));
expect_is_leader(mock_managed_lock, false, false);
}
+
+ MockThreads *m_mock_threads;
};
TEST_F(TestMockLeaderWatcher, InitShutdown) {
InSequence seq;
expect_construct(mock_managed_lock);
- MockLeaderWatcher leader_watcher(m_threads, m_local_io_ctx, &listener);
+ MockLeaderWatcher leader_watcher(m_mock_threads, m_local_io_ctx, &listener);
// Inint
C_SaferCond on_heartbeat_finish;
InSequence seq;
expect_construct(mock_managed_lock);
- MockLeaderWatcher leader_watcher(m_threads, m_local_io_ctx, &listener);
+ MockLeaderWatcher leader_watcher(m_mock_threads, m_local_io_ctx, &listener);
// Inint
C_SaferCond on_heartbeat_finish;
InSequence seq;
expect_construct(mock_managed_lock);
- MockLeaderWatcher leader_watcher(m_threads, m_local_io_ctx, &listener);
+ MockLeaderWatcher leader_watcher(m_mock_threads, m_local_io_ctx, &listener);
// Inint
C_SaferCond on_get_locker_finish;
InSequence seq;
expect_construct(mock_managed_lock);
- MockLeaderWatcher leader_watcher(m_threads, m_local_io_ctx, &listener);
+ MockLeaderWatcher leader_watcher(m_mock_threads, m_local_io_ctx, &listener);
// Inint
C_SaferCond on_heartbeat_finish;
InSequence seq;
expect_construct(mock_managed_lock);
- MockLeaderWatcher leader_watcher(m_threads, m_local_io_ctx, &listener);
+ MockLeaderWatcher leader_watcher(m_mock_threads, m_local_io_ctx, &listener);
// Init
expect_is_leader(mock_managed_lock, false, false);
}
template <typename I>
-ImageReplayer<I>::ImageReplayer(Threads *threads,
- shared_ptr<ImageDeleter> image_deleter,
- ImageSyncThrottlerRef<I> image_sync_throttler,
- RadosRef local,
- const std::string &local_mirror_uuid,
- int64_t local_pool_id,
- const std::string &global_image_id) :
+ImageReplayer<I>::ImageReplayer(Threads<librbd::ImageCtx> *threads,
+ shared_ptr<ImageDeleter> image_deleter,
+ ImageSyncThrottlerRef<I> image_sync_throttler,
+ RadosRef local,
+ const std::string &local_mirror_uuid,
+ int64_t local_pool_id,
+ const std::string &global_image_id) :
m_threads(threads),
m_image_deleter(image_deleter),
m_image_sync_throttler(image_sync_throttler),
namespace rbd {
namespace mirror {
-struct Threads;
+template <typename> struct Threads;
namespace image_replayer { template <typename> class BootstrapRequest; }
namespace image_replayer { template <typename> class EventPreprocessor; }
STATE_STOPPED,
};
- ImageReplayer(Threads *threads, std::shared_ptr<ImageDeleter> image_deleter,
+ ImageReplayer(Threads<librbd::ImageCtx> *threads,
+ std::shared_ptr<ImageDeleter> image_deleter,
ImageSyncThrottlerRef<ImageCtxT> image_sync_throttler,
RadosRef local, const std::string &local_mirror_uuid,
int64_t local_pool_id, const std::string &global_image_id);
ImageReplayer<ImageCtxT> *replayer;
};
- Threads *m_threads;
+ Threads<librbd::ImageCtx> *m_threads;
std::shared_ptr<ImageDeleter> m_image_deleter;
ImageSyncThrottlerRef<ImageCtxT> m_image_sync_throttler;
using librbd::util::create_rados_callback;
template <typename I>
-Instances<I>::Instances(Threads *threads, librados::IoCtx &ioctx) :
+Instances<I>::Instances(Threads<I> *threads, librados::IoCtx &ioctx) :
m_threads(threads), m_ioctx(ioctx),
m_cct(reinterpret_cast<CephContext *>(ioctx.cct())),
m_lock("rbd::mirror::Instances " + ioctx.get_pool_name()) {
namespace rbd {
namespace mirror {
-struct Threads;
+template <typename> struct Threads;
template <typename ImageCtxT = librbd::ImageCtx>
class Instances {
public:
- static Instances *create(Threads *threads, librados::IoCtx &ioctx) {
+ static Instances *create(Threads<ImageCtxT> *threads,
+ librados::IoCtx &ioctx) {
return new Instances(threads, ioctx);
}
void destroy() {
delete this;
}
- Instances(Threads *threads, librados::IoCtx &ioctx);
+ Instances(Threads<ImageCtxT> *threads, librados::IoCtx &ioctx);
virtual ~Instances();
void init(Context *on_finish);
}
};
- Threads *m_threads;
+ Threads<ImageCtxT> *m_threads;
librados::IoCtx &m_ioctx;
CephContext *m_cct;
using librbd::util::create_rados_callback;
template <typename I>
-LeaderWatcher<I>::LeaderWatcher(Threads *threads, librados::IoCtx &io_ctx,
+LeaderWatcher<I>::LeaderWatcher(Threads<I> *threads, librados::IoCtx &io_ctx,
Listener *listener)
: Watcher(io_ctx, threads->work_queue, RBD_MIRROR_LEADER),
m_threads(threads), m_listener(listener),
namespace rbd {
namespace mirror {
-struct Threads;
+template <typename> struct Threads;
template <typename ImageCtxT = librbd::ImageCtx>
class LeaderWatcher : protected librbd::Watcher {
virtual void pre_release_handler(Context *on_finish) = 0;
};
- LeaderWatcher(Threads *threads, librados::IoCtx &io_ctx, Listener *listener);
+ LeaderWatcher(Threads<ImageCtxT> *threads, librados::IoCtx &io_ctx,
+ Listener *listener);
~LeaderWatcher() override;
int init();
}
};
- Threads *m_threads;
+ Threads<ImageCtxT> *m_threads;
Listener *m_listener;
Mutex m_lock;
#include "common/admin_socket.h"
#include "common/debug.h"
#include "common/errno.h"
+#include "librbd/ImageCtx.h"
#include "Mirror.h"
#include "Threads.h"
#include "ImageSync.h"
m_local(new librados::Rados()),
m_asok_hook(new MirrorAdminSocketHook(cct, this))
{
- cct->lookup_or_create_singleton_object<Threads>(m_threads,
- "rbd_mirror::threads");
+ cct->lookup_or_create_singleton_object<Threads<librbd::ImageCtx> >(
+ m_threads, "rbd_mirror::threads");
}
Mirror::~Mirror()
#include "ImageDeleter.h"
#include "types.h"
+namespace librbd { struct ImageCtx; }
+
namespace rbd {
namespace mirror {
-struct Threads;
+template <typename> struct Threads;
class MirrorAdminSocketHook;
/**
CephContext *m_cct;
std::vector<const char*> m_args;
- Threads *m_threads = nullptr;
+ Threads<librbd::ImageCtx> *m_threads = nullptr;
Mutex m_lock;
Cond m_cond;
RadosRef m_local;
Commands commands;
};
-Replayer::Replayer(Threads *threads, std::shared_ptr<ImageDeleter> image_deleter,
+Replayer::Replayer(Threads<librbd::ImageCtx> *threads,
+ std::shared_ptr<ImageDeleter> image_deleter,
ImageSyncThrottlerRef<> image_sync_throttler,
int64_t local_pool_id, const peer_t &peer,
const std::vector<const char*> &args) :
namespace rbd {
namespace mirror {
-struct Threads;
+template <typename> struct Threads;
class ReplayerAdminSocketHook;
template <typename> class InstanceWatcher;
*/
class Replayer {
public:
- Replayer(Threads *threads, std::shared_ptr<ImageDeleter> image_deleter,
+ Replayer(Threads<librbd::ImageCtx> *threads,
+ std::shared_ptr<ImageDeleter> image_deleter,
ImageSyncThrottlerRef<> image_sync_throttler,
int64_t local_pool_id, const peer_t &peer,
const std::vector<const char*> &args);
void handle_post_acquire_leader(Context *on_finish);
void handle_pre_release_leader(Context *on_finish);
- Threads *m_threads;
+ Threads<librbd::ImageCtx> *m_threads;
std::shared_ptr<ImageDeleter> m_image_deleter;
ImageSyncThrottlerRef<> m_image_sync_throttler;
mutable Mutex m_lock;
#include "tools/rbd_mirror/Threads.h"
#include "common/Timer.h"
#include "common/WorkQueue.h"
+#include "librbd/ImageCtx.h"
namespace rbd {
namespace mirror {
-Threads::Threads(CephContext *cct) : timer_lock("Threads::timer_lock") {
+template <typename I>
+Threads<I>::Threads(CephContext *cct) : timer_lock("Threads::timer_lock") {
thread_pool = new ThreadPool(cct, "Journaler::thread_pool", "tp_journal",
cct->_conf->rbd_op_threads, "rbd_op_threads");
thread_pool->start();
timer->init();
}
-Threads::~Threads() {
+template <typename I>
+Threads<I>::~Threads() {
{
Mutex::Locker timer_locker(timer_lock);
timer->shutdown();
} // namespace mirror
} // namespace rbd
+
+template class rbd::mirror::Threads<librbd::ImageCtx>;
class SafeTimer;
class ThreadPool;
+namespace librbd { struct ImageCtx; }
+
namespace rbd {
namespace mirror {
+template <typename ImageCtxT = librbd::ImageCtx>
struct Threads {
ThreadPool *thread_pool = nullptr;
ContextWQ *work_queue = nullptr;
} // namespace mirror
} // namespace rbd
+extern template class rbd::mirror::Threads<librbd::ImageCtx>;
+
#endif // CEPH_RBD_MIRROR_THREADS_H