]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rbd-mirror: make sync throttler per pool
authorMykola Golub <mgolub@mirantis.com>
Sun, 16 Apr 2017 17:58:42 +0000 (19:58 +0200)
committerMykola Golub <mgolub@mirantis.com>
Tue, 6 Jun 2017 08:42:25 +0000 (10:42 +0200)
Signed-off-by: Mykola Golub <mgolub@mirantis.com>
src/test/rbd_mirror/image_replayer/test_mock_BootstrapRequest.cc
src/test/rbd_mirror/test_mock_ImageSyncThrottler.cc
src/tools/rbd_mirror/ImageSyncThrottler.cc
src/tools/rbd_mirror/ImageSyncThrottler.h
src/tools/rbd_mirror/Mirror.cc
src/tools/rbd_mirror/Mirror.h
src/tools/rbd_mirror/PoolReplayer.cc
src/tools/rbd_mirror/PoolReplayer.h
src/tools/rbd_mirror/image_replayer/BootstrapRequest.cc

index a7c7b27b7a4dd5dcaf12026bab141e0c1fe45041..83692986b1bb5ee1514cc7382afbec98e36eb001 100644 (file)
@@ -53,8 +53,7 @@ struct ImageSyncThrottler<librbd::MockTestImageCtx> {
                                  librbd::journal::MirrorPeerClientMeta *client_meta,
                                  ContextWQ *work_queue, Context *on_finish,
                                  ProgressContext *progress_ctx));
-  MOCK_METHOD2(cancel_sync, void(librados::IoCtx &local_io_ctx,
-                                 const std::string& mirror_uuid));
+  MOCK_METHOD1(cancel_sync, void(const std::string& mirror_uuid));
 };
 
 namespace image_replayer {
index d3e3d28a6efaf5577a1667351c3d6497705ab0e1..25bd9526797f5d443bd9b98011290d08a37b17d4 100644 (file)
@@ -157,7 +157,7 @@ public:
     } else {
       EXPECT_CALL(*sync, cancel()).Times(0);
     }
-    mock_sync_throttler->cancel_sync(m_local_io_ctx, mirror_uuid);
+    mock_sync_throttler->cancel_sync(mirror_uuid);
   }
 
   librbd::ImageCtx *m_remote_image_ctx;
index a199565f1cdf14d336fd5dc074b14653f19aec36..366109d62be73c5e350bfe2bbd976b49643f13ca 100644 (file)
@@ -15,6 +15,7 @@
 #include "ImageSyncThrottler.h"
 #include "ImageSync.h"
 #include "common/ceph_context.h"
+#include "librbd/Utils.h"
 
 #define dout_context g_ceph_context
 #define dout_subsys ceph_subsys_rbd_mirror
@@ -31,14 +32,14 @@ namespace mirror {
 template <typename ImageCtxT>
 struct ImageSyncThrottler<ImageCtxT>::C_SyncHolder : public Context {
   ImageSyncThrottler<ImageCtxT> *m_sync_throttler;
-  PoolImageId m_local_pool_image_id;
+  std::string m_local_image_id;
   ImageSync<ImageCtxT> *m_sync = nullptr;
   Context *m_on_finish;
 
   C_SyncHolder(ImageSyncThrottler<ImageCtxT> *sync_throttler,
-               const PoolImageId &local_pool_image_id, Context *on_finish)
+               const std::string &local_image_id, Context *on_finish)
     : m_sync_throttler(sync_throttler),
-      m_local_pool_image_id(local_pool_image_id), m_on_finish(on_finish) {
+      m_local_image_id(local_image_id), m_on_finish(on_finish) {
   }
 
   void finish(int r) override {
@@ -52,7 +53,7 @@ struct ImageSyncThrottler<ImageCtxT>::C_SyncHolder : public Context {
 template <typename I>
 ImageSyncThrottler<I>::ImageSyncThrottler()
   : m_max_concurrent_syncs(g_ceph_context->_conf->rbd_mirror_concurrent_image_syncs),
-    m_lock("rbd::mirror::ImageSyncThrottler")
+    m_lock(librbd::util::unique_lock_name("rbd::mirror::ImageSyncThrottler", this))
 {
   dout(20) << "Initialized max_concurrent_syncs=" << m_max_concurrent_syncs
            << dendl;
@@ -81,9 +82,7 @@ void ImageSyncThrottler<I>::start_sync(I *local_image_ctx, I *remote_image_ctx,
                                        ProgressContext *progress_ctx) {
   dout(20) << dendl;
 
-  PoolImageId pool_image_id(local_image_ctx->md_ctx.get_id(),
-                            local_image_ctx->id);
-  C_SyncHolder *sync_holder_ctx = new C_SyncHolder(this, pool_image_id,
+  C_SyncHolder *sync_holder_ctx = new C_SyncHolder(this, local_image_ctx->id,
                                                    on_finish);
   sync_holder_ctx->m_sync = ImageSync<I>::create(local_image_ctx,
                                                  remote_image_ctx, timer,
@@ -98,8 +97,8 @@ void ImageSyncThrottler<I>::start_sync(I *local_image_ctx, I *remote_image_ctx,
     Mutex::Locker l(m_lock);
 
     if (m_inflight_syncs.size() < m_max_concurrent_syncs) {
-      assert(m_inflight_syncs.count(pool_image_id) == 0);
-      m_inflight_syncs[pool_image_id] = sync_holder_ctx;
+      assert(m_inflight_syncs.count(local_image_ctx->id) == 0);
+      m_inflight_syncs[local_image_ctx->id] = sync_holder_ctx;
       start = true;
       dout(10) << "ready to start image sync for local_image_id "
                << local_image_ctx->id << " [" << m_inflight_syncs.size() << "/"
@@ -117,8 +116,7 @@ void ImageSyncThrottler<I>::start_sync(I *local_image_ctx, I *remote_image_ctx,
 }
 
 template <typename I>
-void ImageSyncThrottler<I>::cancel_sync(librados::IoCtx &local_io_ctx,
-                                        const std::string local_image_id) {
+void ImageSyncThrottler<I>::cancel_sync(const std::string &local_image_id) {
   dout(20) << dendl;
 
   C_SyncHolder *sync_holder = nullptr;
@@ -131,16 +129,14 @@ void ImageSyncThrottler<I>::cancel_sync(librados::IoCtx &local_io_ctx,
       return;
     }
 
-    PoolImageId local_pool_image_id(local_io_ctx.get_id(),
-                                    local_image_id);
-    auto it = m_inflight_syncs.find(local_pool_image_id);
+    auto it = m_inflight_syncs.find(local_image_id);
     if (it != m_inflight_syncs.end()) {
       sync_holder = it->second;
     }
 
     if (!sync_holder) {
       for (auto it = m_sync_queue.begin(); it != m_sync_queue.end(); ++it) {
-        if ((*it)->m_local_pool_image_id == local_pool_image_id) {
+        if ((*it)->m_local_image_id == local_image_id) {
           sync_holder = (*it);
           m_sync_queue.erase(it);
           running_sync = false;
@@ -153,11 +149,11 @@ void ImageSyncThrottler<I>::cancel_sync(librados::IoCtx &local_io_ctx,
   if (sync_holder) {
     if (running_sync) {
       dout(10) << "canceled running image sync for local_image_id "
-               << sync_holder->m_local_pool_image_id.second << dendl;
+               << sync_holder->m_local_image_id << dendl;
       sync_holder->m_sync->cancel();
     } else {
       dout(10) << "canceled waiting image sync for local_image_id "
-               << sync_holder->m_local_pool_image_id.second << dendl;
+               << sync_holder->m_local_image_id << dendl;
       sync_holder->m_on_finish->complete(-ECANCELED);
       sync_holder->m_sync->put();
       delete sync_holder;
@@ -173,7 +169,7 @@ void ImageSyncThrottler<I>::handle_sync_finished(C_SyncHolder *sync_holder) {
 
   {
     Mutex::Locker l(m_lock);
-    m_inflight_syncs.erase(sync_holder->m_local_pool_image_id);
+    m_inflight_syncs.erase(sync_holder->m_local_image_id);
 
     if (m_inflight_syncs.size() < m_max_concurrent_syncs &&
         !m_sync_queue.empty()) {
@@ -181,13 +177,13 @@ void ImageSyncThrottler<I>::handle_sync_finished(C_SyncHolder *sync_holder) {
       m_sync_queue.pop_back();
 
       assert(
-        m_inflight_syncs.count(next_sync_holder->m_local_pool_image_id) == 0);
-      m_inflight_syncs[next_sync_holder->m_local_pool_image_id] =
+        m_inflight_syncs.count(next_sync_holder->m_local_image_id) == 0);
+      m_inflight_syncs[next_sync_holder->m_local_image_id] =
         next_sync_holder;
       dout(10) << "ready to start image sync for local_image_id "
-               << next_sync_holder->m_local_pool_image_id.second
-               << " [" << m_inflight_syncs.size() << "/"
-               << m_max_concurrent_syncs << "]" << dendl;
+               << next_sync_holder->m_local_image_id << " ["
+               << m_inflight_syncs.size() << "/" << m_max_concurrent_syncs
+               << "]" << dendl;
     }
 
     dout(10) << "currently running image syncs [" << m_inflight_syncs.size()
@@ -218,14 +214,13 @@ void ImageSyncThrottler<I>::set_max_concurrent_syncs(uint32_t max) {
         m_sync_queue.pop_back();
 
         assert(
-          m_inflight_syncs.count(next_sync_holder->m_local_pool_image_id) == 0);
-        m_inflight_syncs[next_sync_holder->m_local_pool_image_id] =
-          next_sync_holder;
+          m_inflight_syncs.count(next_sync_holder->m_local_image_id) == 0);
+        m_inflight_syncs[next_sync_holder->m_local_image_id] = next_sync_holder;
 
         dout(10) << "ready to start image sync for local_image_id "
-                 << next_sync_holder->m_local_pool_image_id.second
-                 << " [" << m_inflight_syncs.size() << "/"
-                 << m_max_concurrent_syncs << "]" << dendl;
+                 << next_sync_holder->m_local_image_id << " ["
+                 << m_inflight_syncs.size() << "/" << m_max_concurrent_syncs
+                 << "]" << dendl;
     }
   }
 
index abe01e4369555ec491b4ca49355d15924fa3c198..64d92ece43a6ce703c40474ef4e7b18b8fb1f573 100644 (file)
@@ -60,16 +60,13 @@ public:
                   ContextWQ *work_queue, Context *on_finish,
                   ProgressContext *progress_ctx = nullptr);
 
-  void cancel_sync(librados::IoCtx &local_io_ctx,
-                   const std::string local_image_id);
+  void cancel_sync(const std::string &local_image_id);
 
   void set_max_concurrent_syncs(uint32_t max);
 
   void print_status(Formatter *f, std::stringstream *ss);
 
 private:
-  typedef std::pair<int64_t, std::string> PoolImageId;
-
   struct C_SyncHolder;
 
   void handle_sync_finished(C_SyncHolder *sync_holder);
@@ -81,7 +78,7 @@ private:
   uint32_t m_max_concurrent_syncs;
   Mutex m_lock;
   std::list<C_SyncHolder *> m_sync_queue;
-  std::map<PoolImageId, C_SyncHolder *> m_inflight_syncs;
+  std::map<std::string, C_SyncHolder *> m_inflight_syncs;
 
 };
 
index f37d255992242e7dff4f0bed9bee2c06ad83c49a..d383a8b0af5bbbd14843470c2ea7d4848a6e1115 100644 (file)
@@ -242,8 +242,6 @@ int Mirror::init()
                                          m_threads->timer,
                                          &m_threads->timer_lock));
 
-  m_image_sync_throttler.reset(new ImageSyncThrottler<>());
-
   return r;
 }
 
@@ -294,19 +292,6 @@ void Mirror::print_status(Formatter *f, stringstream *ss)
   }
 
   m_image_deleter->print_status(f, ss);
-
-  if (f) {
-    f->close_section();
-    f->open_object_section("sync_throttler");
-  }
-
-  m_image_sync_throttler->print_status(f, ss);
-
-  if (f) {
-    f->close_section();
-    f->close_section();
-    f->flush(*ss);
-  }
 }
 
 void Mirror::start()
@@ -414,8 +399,7 @@ void Mirror::update_pool_replayers(const PoolPeers &pool_peers)
       if (m_pool_replayers.find(pool_peer) == m_pool_replayers.end()) {
         dout(20) << "starting pool replayer for " << peer << dendl;
         unique_ptr<PoolReplayer> pool_replayer(new PoolReplayer(
-         m_threads, m_image_deleter, m_image_sync_throttler, kv.first, peer,
-         m_args));
+         m_threads, m_image_deleter, kv.first, peer, m_args));
 
         // TODO: make async, and retry connecting within pool replayer
         int r = pool_replayer->init();
index 2253156d660cbb64fab79bab1ffcc0256f55c558..cda474f19135de87f596eaa825c13acb4d311915 100644 (file)
@@ -65,7 +65,6 @@ private:
   // monitor local cluster for config changes in peers
   std::unique_ptr<ClusterWatcher> m_local_cluster_watcher;
   std::shared_ptr<ImageDeleter> m_image_deleter;
-  ImageSyncThrottlerRef<> m_image_sync_throttler;
   std::map<PoolPeer, std::unique_ptr<PoolReplayer> > m_pool_replayers;
   std::atomic<bool> m_stopping = { false };
   bool m_manual_stop = false;
index 0bd06f624ae040366724bf2d44b89c054f638d72..58aa6f31b1a35580b26efa57e8de599dd50d2099 100644 (file)
@@ -207,12 +207,10 @@ private:
 
 PoolReplayer::PoolReplayer(Threads<librbd::ImageCtx> *threads,
                           std::shared_ptr<ImageDeleter> image_deleter,
-                          ImageSyncThrottlerRef<> image_sync_throttler,
                           int64_t local_pool_id, const peer_t &peer,
                           const std::vector<const char*> &args) :
   m_threads(threads),
   m_image_deleter(image_deleter),
-  m_image_sync_throttler(image_sync_throttler),
   m_lock(stringify("rbd::mirror::PoolReplayer ") + stringify(peer)),
   m_peer(peer),
   m_args(args),
@@ -305,6 +303,8 @@ int PoolReplayer::init()
 
   dout(20) << "connected to " << m_peer << dendl;
 
+  m_image_sync_throttler.reset(new ImageSyncThrottler<>());
+
   m_instance_replayer.reset(
     InstanceReplayer<>::create(m_threads, m_image_deleter,
                                m_image_sync_throttler, m_local_rados,
@@ -476,6 +476,10 @@ void PoolReplayer::print_status(Formatter *f, stringstream *ss)
                  reinterpret_cast<CephContext *>(m_remote_io_ctx.cct())->_conf->
                      admin_socket);
 
+  f->open_object_section("sync_throttler");
+  m_image_sync_throttler->print_status(f, ss);
+  f->close_section();
+
   m_instance_replayer->print_status(f, ss);
 
   f->close_section();
index 87a6589356eb116b45138ffc5b249c15f8626aef..820134e423434a9abca8836f7867ad45438cd3f2 100644 (file)
@@ -40,7 +40,6 @@ class PoolReplayer {
 public:
   PoolReplayer(Threads<librbd::ImageCtx> *threads,
               std::shared_ptr<ImageDeleter> image_deleter,
-              ImageSyncThrottlerRef<> image_sync_throttler,
               int64_t local_pool_id, const peer_t &peer,
               const std::vector<const char*> &args);
   ~PoolReplayer();
index 8818945b10f1adb6c2f27e60fc3da5c1b2e6ce54..a5a609708ece57d134fffdbff8bf71f13b566726 100644 (file)
@@ -88,7 +88,7 @@ void BootstrapRequest<I>::cancel() {
   Mutex::Locker locker(m_lock);
   m_canceled = true;
 
-  m_image_sync_throttler->cancel_sync(m_local_io_ctx, m_local_image_id);
+  m_image_sync_throttler->cancel_sync(m_local_image_id);
 }
 
 template <typename I>