From: Jason Dillaman Date: Thu, 3 Mar 2016 15:26:24 +0000 (-0500) Subject: rbd-mirror: integrate single thread pool for all processing X-Git-Tag: v10.1.0~185^2 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=refs%2Fpull%2F7906%2Fhead;p=ceph.git rbd-mirror: integrate single thread pool for all processing Signed-off-by: Jason Dillaman --- diff --git a/src/test/rbd_mirror/image_replay.cc b/src/test/rbd_mirror/image_replay.cc index cb175f8394b3..1b945f563fcd 100644 --- a/src/test/rbd_mirror/image_replay.cc +++ b/src/test/rbd_mirror/image_replay.cc @@ -10,6 +10,7 @@ #include "librbd/ImageCtx.h" #include "librbd/ImageState.h" #include "tools/rbd_mirror/ImageReplayer.h" +#include "tools/rbd_mirror/Threads.h" #include #include @@ -128,6 +129,7 @@ int main(int argc, const char **argv) rbd::mirror::RadosRef local(new librados::Rados()); rbd::mirror::RadosRef remote(new librados::Rados()); + rbd::mirror::Threads *threads = nullptr; int r = local->init_with_context(g_ceph_context); if (r < 0) { @@ -170,7 +172,9 @@ int main(int argc, const char **argv) dout(5) << "starting replay" << dendl; - replayer = new rbd::mirror::ImageReplayer(local, remote, client_id, + threads = new rbd::mirror::Threads(reinterpret_cast( + local->cct())); + replayer = new rbd::mirror::ImageReplayer(threads, local, remote, client_id, remote_pool_id, remote_image_id); r = replayer->start(&bootstap_params); @@ -198,6 +202,7 @@ int main(int argc, const char **argv) shutdown_async_signal_handler(); delete replayer; + delete threads; g_ceph_context->put(); return r < 0 ? EXIT_SUCCESS : EXIT_FAILURE; diff --git a/src/test/rbd_mirror/test_ImageReplayer.cc b/src/test/rbd_mirror/test_ImageReplayer.cc index 80f9505e2941..11ce286fb94d 100644 --- a/src/test/rbd_mirror/test_ImageReplayer.cc +++ b/src/test/rbd_mirror/test_ImageReplayer.cc @@ -30,6 +30,7 @@ #include "librbd/internal.h" #include "tools/rbd_mirror/types.h" #include "tools/rbd_mirror/ImageReplayer.h" +#include "tools/rbd_mirror/Threads.h" #include "test/librados/test.h" #include "gtest/gtest.h" @@ -98,7 +99,11 @@ public: false, features, &order, 0, 0)); m_remote_image_id = get_image_id(m_remote_ioctx, m_image_name); + m_threads = new rbd::mirror::Threads(reinterpret_cast( + m_local_ioctx.cct())); + m_replayer = new rbd::mirror::ImageReplayer( + m_threads, rbd::mirror::RadosRef(new librados::Rados(m_local_ioctx)), rbd::mirror::RadosRef(new librados::Rados(m_remote_ioctx)), m_client_id, remote_pool_id, m_remote_image_id); @@ -109,6 +114,7 @@ public: ~TestImageReplayer() { delete m_replayer; + delete m_threads; EXPECT_EQ(0, m_remote_cluster.pool_delete(m_remote_pool_name.c_str())); EXPECT_EQ(0, m_local_cluster.pool_delete(m_local_pool_name.c_str())); @@ -311,6 +317,7 @@ public: static int _image_number; + rbd::mirror::Threads *m_threads = nullptr; librados::Rados m_local_cluster, m_remote_cluster; std::string m_client_id; std::string m_local_pool_name, m_remote_pool_name; diff --git a/src/tools/Makefile-client.am b/src/tools/Makefile-client.am index 0a31d5c5452e..534e536fe5d6 100644 --- a/src/tools/Makefile-client.am +++ b/src/tools/Makefile-client.am @@ -86,6 +86,7 @@ librbd_mirror_internal_la_SOURCES = \ tools/rbd_mirror/Mirror.cc \ tools/rbd_mirror/PoolWatcher.cc \ tools/rbd_mirror/Replayer.cc \ + tools/rbd_mirror/Threads.cc \ tools/rbd_mirror/types.cc noinst_LTLIBRARIES += librbd_mirror_internal.la noinst_HEADERS += \ @@ -94,6 +95,7 @@ noinst_HEADERS += \ tools/rbd_mirror/Mirror.h \ tools/rbd_mirror/PoolWatcher.h \ tools/rbd_mirror/Replayer.h \ + tools/rbd_mirror/Threads.h \ tools/rbd_mirror/types.h rbd_mirror_SOURCES = \ diff --git a/src/tools/rbd_mirror/ImageReplayer.cc b/src/tools/rbd_mirror/ImageReplayer.cc index 04e135f098c6..0554fe35811e 100644 --- a/src/tools/rbd_mirror/ImageReplayer.cc +++ b/src/tools/rbd_mirror/ImageReplayer.cc @@ -6,6 +6,8 @@ #include "common/errno.h" #include "include/stringify.h" #include "cls/rbd/cls_rbd_client.h" +#include "common/Timer.h" +#include "common/WorkQueue.h" #include "journal/Journaler.h" #include "journal/ReplayEntry.h" #include "journal/ReplayHandler.h" @@ -18,6 +20,7 @@ #include "librbd/internal.h" #include "librbd/journal/Replay.h" #include "ImageReplayer.h" +#include "Threads.h" #define dout_subsys ceph_subsys_rbd_mirror #undef dout_prefix @@ -158,10 +161,11 @@ private: Commands commands; }; -ImageReplayer::ImageReplayer(RadosRef local, RadosRef remote, +ImageReplayer::ImageReplayer(Threads *threads, RadosRef local, RadosRef remote, const std::string &client_id, int64_t remote_pool_id, const std::string &remote_image_id) : + m_threads(threads), m_local(local), m_remote(remote), m_client_id(client_id), @@ -183,6 +187,8 @@ ImageReplayer::ImageReplayer(RadosRef local, RadosRef remote, ImageReplayer::~ImageReplayer() { + m_threads->work_queue->drain(); + assert(m_local_image_ctx == nullptr); assert(m_local_replay == nullptr); assert(m_remote_journaler == nullptr); @@ -219,9 +225,13 @@ int ImageReplayer::start(const BootstrapParams *bootstrap_params) } CephContext *cct = static_cast(m_local->cct()); + commit_interval = cct->_conf->rbd_journal_commit_age; bool remote_journaler_initialized = false; - m_remote_journaler = new ::journal::Journaler(m_remote_ioctx, + m_remote_journaler = new ::journal::Journaler(m_threads->work_queue, + m_threads->timer, + &m_threads->timer_lock, + m_remote_ioctx, remote_journal_id, m_client_id, commit_interval); r = get_registered_client_status(®istered); @@ -315,7 +325,7 @@ fail: if (m_remote_journaler) { if (remote_journaler_initialized) { m_remote_journaler->stop_replay(); - m_remote_journaler->shutdown(); + m_remote_journaler->shut_down(); } delete m_remote_journaler; m_remote_journaler = nullptr; @@ -378,7 +388,7 @@ void ImageReplayer::stop() m_local_ioctx.close(); m_remote_journaler->stop_replay(); - m_remote_journaler->shutdown(); + m_remote_journaler->shut_down(); delete m_remote_journaler; m_remote_journaler = nullptr; diff --git a/src/tools/rbd_mirror/ImageReplayer.h b/src/tools/rbd_mirror/ImageReplayer.h index 305a1762100c..24f7f424cb5c 100644 --- a/src/tools/rbd_mirror/ImageReplayer.h +++ b/src/tools/rbd_mirror/ImageReplayer.h @@ -13,6 +13,8 @@ #include "include/rados/librados.hpp" #include "types.h" +class ContextWQ; + namespace journal { class Journaler; @@ -37,6 +39,7 @@ namespace rbd { namespace mirror { class ImageReplayerAdminSocketHook; +struct Threads; /** * Replays changes from a remote cluster for a single image. @@ -64,8 +67,9 @@ public: }; public: - ImageReplayer(RadosRef local, RadosRef remote, const std::string &client_id, - int64_t remote_pool_id, const std::string &remote_image_id); + ImageReplayer(Threads *threads, RadosRef local, RadosRef remote, + const std::string &client_id, int64_t remote_pool_id, + const std::string &remote_image_id); virtual ~ImageReplayer(); ImageReplayer(const ImageReplayer&) = delete; ImageReplayer& operator=(const ImageReplayer&) = delete; @@ -98,6 +102,7 @@ private: friend std::ostream &operator<<(std::ostream &os, const ImageReplayer &replayer); private: + Threads *m_threads; RadosRef m_local, m_remote; std::string m_client_id; int64_t m_remote_pool_id, m_local_pool_id; diff --git a/src/tools/rbd_mirror/Mirror.cc b/src/tools/rbd_mirror/Mirror.cc index 67111b01207f..e3999eea2d4c 100644 --- a/src/tools/rbd_mirror/Mirror.cc +++ b/src/tools/rbd_mirror/Mirror.cc @@ -6,6 +6,7 @@ #include "common/debug.h" #include "common/errno.h" #include "Mirror.h" +#include "Threads.h" #define dout_subsys ceph_subsys_rbd_mirror #undef dout_prefix @@ -31,6 +32,8 @@ Mirror::Mirror(CephContext *cct) : m_lock("rbd::mirror::Mirror"), m_local(new librados::Rados()) { + cct->lookup_or_create_singleton_object(m_threads, + "rbd_mirror::threads"); } void Mirror::handle_signal(int signum) @@ -76,7 +79,7 @@ void Mirror::update_replayers(const map > &peer_configs) for (auto &kv : peer_configs) { const peer_t &peer = kv.first; if (m_replayers.find(peer) == m_replayers.end()) { - unique_ptr replayer(new Replayer(m_local, peer)); + unique_ptr replayer(new Replayer(m_threads, m_local, peer)); // TODO: make async, and retry connecting within replayer int r = replayer->init(); if (r < 0) { diff --git a/src/tools/rbd_mirror/Mirror.h b/src/tools/rbd_mirror/Mirror.h index cafbdd87af95..8408ee362d68 100644 --- a/src/tools/rbd_mirror/Mirror.h +++ b/src/tools/rbd_mirror/Mirror.h @@ -19,6 +19,8 @@ namespace rbd { namespace mirror { +struct Threads; + /** * Contains the main loop and overall state for rbd-mirror. * @@ -40,6 +42,7 @@ private: void update_replayers(const map > &peer_configs); CephContext *m_cct; + Threads *m_threads = nullptr; Mutex m_lock; Cond m_cond; RadosRef m_local; diff --git a/src/tools/rbd_mirror/Replayer.cc b/src/tools/rbd_mirror/Replayer.cc index 0d8e3f329fe4..bd379f782519 100644 --- a/src/tools/rbd_mirror/Replayer.cc +++ b/src/tools/rbd_mirror/Replayer.cc @@ -21,7 +21,9 @@ using std::vector; namespace rbd { namespace mirror { -Replayer::Replayer(RadosRef local_cluster, const peer_t &peer) : +Replayer::Replayer(Threads *threads, RadosRef local_cluster, + const peer_t &peer) : + m_threads(threads), m_lock(stringify("rbd::mirror::Replayer ") + stringify(peer)), m_peer(peer), m_local(local_cluster), @@ -122,7 +124,8 @@ void Replayer::set_sources(const map > &images) auto &pool_replayers = m_images[pool_id]; for (const auto &image_id : kv.second) { if (pool_replayers.find(image_id) == pool_replayers.end()) { - unique_ptr image_replayer(new ImageReplayer(m_local, + unique_ptr image_replayer(new ImageReplayer(m_threads, + m_local, m_remote, m_client_id, pool_id, diff --git a/src/tools/rbd_mirror/Replayer.h b/src/tools/rbd_mirror/Replayer.h index 373cddf34db2..03199b6c8fb1 100644 --- a/src/tools/rbd_mirror/Replayer.h +++ b/src/tools/rbd_mirror/Replayer.h @@ -23,12 +23,14 @@ namespace rbd { namespace mirror { +struct Threads; + /** * Controls mirroring for a single remote cluster. */ class Replayer { public: - Replayer(RadosRef local_cluster, const peer_t &peer); + Replayer(Threads *threads, RadosRef local_cluster, const peer_t &peer); ~Replayer(); Replayer(const Replayer&) = delete; Replayer& operator=(const Replayer&) = delete; @@ -40,6 +42,7 @@ public: private: void set_sources(const std::map > &images); + Threads *m_threads; Mutex m_lock; Cond m_cond; atomic_t m_stopping; diff --git a/src/tools/rbd_mirror/Threads.cc b/src/tools/rbd_mirror/Threads.cc new file mode 100644 index 000000000000..8fa7d6d9a003 --- /dev/null +++ b/src/tools/rbd_mirror/Threads.cc @@ -0,0 +1,38 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "tools/rbd_mirror/Threads.h" +#include "common/Timer.h" +#include "common/WorkQueue.h" + +namespace rbd { +namespace mirror { + +Threads::Threads(CephContext *cct) : timer_lock("Threads::timer_lock") { + thread_pool = new ThreadPool(cct, "Journaler::thread_pool", "tp_journal", + cct->_conf->rbd_op_threads, "rbd_op_threads"); + thread_pool->start(); + + work_queue = new ContextWQ("Journaler::work_queue", + cct->_conf->rbd_op_thread_timeout, thread_pool); + + timer = new SafeTimer(cct, timer_lock, true); + timer->init(); +} + +Threads::~Threads() { + { + Mutex::Locker timer_locker(timer_lock); + timer->shutdown(); + } + delete timer; + + work_queue->drain(); + delete work_queue; + + thread_pool->stop(); + delete thread_pool; +} + +} // namespace mirror +} // namespace rbd diff --git a/src/tools/rbd_mirror/Threads.h b/src/tools/rbd_mirror/Threads.h new file mode 100644 index 000000000000..ba952836ad29 --- /dev/null +++ b/src/tools/rbd_mirror/Threads.h @@ -0,0 +1,34 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_RBD_MIRROR_THREADS_H +#define CEPH_RBD_MIRROR_THREADS_H + +#include "common/Mutex.h" + +class CephContext; +class ContextWQ; +class SafeTimer; +class ThreadPool; + +namespace rbd { +namespace mirror { + +struct Threads { + ThreadPool *thread_pool = nullptr; + ContextWQ *work_queue = nullptr; + + SafeTimer *timer = nullptr; + Mutex timer_lock; + + explicit Threads(CephContext *cct); + Threads(const Threads&) = delete; + Threads& operator=(const Threads&) = delete; + + ~Threads(); +}; + +} // namespace mirror +} // namespace rbd + +#endif // CEPH_RBD_MIRROR_THREADS_H