]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rbd-mirror: Implementation of image-sync throttler
authorRicardo Dias <rdias@suse.com>
Wed, 8 Jun 2016 15:37:20 +0000 (16:37 +0100)
committerRicardo Dias <rdias@suse.com>
Tue, 28 Jun 2016 12:17:29 +0000 (13:17 +0100)
Signed-off-by: Ricardo Dias <rdias@suse.com>
src/CMakeLists.txt
src/common/config_opts.h
src/tools/Makefile-client.am
src/tools/rbd_mirror/ImageSyncThrottler.cc [new file with mode: 0644]
src/tools/rbd_mirror/ImageSyncThrottler.h [new file with mode: 0644]

index df103872a45b5c55c1dd914f9dbe0a0622615c14..16a22d66160cf0f61409f97475f5e5f84d9c86d9 100644 (file)
@@ -1223,6 +1223,7 @@ if(${WITH_RBD})
     tools/rbd_mirror/ImageReplayer.cc
     tools/rbd_mirror/ImageDeleter.cc
     tools/rbd_mirror/ImageSync.cc
+    tools/rbd_mirror/ImageSyncThrottler.cc
     tools/rbd_mirror/Mirror.cc
     tools/rbd_mirror/PoolWatcher.cc
     tools/rbd_mirror/Replayer.cc
index c1b94e22034b03651805a79c6234b07d2bd22458..e4179820345f91f8fd2ce4796f464b7b42ec805a 100644 (file)
@@ -1238,6 +1238,7 @@ OPTION(rbd_journal_pool, OPT_STR, "") // pool for journal objects
  * RBD Mirror options
  */
 OPTION(rbd_mirror_sync_point_update_age, OPT_DOUBLE, 30) // number of seconds between each update of the image sync point object number
+OPTION(rbd_mirror_concurrent_image_syncs, OPT_U32, 5) // maximum number of image syncs in parallel
 
 OPTION(nss_db_path, OPT_STR, "") // path to nss db
 
index 6bb86a6ab46159dbefb165bdf475ae77cb3bda62..323eb3642f7cce520ade99da63715b6697768c5a 100644 (file)
@@ -93,6 +93,7 @@ librbd_mirror_internal_la_SOURCES = \
        tools/rbd_mirror/ClusterWatcher.cc \
        tools/rbd_mirror/ImageReplayer.cc \
        tools/rbd_mirror/ImageSync.cc \
+        tools/rbd_mirror/ImageSyncThrottler.cc \
        tools/rbd_mirror/Mirror.cc \
        tools/rbd_mirror/PoolWatcher.cc \
        tools/rbd_mirror/Replayer.cc \
@@ -117,6 +118,7 @@ noinst_HEADERS += \
        tools/rbd_mirror/ClusterWatcher.h \
        tools/rbd_mirror/ImageReplayer.h \
        tools/rbd_mirror/ImageSync.h \
+       tools/rbd_mirror/ImageSyncThrottler.h \
        tools/rbd_mirror/Mirror.h \
        tools/rbd_mirror/PoolWatcher.h \
        tools/rbd_mirror/ProgressContext.h \
diff --git a/src/tools/rbd_mirror/ImageSyncThrottler.cc b/src/tools/rbd_mirror/ImageSyncThrottler.cc
new file mode 100644 (file)
index 0000000..fd759ac
--- /dev/null
@@ -0,0 +1,243 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2016 SUSE LINUX GmbH
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+#include "ImageSyncThrottler.h"
+#include "ImageSync.h"
+#include "common/ceph_context.h"
+
+#define dout_subsys ceph_subsys_rbd_mirror
+#undef dout_prefix
+#define dout_prefix *_dout << "rbd::mirror::ImageSyncThrottler:: " << this \
+                           << " " << __func__ << ": "
+using std::unique_ptr;
+using std::string;
+using std::set;
+
+namespace rbd {
+namespace mirror {
+
+template <typename I>
+ImageSyncThrottler<I>::ImageSyncThrottler()
+  : m_max_concurrent_syncs(g_ceph_context->_conf->rbd_mirror_concurrent_image_syncs),
+    m_lock("rbd::mirror::ImageSyncThrottler")
+{
+  dout(20) << "Initialized max_concurrent_syncs=" << m_max_concurrent_syncs
+           << dendl;
+  g_ceph_context->_conf->add_observer(this);
+}
+
+template <typename I>
+ImageSyncThrottler<I>::~ImageSyncThrottler() {
+  {
+    Mutex::Locker l(m_lock);
+    assert(m_sync_queue.empty());
+    assert(m_inflight_syncs.empty());
+  }
+
+  g_ceph_context->_conf->remove_observer(this);
+}
+
+template <typename I>
+void ImageSyncThrottler<I>::start_sync(
+                            I *local_image_ctx, I *remote_image_ctx,
+                            SafeTimer *timer, Mutex *timer_lock,
+                            const std::string &mirror_uuid,
+                            Journaler *journaler,
+                            MirrorPeerClientMeta *client_meta,
+                            ContextWQ *work_queue, Context *on_finish,
+                            ProgressContext *progress_ctx) {
+  dout(20) << dendl;
+
+  C_SyncHolder *sync_holder_ctx = new C_SyncHolder(this, local_image_ctx->id,
+                                                   on_finish);
+  sync_holder_ctx->m_sync = ImageSync<I>::create(local_image_ctx,
+                                         remote_image_ctx, timer,
+                                         timer_lock, mirror_uuid,
+                                         journaler, client_meta,
+                                         work_queue, sync_holder_ctx,
+                                        progress_ctx);
+  sync_holder_ctx->m_sync->get();
+
+  bool start = false;
+  {
+    Mutex::Locker l(m_lock);
+
+    if (m_inflight_syncs.size() < m_max_concurrent_syncs) {
+      m_inflight_syncs.insert(std::make_pair(local_image_ctx->id,
+                                             sync_holder_ctx));
+      start = true;
+      dout(10) << "ready to start image sync for local_image_id "
+               << local_image_ctx->id << " [" << m_inflight_syncs.size() << "/"
+               << m_max_concurrent_syncs << "]" << dendl;
+    } else {
+      m_sync_queue.push_front(sync_holder_ctx);
+      dout(10) << "image sync for local_image_id " << local_image_ctx->id
+               << " has been queued" << dendl;
+    }
+  }
+
+  if (start) {
+      sync_holder_ctx->m_sync->send();
+  }
+}
+
+template <typename I>
+void ImageSyncThrottler<I>::cancel_sync(const std::string& local_image_id) {
+  dout(20) << dendl;
+
+  C_SyncHolder *sync_holder = nullptr;
+  bool running_sync = true;
+
+  {
+    Mutex::Locker l(m_lock);
+
+    if (m_inflight_syncs.empty()) {
+      // no image sync currently running and neither waiting
+      return;
+    }
+
+    auto it = m_inflight_syncs.find(local_image_id);
+    if (it != m_inflight_syncs.end()) {
+      sync_holder = it->second;
+    }
+
+    if (!sync_holder) {
+      for (auto it=m_sync_queue.begin(); it != m_sync_queue.end(); ++it) {
+        if ((*it)->m_local_image_id == local_image_id) {
+          sync_holder = (*it);
+          m_sync_queue.erase(it);
+          running_sync = false;
+          break;
+        }
+      }
+    }
+  }
+
+  if (sync_holder) {
+    if (running_sync) {
+      dout(10) << "canceled running image sync for local_image_id "
+               << sync_holder->m_local_image_id << dendl;
+      sync_holder->m_sync->cancel();
+    } else {
+      dout(10) << "canceled waiting image sync for local_image_id "
+               << sync_holder->m_local_image_id << dendl;
+      sync_holder->m_on_finish->complete(-ECANCELED);
+      sync_holder->m_sync->put();
+      delete sync_holder;
+    }
+  }
+}
+
+template <typename I>
+void ImageSyncThrottler<I>::handle_sync_finished(C_SyncHolder *sync_holder) {
+  dout(20) << dendl;
+
+  C_SyncHolder *next_sync_holder = nullptr;
+
+  {
+    Mutex::Locker l(m_lock);
+
+    m_inflight_syncs.erase(sync_holder->m_local_image_id);
+
+    if (m_inflight_syncs.size() < m_max_concurrent_syncs
+        && !m_sync_queue.empty()) {
+      next_sync_holder = m_sync_queue.back();
+      m_sync_queue.pop_back();
+      m_inflight_syncs.insert(std::make_pair(next_sync_holder->m_local_image_id,
+                                             next_sync_holder));
+      dout(10) << "ready to start image sync for local_image_id "
+               << next_sync_holder->m_local_image_id
+               << " [" << m_inflight_syncs.size() << "/"
+               << m_max_concurrent_syncs << "]" << dendl;
+    }
+
+    dout(10) << "currently running image syncs [" << m_inflight_syncs.size()
+             << "/" << m_max_concurrent_syncs << "]" << dendl;
+  }
+
+  if (next_sync_holder) {
+    next_sync_holder->m_sync->send();
+  }
+}
+
+template <typename I>
+void ImageSyncThrottler<I>::set_max_concurrent_syncs(uint32_t max) {
+  dout(20) << " max=" << max << dendl;
+
+  assert(max > 0);
+
+  std::list<C_SyncHolder *> next_sync_holders;
+  {
+    Mutex::Locker l(m_lock);
+    this->m_max_concurrent_syncs = max;
+
+    // Start waiting syncs in the case of available free slots
+    while(m_inflight_syncs.size() < m_max_concurrent_syncs
+          && !m_sync_queue.empty()) {
+        C_SyncHolder *next_sync_holder = m_sync_queue.back();
+        next_sync_holders.push_back(next_sync_holder);
+        m_sync_queue.pop_back();
+        m_inflight_syncs.insert(std::make_pair(next_sync_holder->m_local_image_id,
+                                               next_sync_holder));
+        dout(10) << "ready to start image sync for local_image_id "
+                 << next_sync_holder->m_local_image_id
+                 << " [" << m_inflight_syncs.size() << "/"
+                 << m_max_concurrent_syncs << "]" << dendl;
+    }
+  }
+
+  for (const auto& sync_holder : next_sync_holders) {
+    sync_holder->m_sync->send();
+  }
+}
+
+template <typename I>
+void ImageSyncThrottler<I>::print_status(Formatter *f, stringstream *ss) {
+  Mutex::Locker l(m_lock);
+
+  if (f) {
+    f->dump_int("max_parallel_syncs", m_max_concurrent_syncs);
+    f->dump_int("running_syncs", m_inflight_syncs.size());
+    f->dump_int("waiting_syncs", m_sync_queue.size());
+    f->flush(*ss);
+  } else {
+    *ss << "[ ";
+    *ss << "max_parallel_syncs=" << m_max_concurrent_syncs << ", ";
+    *ss << "running_syncs=" << m_inflight_syncs.size() << ", ";
+    *ss << "waiting_syncs=" << m_sync_queue.size() << " ]";
+  }
+}
+
+template <typename I>
+const char** ImageSyncThrottler<I>::get_tracked_conf_keys() const {
+  static const char* KEYS[] = {
+    "rbd_mirror_concurrent_image_syncs",
+    NULL
+  };
+  return KEYS;
+}
+
+template <typename I>
+void ImageSyncThrottler<I>::handle_conf_change(
+                                              const struct md_config_t *conf,
+                                              const set<string> &changed) {
+  if (changed.count("rbd_mirror_concurrent_image_syncs")) {
+    set_max_concurrent_syncs(conf->rbd_mirror_concurrent_image_syncs);
+  }
+}
+
+} // namespace mirror
+} // namespace rbd
+
+template class rbd::mirror::ImageSyncThrottler<librbd::ImageCtx>;
diff --git a/src/tools/rbd_mirror/ImageSyncThrottler.h b/src/tools/rbd_mirror/ImageSyncThrottler.h
new file mode 100644 (file)
index 0000000..6c7fbf3
--- /dev/null
@@ -0,0 +1,103 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2016 SUSE LINUX GmbH
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+#ifndef CEPH_RBD_MIRROR_IMAGE_SYNC_THROTTLER_H
+#define CEPH_RBD_MIRROR_IMAGE_SYNC_THROTTLER_H
+
+#include <list>
+#include <map>
+#include "common/Mutex.h"
+#include "librbd/ImageCtx.h"
+#include "include/Context.h"
+#include "librbd/journal/TypeTraits.h"
+
+class CephContext;
+class Context;
+class ContextWQ;
+class SafeTimer;
+namespace journal { class Journaler; }
+namespace librbd { namespace journal { struct MirrorPeerClientMeta; } }
+
+namespace rbd {
+namespace mirror {
+
+template <typename> class ImageSync;
+
+class ProgressContext;
+
+/**
+ * Manage concurrent image-syncs
+ */
+template <typename ImageCtxT = librbd::ImageCtx>
+class ImageSyncThrottler : public md_config_obs_t {
+public:
+
+  typedef librbd::journal::TypeTraits<ImageCtxT> TypeTraits;
+  typedef typename TypeTraits::Journaler Journaler;
+  typedef librbd::journal::MirrorPeerClientMeta MirrorPeerClientMeta;
+
+  ImageSyncThrottler();
+  ~ImageSyncThrottler();
+  ImageSyncThrottler(const ImageSyncThrottler&) = delete;
+  ImageSyncThrottler& operator=(const ImageSyncThrottler&) = delete;
+
+  void start_sync(ImageCtxT *local_image_ctx,
+                  ImageCtxT *remote_image_ctx, SafeTimer *timer,
+                  Mutex *timer_lock, const std::string &mirror_uuid,
+                  Journaler *journaler, MirrorPeerClientMeta *client_meta,
+                  ContextWQ *work_queue, Context *on_finish,
+                  ProgressContext *progress_ctx = nullptr);
+
+  void cancel_sync(const std::string& mirror_uuid);
+
+  void set_max_concurrent_syncs(uint32_t max);
+
+  void print_status(Formatter *f, std::stringstream *ss);
+
+private:
+
+  struct C_SyncHolder : public Context {
+    ImageSyncThrottler<ImageCtxT> *m_sync_throttler;
+    std::string m_local_image_id;
+    ImageSync<ImageCtxT> *m_sync;
+    Context *m_on_finish;
+
+    C_SyncHolder(ImageSyncThrottler<ImageCtxT> *sync_throttler,
+                 const std::string& local_image_id, Context *on_finish)
+      : m_sync_throttler(sync_throttler), m_local_image_id(local_image_id),
+        m_sync(nullptr), m_on_finish(on_finish) {}
+
+    virtual void finish(int r) {
+      m_sync_throttler->handle_sync_finished(this);
+      m_on_finish->complete(r);
+    }
+  };
+
+  void handle_sync_finished(C_SyncHolder *sync_holder);
+
+  const char **get_tracked_conf_keys() const;
+  void handle_conf_change(const struct md_config_t *conf,
+                          const std::set<std::string> &changed);
+
+  uint32_t m_max_concurrent_syncs;
+  Mutex m_lock;
+  std::list<C_SyncHolder *> m_sync_queue;
+  std::map<std::string, C_SyncHolder *> m_inflight_syncs;
+
+};
+
+} // namespace mirror
+} // namespace rbd
+
+#endif // CEPH_RBD_MIRROR_IMAGE_SYNC_THROTTLER_H