#include "librbd/ImageCtx.h"
#include "librbd/ImageState.h"
#include "tools/rbd_mirror/ImageReplayer.h"
+#include "tools/rbd_mirror/Threads.h"
#include <string>
#include <vector>
rbd::mirror::RadosRef local(new librados::Rados());
rbd::mirror::RadosRef remote(new librados::Rados());
+ rbd::mirror::Threads *threads = nullptr;
int r = local->init_with_context(g_ceph_context);
if (r < 0) {
dout(5) << "starting replay" << dendl;
- replayer = new rbd::mirror::ImageReplayer(local, remote, client_id,
+ threads = new rbd::mirror::Threads(reinterpret_cast<CephContext*>(
+ local->cct()));
+ replayer = new rbd::mirror::ImageReplayer(threads, local, remote, client_id,
remote_pool_id, remote_image_id);
r = replayer->start(&bootstap_params);
shutdown_async_signal_handler();
delete replayer;
+ delete threads;
g_ceph_context->put();
return r < 0 ? EXIT_SUCCESS : EXIT_FAILURE;
#include "librbd/internal.h"
#include "tools/rbd_mirror/types.h"
#include "tools/rbd_mirror/ImageReplayer.h"
+#include "tools/rbd_mirror/Threads.h"
#include "test/librados/test.h"
#include "gtest/gtest.h"
false, features, &order, 0, 0));
m_remote_image_id = get_image_id(m_remote_ioctx, m_image_name);
+ m_threads = new rbd::mirror::Threads(reinterpret_cast<CephContext*>(
+ m_local_ioctx.cct()));
+
m_replayer = new rbd::mirror::ImageReplayer(
+ m_threads,
rbd::mirror::RadosRef(new librados::Rados(m_local_ioctx)),
rbd::mirror::RadosRef(new librados::Rados(m_remote_ioctx)),
m_client_id, remote_pool_id, m_remote_image_id);
~TestImageReplayer()
{
delete m_replayer;
+ delete m_threads;
EXPECT_EQ(0, m_remote_cluster.pool_delete(m_remote_pool_name.c_str()));
EXPECT_EQ(0, m_local_cluster.pool_delete(m_local_pool_name.c_str()));
static int _image_number;
+ rbd::mirror::Threads *m_threads = nullptr;
librados::Rados m_local_cluster, m_remote_cluster;
std::string m_client_id;
std::string m_local_pool_name, m_remote_pool_name;
tools/rbd_mirror/Mirror.cc \
tools/rbd_mirror/PoolWatcher.cc \
tools/rbd_mirror/Replayer.cc \
+ tools/rbd_mirror/Threads.cc \
tools/rbd_mirror/types.cc
noinst_LTLIBRARIES += librbd_mirror_internal.la
noinst_HEADERS += \
tools/rbd_mirror/Mirror.h \
tools/rbd_mirror/PoolWatcher.h \
tools/rbd_mirror/Replayer.h \
+ tools/rbd_mirror/Threads.h \
tools/rbd_mirror/types.h
rbd_mirror_SOURCES = \
#include "common/errno.h"
#include "include/stringify.h"
#include "cls/rbd/cls_rbd_client.h"
+#include "common/Timer.h"
+#include "common/WorkQueue.h"
#include "journal/Journaler.h"
#include "journal/ReplayEntry.h"
#include "journal/ReplayHandler.h"
#include "librbd/internal.h"
#include "librbd/journal/Replay.h"
#include "ImageReplayer.h"
+#include "Threads.h"
#define dout_subsys ceph_subsys_rbd_mirror
#undef dout_prefix
Commands commands;
};
-ImageReplayer::ImageReplayer(RadosRef local, RadosRef remote,
+ImageReplayer::ImageReplayer(Threads *threads, RadosRef local, RadosRef remote,
const std::string &client_id,
int64_t remote_pool_id,
const std::string &remote_image_id) :
+ m_threads(threads),
m_local(local),
m_remote(remote),
m_client_id(client_id),
ImageReplayer::~ImageReplayer()
{
+ m_threads->work_queue->drain();
+
assert(m_local_image_ctx == nullptr);
assert(m_local_replay == nullptr);
assert(m_remote_journaler == nullptr);
}
CephContext *cct = static_cast<CephContext *>(m_local->cct());
+
commit_interval = cct->_conf->rbd_journal_commit_age;
bool remote_journaler_initialized = false;
- m_remote_journaler = new ::journal::Journaler(m_remote_ioctx,
+ m_remote_journaler = new ::journal::Journaler(m_threads->work_queue,
+ m_threads->timer,
+ &m_threads->timer_lock,
+ m_remote_ioctx,
remote_journal_id,
m_client_id, commit_interval);
r = get_registered_client_status(®istered);
if (m_remote_journaler) {
if (remote_journaler_initialized) {
m_remote_journaler->stop_replay();
- m_remote_journaler->shutdown();
+ m_remote_journaler->shut_down();
}
delete m_remote_journaler;
m_remote_journaler = nullptr;
m_local_ioctx.close();
m_remote_journaler->stop_replay();
- m_remote_journaler->shutdown();
+ m_remote_journaler->shut_down();
delete m_remote_journaler;
m_remote_journaler = nullptr;
#include "include/rados/librados.hpp"
#include "types.h"
+class ContextWQ;
+
namespace journal {
class Journaler;
namespace mirror {
class ImageReplayerAdminSocketHook;
+struct Threads;
/**
* Replays changes from a remote cluster for a single image.
};
public:
- ImageReplayer(RadosRef local, RadosRef remote, const std::string &client_id,
- int64_t remote_pool_id, const std::string &remote_image_id);
+ ImageReplayer(Threads *threads, RadosRef local, RadosRef remote,
+ const std::string &client_id, int64_t remote_pool_id,
+ const std::string &remote_image_id);
virtual ~ImageReplayer();
ImageReplayer(const ImageReplayer&) = delete;
ImageReplayer& operator=(const ImageReplayer&) = delete;
friend std::ostream &operator<<(std::ostream &os,
const ImageReplayer &replayer);
private:
+ Threads *m_threads;
RadosRef m_local, m_remote;
std::string m_client_id;
int64_t m_remote_pool_id, m_local_pool_id;
#include "common/debug.h"
#include "common/errno.h"
#include "Mirror.h"
+#include "Threads.h"
#define dout_subsys ceph_subsys_rbd_mirror
#undef dout_prefix
m_lock("rbd::mirror::Mirror"),
m_local(new librados::Rados())
{
+ cct->lookup_or_create_singleton_object<Threads>(m_threads,
+ "rbd_mirror::threads");
}
void Mirror::handle_signal(int signum)
for (auto &kv : peer_configs) {
const peer_t &peer = kv.first;
if (m_replayers.find(peer) == m_replayers.end()) {
- unique_ptr<Replayer> replayer(new Replayer(m_local, peer));
+ unique_ptr<Replayer> replayer(new Replayer(m_threads, m_local, peer));
// TODO: make async, and retry connecting within replayer
int r = replayer->init();
if (r < 0) {
namespace rbd {
namespace mirror {
+struct Threads;
+
/**
* Contains the main loop and overall state for rbd-mirror.
*
void update_replayers(const map<peer_t, set<int64_t> > &peer_configs);
CephContext *m_cct;
+ Threads *m_threads = nullptr;
Mutex m_lock;
Cond m_cond;
RadosRef m_local;
namespace rbd {
namespace mirror {
-Replayer::Replayer(RadosRef local_cluster, const peer_t &peer) :
+Replayer::Replayer(Threads *threads, RadosRef local_cluster,
+ const peer_t &peer) :
+ m_threads(threads),
m_lock(stringify("rbd::mirror::Replayer ") + stringify(peer)),
m_peer(peer),
m_local(local_cluster),
auto &pool_replayers = m_images[pool_id];
for (const auto &image_id : kv.second) {
if (pool_replayers.find(image_id) == pool_replayers.end()) {
- unique_ptr<ImageReplayer> image_replayer(new ImageReplayer(m_local,
+ unique_ptr<ImageReplayer> image_replayer(new ImageReplayer(m_threads,
+ m_local,
m_remote,
m_client_id,
pool_id,
namespace rbd {
namespace mirror {
+struct Threads;
+
/**
* Controls mirroring for a single remote cluster.
*/
class Replayer {
public:
- Replayer(RadosRef local_cluster, const peer_t &peer);
+ Replayer(Threads *threads, RadosRef local_cluster, const peer_t &peer);
~Replayer();
Replayer(const Replayer&) = delete;
Replayer& operator=(const Replayer&) = delete;
private:
void set_sources(const std::map<int64_t, std::set<std::string> > &images);
+ Threads *m_threads;
Mutex m_lock;
Cond m_cond;
atomic_t m_stopping;
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "tools/rbd_mirror/Threads.h"
+#include "common/Timer.h"
+#include "common/WorkQueue.h"
+
+namespace rbd {
+namespace mirror {
+
+Threads::Threads(CephContext *cct) : timer_lock("Threads::timer_lock") {
+ thread_pool = new ThreadPool(cct, "Journaler::thread_pool", "tp_journal",
+ cct->_conf->rbd_op_threads, "rbd_op_threads");
+ thread_pool->start();
+
+ work_queue = new ContextWQ("Journaler::work_queue",
+ cct->_conf->rbd_op_thread_timeout, thread_pool);
+
+ timer = new SafeTimer(cct, timer_lock, true);
+ timer->init();
+}
+
+Threads::~Threads() {
+ {
+ Mutex::Locker timer_locker(timer_lock);
+ timer->shutdown();
+ }
+ delete timer;
+
+ work_queue->drain();
+ delete work_queue;
+
+ thread_pool->stop();
+ delete thread_pool;
+}
+
+} // namespace mirror
+} // namespace rbd
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_RBD_MIRROR_THREADS_H
+#define CEPH_RBD_MIRROR_THREADS_H
+
+#include "common/Mutex.h"
+
+class CephContext;
+class ContextWQ;
+class SafeTimer;
+class ThreadPool;
+
+namespace rbd {
+namespace mirror {
+
+struct Threads {
+ ThreadPool *thread_pool = nullptr;
+ ContextWQ *work_queue = nullptr;
+
+ SafeTimer *timer = nullptr;
+ Mutex timer_lock;
+
+ explicit Threads(CephContext *cct);
+ Threads(const Threads&) = delete;
+ Threads& operator=(const Threads&) = delete;
+
+ ~Threads();
+};
+
+} // namespace mirror
+} // namespace rbd
+
+#endif // CEPH_RBD_MIRROR_THREADS_H