]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rbd-mirror: integrate single thread pool for all processing 7906/head
authorJason Dillaman <dillaman@redhat.com>
Thu, 3 Mar 2016 15:26:24 +0000 (10:26 -0500)
committerJason Dillaman <dillaman@redhat.com>
Tue, 8 Mar 2016 12:11:51 +0000 (07:11 -0500)
Signed-off-by: Jason Dillaman <dillaman@redhat.com>
src/test/rbd_mirror/image_replay.cc
src/test/rbd_mirror/test_ImageReplayer.cc
src/tools/Makefile-client.am
src/tools/rbd_mirror/ImageReplayer.cc
src/tools/rbd_mirror/ImageReplayer.h
src/tools/rbd_mirror/Mirror.cc
src/tools/rbd_mirror/Mirror.h
src/tools/rbd_mirror/Replayer.cc
src/tools/rbd_mirror/Replayer.h
src/tools/rbd_mirror/Threads.cc [new file with mode: 0644]
src/tools/rbd_mirror/Threads.h [new file with mode: 0644]

index cb175f8394b396258030952dac507b1bf6ec51f7..1b945f563fcdad8558b76f34f73d523b4a402cb5 100644 (file)
@@ -10,6 +10,7 @@
 #include "librbd/ImageCtx.h"
 #include "librbd/ImageState.h"
 #include "tools/rbd_mirror/ImageReplayer.h"
+#include "tools/rbd_mirror/Threads.h"
 
 #include <string>
 #include <vector>
@@ -128,6 +129,7 @@ int main(int argc, const char **argv)
 
   rbd::mirror::RadosRef local(new librados::Rados());
   rbd::mirror::RadosRef remote(new librados::Rados());
+  rbd::mirror::Threads *threads = nullptr;
 
   int r = local->init_with_context(g_ceph_context);
   if (r < 0) {
@@ -170,7 +172,9 @@ int main(int argc, const char **argv)
 
   dout(5) << "starting replay" << dendl;
 
-  replayer = new rbd::mirror::ImageReplayer(local, remote, client_id,
+  threads = new rbd::mirror::Threads(reinterpret_cast<CephContext*>(
+    local->cct()));
+  replayer = new rbd::mirror::ImageReplayer(threads, local, remote, client_id,
                                            remote_pool_id, remote_image_id);
 
   r = replayer->start(&bootstap_params);
@@ -198,6 +202,7 @@ int main(int argc, const char **argv)
   shutdown_async_signal_handler();
 
   delete replayer;
+  delete threads;
   g_ceph_context->put();
 
   return r < 0 ? EXIT_SUCCESS : EXIT_FAILURE;
index 80f9505e29413064312e13e6295e73e55838b07e..11ce286fb94df0ef7f5c73d56e8b55f132f5ed7d 100644 (file)
@@ -30,6 +30,7 @@
 #include "librbd/internal.h"
 #include "tools/rbd_mirror/types.h"
 #include "tools/rbd_mirror/ImageReplayer.h"
+#include "tools/rbd_mirror/Threads.h"
 
 #include "test/librados/test.h"
 #include "gtest/gtest.h"
@@ -98,7 +99,11 @@ public:
                                false, features, &order, 0, 0));
     m_remote_image_id = get_image_id(m_remote_ioctx, m_image_name);
 
+    m_threads = new rbd::mirror::Threads(reinterpret_cast<CephContext*>(
+      m_local_ioctx.cct()));
+
     m_replayer = new rbd::mirror::ImageReplayer(
+      m_threads,
       rbd::mirror::RadosRef(new librados::Rados(m_local_ioctx)),
       rbd::mirror::RadosRef(new librados::Rados(m_remote_ioctx)),
       m_client_id, remote_pool_id, m_remote_image_id);
@@ -109,6 +114,7 @@ public:
   ~TestImageReplayer()
   {
     delete m_replayer;
+    delete m_threads;
 
     EXPECT_EQ(0, m_remote_cluster.pool_delete(m_remote_pool_name.c_str()));
     EXPECT_EQ(0, m_local_cluster.pool_delete(m_local_pool_name.c_str()));
@@ -311,6 +317,7 @@ public:
 
   static int _image_number;
 
+  rbd::mirror::Threads *m_threads = nullptr;
   librados::Rados m_local_cluster, m_remote_cluster;
   std::string m_client_id;
   std::string m_local_pool_name, m_remote_pool_name;
index 0a31d5c5452e40ec7336c2e42b23918a80b24879..534e536fe5d6973be3f7bace5c20b299335489df 100644 (file)
@@ -86,6 +86,7 @@ librbd_mirror_internal_la_SOURCES = \
        tools/rbd_mirror/Mirror.cc \
        tools/rbd_mirror/PoolWatcher.cc \
        tools/rbd_mirror/Replayer.cc \
+       tools/rbd_mirror/Threads.cc \
        tools/rbd_mirror/types.cc
 noinst_LTLIBRARIES += librbd_mirror_internal.la
 noinst_HEADERS += \
@@ -94,6 +95,7 @@ noinst_HEADERS += \
        tools/rbd_mirror/Mirror.h \
        tools/rbd_mirror/PoolWatcher.h \
        tools/rbd_mirror/Replayer.h \
+       tools/rbd_mirror/Threads.h \
        tools/rbd_mirror/types.h
 
 rbd_mirror_SOURCES = \
index 04e135f098c6b29224efebc42cae58c3458489ab..0554fe35811e40678212f1be561ed0c8ba542ddf 100644 (file)
@@ -6,6 +6,8 @@
 #include "common/errno.h"
 #include "include/stringify.h"
 #include "cls/rbd/cls_rbd_client.h"
+#include "common/Timer.h"
+#include "common/WorkQueue.h"
 #include "journal/Journaler.h"
 #include "journal/ReplayEntry.h"
 #include "journal/ReplayHandler.h"
@@ -18,6 +20,7 @@
 #include "librbd/internal.h"
 #include "librbd/journal/Replay.h"
 #include "ImageReplayer.h"
+#include "Threads.h"
 
 #define dout_subsys ceph_subsys_rbd_mirror
 #undef dout_prefix
@@ -158,10 +161,11 @@ private:
   Commands commands;
 };
 
-ImageReplayer::ImageReplayer(RadosRef local, RadosRef remote,
+ImageReplayer::ImageReplayer(Threads *threads, RadosRef local, RadosRef remote,
                             const std::string &client_id,
                             int64_t remote_pool_id,
                             const std::string &remote_image_id) :
+  m_threads(threads),
   m_local(local),
   m_remote(remote),
   m_client_id(client_id),
@@ -183,6 +187,8 @@ ImageReplayer::ImageReplayer(RadosRef local, RadosRef remote,
 
 ImageReplayer::~ImageReplayer()
 {
+  m_threads->work_queue->drain();
+
   assert(m_local_image_ctx == nullptr);
   assert(m_local_replay == nullptr);
   assert(m_remote_journaler == nullptr);
@@ -219,9 +225,13 @@ int ImageReplayer::start(const BootstrapParams *bootstrap_params)
   }
 
   CephContext *cct = static_cast<CephContext *>(m_local->cct());
+
   commit_interval = cct->_conf->rbd_journal_commit_age;
   bool remote_journaler_initialized = false;
-  m_remote_journaler = new ::journal::Journaler(m_remote_ioctx,
+  m_remote_journaler = new ::journal::Journaler(m_threads->work_queue,
+                                                m_threads->timer,
+                                                &m_threads->timer_lock,
+                                                m_remote_ioctx,
                                                remote_journal_id,
                                                m_client_id, commit_interval);
   r = get_registered_client_status(&registered);
@@ -315,7 +325,7 @@ fail:
   if (m_remote_journaler) {
     if (remote_journaler_initialized) {
       m_remote_journaler->stop_replay();
-      m_remote_journaler->shutdown();
+      m_remote_journaler->shut_down();
     }
     delete m_remote_journaler;
     m_remote_journaler = nullptr;
@@ -378,7 +388,7 @@ void ImageReplayer::stop()
   m_local_ioctx.close();
 
   m_remote_journaler->stop_replay();
-  m_remote_journaler->shutdown();
+  m_remote_journaler->shut_down();
   delete m_remote_journaler;
   m_remote_journaler = nullptr;
 
index 305a1762100cdb492325f14267f76857b825ccfd..24f7f424cb5cc4d9b8d3c1686bf093cd0dade03b 100644 (file)
@@ -13,6 +13,8 @@
 #include "include/rados/librados.hpp"
 #include "types.h"
 
+class ContextWQ;
+
 namespace journal {
 
 class Journaler;
@@ -37,6 +39,7 @@ namespace rbd {
 namespace mirror {
 
 class ImageReplayerAdminSocketHook;
+struct Threads;
 
 /**
  * Replays changes from a remote cluster for a single image.
@@ -64,8 +67,9 @@ public:
   };
 
 public:
-  ImageReplayer(RadosRef local, RadosRef remote, const std::string &client_id,
-               int64_t remote_pool_id, const std::string &remote_image_id);
+  ImageReplayer(Threads *threads, RadosRef local, RadosRef remote,
+                const std::string &client_id, int64_t remote_pool_id,
+                const std::string &remote_image_id);
   virtual ~ImageReplayer();
   ImageReplayer(const ImageReplayer&) = delete;
   ImageReplayer& operator=(const ImageReplayer&) = delete;
@@ -98,6 +102,7 @@ private:
   friend std::ostream &operator<<(std::ostream &os,
                                  const ImageReplayer &replayer);
 private:
+  Threads *m_threads;
   RadosRef m_local, m_remote;
   std::string m_client_id;
   int64_t m_remote_pool_id, m_local_pool_id;
index 67111b01207f5c0b61d14fa95d993299b3ec0c74..e3999eea2d4cff142bbe8c2e0fa18d7a267922e4 100644 (file)
@@ -6,6 +6,7 @@
 #include "common/debug.h"
 #include "common/errno.h"
 #include "Mirror.h"
+#include "Threads.h"
 
 #define dout_subsys ceph_subsys_rbd_mirror
 #undef dout_prefix
@@ -31,6 +32,8 @@ Mirror::Mirror(CephContext *cct) :
   m_lock("rbd::mirror::Mirror"),
   m_local(new librados::Rados())
 {
+  cct->lookup_or_create_singleton_object<Threads>(m_threads,
+                                                  "rbd_mirror::threads");
 }
 
 void Mirror::handle_signal(int signum)
@@ -76,7 +79,7 @@ void Mirror::update_replayers(const map<peer_t, set<int64_t> > &peer_configs)
   for (auto &kv : peer_configs) {
     const peer_t &peer = kv.first;
     if (m_replayers.find(peer) == m_replayers.end()) {
-      unique_ptr<Replayer> replayer(new Replayer(m_local, peer));
+      unique_ptr<Replayer> replayer(new Replayer(m_threads, m_local, peer));
       // TODO: make async, and retry connecting within replayer
       int r = replayer->init();
       if (r < 0) {
index cafbdd87af954c3e000c82baab39aa8f2e83cc80..8408ee362d689e5ffc3e78e198ebf0071daba019 100644 (file)
@@ -19,6 +19,8 @@
 namespace rbd {
 namespace mirror {
 
+struct Threads;
+
 /**
  * Contains the main loop and overall state for rbd-mirror.
  *
@@ -40,6 +42,7 @@ private:
   void update_replayers(const map<peer_t, set<int64_t> > &peer_configs);
 
   CephContext *m_cct;
+  Threads *m_threads = nullptr;
   Mutex m_lock;
   Cond m_cond;
   RadosRef m_local;
index 0d8e3f329fe4a4a1a70aeadc282c5e2dd31c4711..bd379f782519908ee1b64eabcee54aa2c0c7691b 100644 (file)
@@ -21,7 +21,9 @@ using std::vector;
 namespace rbd {
 namespace mirror {
 
-Replayer::Replayer(RadosRef local_cluster, const peer_t &peer) :
+Replayer::Replayer(Threads *threads, RadosRef local_cluster,
+                   const peer_t &peer) :
+  m_threads(threads),
   m_lock(stringify("rbd::mirror::Replayer ") + stringify(peer)),
   m_peer(peer),
   m_local(local_cluster),
@@ -122,7 +124,8 @@ void Replayer::set_sources(const map<int64_t, set<string> > &images)
     auto &pool_replayers = m_images[pool_id];
     for (const auto &image_id : kv.second) {
       if (pool_replayers.find(image_id) == pool_replayers.end()) {
-       unique_ptr<ImageReplayer> image_replayer(new ImageReplayer(m_local,
+       unique_ptr<ImageReplayer> image_replayer(new ImageReplayer(m_threads,
+                                                                   m_local,
                                                                   m_remote,
                                                                   m_client_id,
                                                                   pool_id,
index 373cddf34db2b0c54edbcc9a1a703a6170878e2c..03199b6c8fb1c79f1615ce206da096d9a42e5aaa 100644 (file)
 namespace rbd {
 namespace mirror {
 
+struct Threads;
+
 /**
  * Controls mirroring for a single remote cluster.
  */
 class Replayer {
 public:
-  Replayer(RadosRef local_cluster, const peer_t &peer);
+  Replayer(Threads *threads, RadosRef local_cluster, const peer_t &peer);
   ~Replayer();
   Replayer(const Replayer&) = delete;
   Replayer& operator=(const Replayer&) = delete;
@@ -40,6 +42,7 @@ public:
 private:
   void set_sources(const std::map<int64_t, std::set<std::string> > &images);
 
+  Threads *m_threads;
   Mutex m_lock;
   Cond m_cond;
   atomic_t m_stopping;
diff --git a/src/tools/rbd_mirror/Threads.cc b/src/tools/rbd_mirror/Threads.cc
new file mode 100644 (file)
index 0000000..8fa7d6d
--- /dev/null
@@ -0,0 +1,38 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "tools/rbd_mirror/Threads.h"
+#include "common/Timer.h"
+#include "common/WorkQueue.h"
+
+namespace rbd {
+namespace mirror {
+
+Threads::Threads(CephContext *cct) : timer_lock("Threads::timer_lock") {
+  thread_pool = new ThreadPool(cct, "Journaler::thread_pool", "tp_journal",
+                               cct->_conf->rbd_op_threads, "rbd_op_threads");
+  thread_pool->start();
+
+  work_queue = new ContextWQ("Journaler::work_queue",
+                             cct->_conf->rbd_op_thread_timeout, thread_pool);
+
+  timer = new SafeTimer(cct, timer_lock, true);
+  timer->init();
+}
+
+Threads::~Threads() {
+  {
+    Mutex::Locker timer_locker(timer_lock);
+    timer->shutdown();
+  }
+  delete timer;
+
+  work_queue->drain();
+  delete work_queue;
+
+  thread_pool->stop();
+  delete thread_pool;
+}
+
+} // namespace mirror
+} // namespace rbd
diff --git a/src/tools/rbd_mirror/Threads.h b/src/tools/rbd_mirror/Threads.h
new file mode 100644 (file)
index 0000000..ba95283
--- /dev/null
@@ -0,0 +1,34 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_RBD_MIRROR_THREADS_H
+#define CEPH_RBD_MIRROR_THREADS_H
+
+#include "common/Mutex.h"
+
+class CephContext;
+class ContextWQ;
+class SafeTimer;
+class ThreadPool;
+
+namespace rbd {
+namespace mirror {
+
+struct Threads {
+  ThreadPool *thread_pool = nullptr;
+  ContextWQ *work_queue = nullptr;
+
+  SafeTimer *timer = nullptr;
+  Mutex timer_lock;
+
+  explicit Threads(CephContext *cct);
+  Threads(const Threads&) = delete;
+  Threads& operator=(const Threads&) = delete;
+
+  ~Threads();
+};
+
+} // namespace mirror
+} // namespace rbd
+
+#endif // CEPH_RBD_MIRROR_THREADS_H