]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rbd-mirror: each pool replayer should use its own librados connection
authorJason Dillaman <dillaman@redhat.com>
Thu, 23 Jun 2016 20:03:03 +0000 (16:03 -0400)
committerJason Dillaman <dillaman@redhat.com>
Wed, 6 Jul 2016 15:36:11 +0000 (11:36 -0400)
Signed-off-by: Jason Dillaman <dillaman@redhat.com>
src/tools/rbd_mirror/ImageReplayer.cc
src/tools/rbd_mirror/Mirror.cc
src/tools/rbd_mirror/Replayer.cc
src/tools/rbd_mirror/Replayer.h

index e5594b2cb76698a11f78cf43b6b5a18ab39d71ba..b30be56f647a3ab19eaed26419795bf677ec2db3 100644 (file)
@@ -8,6 +8,7 @@
 #include "cls/rbd/cls_rbd_client.h"
 #include "common/Timer.h"
 #include "common/WorkQueue.h"
+#include "global/global_context.h"
 #include "journal/Journaler.h"
 #include "journal/ReplayHandler.h"
 #include "librbd/ExclusiveLock.h"
@@ -288,8 +289,8 @@ ImageReplayer<I>::ImageReplayer(Threads *threads,
   }
   m_name = pool_name + "/" + m_global_image_id;
 
-  CephContext *cct = static_cast<CephContext *>(m_local->cct());
-  m_asok_hook = new ImageReplayerAdminSocketHook<I>(cct, m_name, this);
+  m_asok_hook = new ImageReplayerAdminSocketHook<I>(g_ceph_context, m_name,
+                                                    this);
 }
 
 template <typename I>
@@ -462,8 +463,8 @@ void ImageReplayer<I>::handle_bootstrap(int r) {
       }
     }
     if (!m_asok_hook) {
-      CephContext *cct = static_cast<CephContext *>(m_local->cct());
-      m_asok_hook = new ImageReplayerAdminSocketHook<I>(cct, m_name, this);
+      m_asok_hook = new ImageReplayerAdminSocketHook<I>(g_ceph_context, m_name,
+                                                        this);
     }
   }
 
index 666f622ed2b2f95011ba25f4ecf9c17bd9e18131..cefd8a48f388cba21fbe5ab0878d73d696e681ab 100644 (file)
@@ -375,8 +375,7 @@ void Mirror::update_replayers(const PoolPeers &pool_peers)
         dout(20) << "starting replayer for " << peer << dendl;
         unique_ptr<Replayer> replayer(new Replayer(m_threads, m_image_deleter,
                                                    m_image_sync_throttler,
-                                                   m_local, kv.first, peer,
-                                                   m_args));
+                                                   kv.first, peer, m_args));
         // TODO: make async, and retry connecting within replayer
         int r = replayer->init();
         if (r < 0) {
index 5f633aa2e2d812a05b11a54faa56be88edcf75f0..38fcf0de108f5e9deba2a8780acba9c9b2e9d50b 100644 (file)
@@ -12,6 +12,7 @@
 #include "common/errno.h"
 #include "include/stringify.h"
 #include "cls/rbd/cls_rbd_client.h"
+#include "global/global_context.h"
 #include "librbd/ObjectWatcher.h"
 #include "librbd/internal.h"
 #include "Replayer.h"
@@ -230,16 +231,14 @@ private:
 
 Replayer::Replayer(Threads *threads, std::shared_ptr<ImageDeleter> image_deleter,
                    ImageSyncThrottlerRef<> image_sync_throttler,
-                   RadosRef local_cluster, int64_t local_pool_id,
-                   const peer_t &peer, const std::vector<const char*> &args) :
+                   int64_t local_pool_id, const peer_t &peer,
+                   const std::vector<const char*> &args) :
   m_threads(threads),
   m_image_deleter(image_deleter),
   m_image_sync_throttler(image_sync_throttler),
   m_lock(stringify("rbd::mirror::Replayer ") + stringify(peer)),
   m_peer(peer),
   m_args(args),
-  m_local(local_cluster),
-  m_remote(new librados::Rados),
   m_local_pool_id(local_pool_id),
   m_asok_hook(nullptr),
   m_replayer_thread(this)
@@ -264,32 +263,73 @@ int Replayer::init()
 {
   dout(20) << "replaying for " << m_peer << dendl;
 
-  int r = m_local->ioctx_create2(m_local_pool_id, m_local_io_ctx);
+  int r = init_rados(g_ceph_context->_conf->cluster,
+                     g_ceph_context->_conf->name.to_str(),
+                     "local cluster", &m_local_rados);
+  if (r < 0) {
+    return r;
+  }
+
+  r = init_rados(m_peer.cluster_name, m_peer.client_name,
+                 std::string("remote peer ") + stringify(m_peer),
+                 &m_remote_rados);
+  if (r < 0) {
+    return r;
+  }
+
+  r = m_local_rados->ioctx_create2(m_local_pool_id, m_local_io_ctx);
   if (r < 0) {
     derr << "error accessing local pool " << m_local_pool_id << ": "
          << cpp_strerror(r) << dendl;
     return r;
   }
 
+  r = m_remote_rados->ioctx_create(m_local_io_ctx.get_pool_name().c_str(),
+                                   m_remote_io_ctx);
+  if (r < 0) {
+    derr << "error accessing remote pool " << m_local_io_ctx.get_pool_name()
+         << ": " << cpp_strerror(r) << dendl;
+    return r;
+  }
+  m_remote_pool_id = m_remote_io_ctx.get_id();
+
+  dout(20) << "connected to " << m_peer << dendl;
+
+  // Bootstrap existing mirroring images
+  init_local_mirroring_images();
+
+  // TODO: make interval configurable
+  m_pool_watcher.reset(new PoolWatcher(m_remote_io_ctx, 30, m_lock, m_cond));
+  m_pool_watcher->refresh_images();
+
+  m_replayer_thread.create("replayer");
+
+  return 0;
+}
+
+int Replayer::init_rados(const std::string &cluster_name,
+                         const std::string &client_name,
+                         const std::string &description, RadosRef *rados_ref) {
+  rados_ref->reset(new librados::Rados());
+
   // NOTE: manually bootstrap a CephContext here instead of via
   // the librados API to avoid mixing global singletons between
   // the librados shared library and the daemon
   // TODO: eliminate intermingling of global singletons within Ceph APIs
   CephInitParameters iparams(CEPH_ENTITY_TYPE_CLIENT);
-  if (m_peer.client_name.empty() ||
-      !iparams.name.from_str(m_peer.client_name)) {
-    derr << "error initializing remote cluster handle for " << m_peer << dendl;
+  if (client_name.empty() || !iparams.name.from_str(client_name)) {
+    derr << "error initializing cluster handle for " << description << dendl;
     return -EINVAL;
   }
 
   CephContext *cct = common_preinit(iparams, CODE_ENVIRONMENT_LIBRARY,
                                     CINIT_FLAG_UNPRIVILEGED_DAEMON_DEFAULTS);
-  cct->_conf->cluster = m_peer.cluster_name;
+  cct->_conf->cluster = cluster_name;
 
   // librados::Rados::conf_read_file
-  r = cct->_conf->parse_config_files(nullptr, nullptr, 0);
+  int r = cct->_conf->parse_config_files(nullptr, nullptr, 0);
   if (r < 0) {
-    derr << "could not read ceph conf for " << m_peer << ": "
+    derr << "could not read ceph conf for " << description << ": "
         << cpp_strerror(r) << dendl;
     cct->put();
     return r;
@@ -301,7 +341,7 @@ int Replayer::init()
   env_to_vec(args, nullptr);
   r = cct->_conf->parse_argv(args);
   if (r < 0) {
-    derr << "could not parse environment for " << m_peer << ":"
+    derr << "could not parse environment for " << description << ":"
          << cpp_strerror(r) << dendl;
     cct->put();
     return r;
@@ -309,9 +349,10 @@ int Replayer::init()
 
   if (!m_args.empty()) {
     // librados::Rados::conf_parse_argv
-    r = cct->_conf->parse_argv(m_args);
+    args = m_args;
+    r = cct->_conf->parse_argv(args);
     if (r < 0) {
-      derr << "could not parse command line args for " << m_peer << ": "
+      derr << "could not parse command line args for " << description << ": "
           << cpp_strerror(r) << dendl;
       cct->put();
       return r;
@@ -323,36 +364,16 @@ int Replayer::init()
   cct->_conf->apply_changes(nullptr);
   cct->_conf->complain_about_parse_errors(cct);
 
-  r = m_remote->init_with_context(cct);
+  r = (*rados_ref)->init_with_context(cct);
   assert(r == 0);
   cct->put();
 
-  r = m_remote->connect();
-  if (r < 0) {
-    derr << "error connecting to remote cluster " << m_peer
-        << " : " << cpp_strerror(r) << dendl;
-    return r;
-  }
-
-  r = m_remote->ioctx_create(m_local_io_ctx.get_pool_name().c_str(),
-                             m_remote_io_ctx);
+  r = (*rados_ref)->connect();
   if (r < 0) {
-    derr << "error accessing remote pool " << m_local_io_ctx.get_pool_name()
-         << ": " << cpp_strerror(r) << dendl;
+    derr << "error connecting to " << description << ": "
+        << cpp_strerror(r) << dendl;
     return r;
   }
-  m_remote_pool_id = m_remote_io_ctx.get_id();
-
-  dout(20) << "connected to " << m_peer << dendl;
-
-  // Bootstrap existing mirroring images
-  init_local_mirroring_images();
-
-  // TODO: make interval configurable
-  m_pool_watcher.reset(new PoolWatcher(m_remote_io_ctx, 30, m_lock, m_cond));
-  m_pool_watcher->refresh_images();
-
-  m_replayer_thread.create("replayer");
 
   return 0;
 }
@@ -415,8 +436,8 @@ void Replayer::run()
       m_asok_hook_name = asok_hook_name;
       delete m_asok_hook;
 
-      CephContext *cct = static_cast<CephContext *>(m_local->cct());
-      m_asok_hook = new ReplayerAdminSocketHook(cct, m_asok_hook_name, this);
+      m_asok_hook = new ReplayerAdminSocketHook(g_ceph_context,
+                                                m_asok_hook_name, this);
     }
 
     Mutex::Locker l(m_lock);
@@ -611,8 +632,8 @@ void Replayer::set_sources(const ImageIds &image_ids)
     auto it = m_image_replayers.find(image_id.id);
     if (it == m_image_replayers.end()) {
       unique_ptr<ImageReplayer<> > image_replayer(new ImageReplayer<>(
-        m_threads, m_image_deleter, m_image_sync_throttler, m_local, m_remote,
-        local_mirror_uuid, remote_mirror_uuid, m_local_pool_id,
+        m_threads, m_image_deleter, m_image_sync_throttler, m_local_rados,
+        m_remote_rados, local_mirror_uuid, remote_mirror_uuid, m_local_pool_id,
         m_remote_pool_id, image_id.id, image_id.global_id));
       it = m_image_replayers.insert(
         std::make_pair(image_id.id, std::move(image_replayer))).first;
index a39c8fb8db9241b9950e835d987b49ca892cf5c1..5bbed3e347a44ed5b412d1ef2991728938b62ede 100644 (file)
@@ -35,7 +35,7 @@ class Replayer {
 public:
   Replayer(Threads *threads, std::shared_ptr<ImageDeleter> image_deleter,
            ImageSyncThrottlerRef<> image_sync_throttler,
-           RadosRef local_cluster, int64_t local_pool_id, const peer_t &peer,
+           int64_t local_pool_id, const peer_t &peer,
            const std::vector<const char*> &args);
   ~Replayer();
   Replayer(const Replayer&) = delete;
@@ -65,6 +65,9 @@ private:
   int mirror_image_status_init();
   void mirror_image_status_shut_down();
 
+  int init_rados(const std::string &cluser_name, const std::string &client_name,
+                 const std::string &description, RadosRef *rados_ref);
+
   Threads *m_threads;
   std::shared_ptr<ImageDeleter> m_image_deleter;
   ImageSyncThrottlerRef<> m_image_sync_throttler;
@@ -75,8 +78,8 @@ private:
 
   peer_t m_peer;
   std::vector<const char*> m_args;
-  RadosRef m_local;
-  RadosRef m_remote;
+  RadosRef m_local_rados;
+  RadosRef m_remote_rados;
 
   librados::IoCtx m_local_io_ctx;
   librados::IoCtx m_remote_io_ctx;