]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rbd-mirror: forward initial pool image deletions to instance
authorJason Dillaman <dillaman@redhat.com>
Tue, 25 Apr 2017 03:02:17 +0000 (23:02 -0400)
committerJason Dillaman <dillaman@redhat.com>
Thu, 27 Apr 2017 19:54:13 +0000 (15:54 -0400)
Signed-off-by: Jason Dillaman <dillaman@redhat.com>
src/test/rbd_mirror/test_mock_InstanceReplayer.cc
src/tools/rbd_mirror/ImageReplayer.cc
src/tools/rbd_mirror/ImageReplayer.h
src/tools/rbd_mirror/InstanceReplayer.cc
src/tools/rbd_mirror/PoolReplayer.cc
src/tools/rbd_mirror/PoolReplayer.h

index d406d9f57fa8474b56d9922d8fca6cc172e03ab6..53a9d0b6edd388c266c757286b5d67e1ae1a0837 100644 (file)
@@ -72,8 +72,9 @@ struct ImageReplayer<librbd::MockTestImageCtx> {
   MOCK_METHOD3(add_remote_image, void(const std::string &,
                                       const std::string &,
                                       librados::IoCtx &));
-  MOCK_METHOD2(remove_remote_image, void(const std::string &,
-                                         const std::string &));
+  MOCK_METHOD3(remove_remote_image, void(const std::string &,
+                                         const std::string &,
+                                         bool));
   MOCK_METHOD0(remote_images_empty, bool());
   MOCK_METHOD0(get_global_image_id, const std::string &());
   MOCK_METHOD0(get_local_image_id, const std::string &());
@@ -174,7 +175,8 @@ TEST_F(TestMockInstanceReplayer, AcquireReleaseImage) {
   C_SaferCond on_release;
 
   EXPECT_CALL(mock_image_replayer,
-              remove_remote_image("remote_mirror_uuid", "remote_image_id"));
+              remove_remote_image("remote_mirror_uuid", "remote_image_id",
+                                  false));
   EXPECT_CALL(mock_image_replayer, remote_images_empty())
     .WillOnce(Return(true));
   EXPECT_CALL(mock_image_replayer, is_stopped())
index 2ab66bbb9c1abdb2553afe1cedb059963ff2e0ce..f9a38c2fe3fb1f9bf3095199170b6d7e8d3002b2 100644 (file)
@@ -335,7 +335,8 @@ void ImageReplayer<I>::add_remote_image(const std::string &mirror_uuid,
 
 template <typename I>
 void ImageReplayer<I>::remove_remote_image(const std::string &mirror_uuid,
-                                           const std::string &image_id) {
+                                           const std::string &image_id,
+                                          bool schedule_delete) {
   Mutex::Locker locker(m_lock);
   m_remote_images.erase({mirror_uuid, image_id});
 }
index 8cb33eefed68102a8756f174122bed3e024e2d54..934df3e460a49df22a748947a568c5b9e3e32e8d 100644 (file)
@@ -109,7 +109,8 @@ public:
                         const std::string &remote_image_id,
                         librados::IoCtx &remote_io_ctx);
   void remove_remote_image(const std::string &remote_mirror_uuid,
-                           const std::string &remote_image_id);
+                           const std::string &remote_image_id,
+                          bool schedule_delete);
   bool remote_images_empty() const;
 
   inline int64_t get_local_pool_id() const {
index 74735d6344435f9e91ae0051731881289038ed25..d2426d0d0e433974bb8cea966d89dc875cde7f08 100644 (file)
@@ -156,13 +156,13 @@ void InstanceReplayer<I>::acquire_image(const std::string &global_image_id,
   }
 
   auto image_replayer = it->second;
+  if (!peer_mirror_uuid.empty()) {
+    auto iter = m_peers.find(Peer(peer_mirror_uuid));
+    assert(iter != m_peers.end());
+    auto io_ctx = iter->io_ctx;
 
-  auto iter = m_peers.find(Peer(peer_mirror_uuid));
-  assert(iter != m_peers.end());
-  auto io_ctx = iter->io_ctx;
-
-  image_replayer->add_remote_image(peer_mirror_uuid, peer_image_id, io_ctx);
-
+    image_replayer->add_remote_image(peer_mirror_uuid, peer_image_id, io_ctx);
+  }
   start_image_replayer(image_replayer);
 
   m_threads->work_queue->queue(on_finish, 0);
@@ -190,11 +190,13 @@ void InstanceReplayer<I>::release_image(const std::string &global_image_id,
   }
 
   auto image_replayer = it->second;
-
-  image_replayer->remove_remote_image(peer_mirror_uuid, peer_image_id);
+  if (!peer_mirror_uuid.empty()) {
+    image_replayer->remove_remote_image(peer_mirror_uuid, peer_image_id,
+                                       schedule_delete);
+  }
 
   if (!image_replayer->remote_images_empty()) {
-    dout(20) << global_image_id << ": still has remote images" << dendl;
+    dout(20) << global_image_id << ": still has peer images" << dendl;
     m_threads->work_queue->queue(on_finish, 0);
     return;
   }
index dbede1ed0b5f83c32077ebbad5296371ce1e5c7f..0dbfeffe54d4892a525f5cddcb5c9a8db12e5a47 100644 (file)
@@ -21,7 +21,6 @@
 #include "InstanceWatcher.h"
 #include "LeaderWatcher.h"
 #include "Threads.h"
-#include "pool_watcher/RefreshImagesRequest.h"
 
 #define dout_context g_ceph_context
 #define dout_subsys ceph_subsys_rbd_mirror
@@ -206,20 +205,6 @@ private:
 
 } // anonymous namespace
 
-struct PoolReplayer::C_RefreshLocalImages : public Context {
-  PoolReplayer *pool_replayer;
-  Context *on_finish;
-  ImageIds image_ids;
-
-  C_RefreshLocalImages(PoolReplayer *pool_replayer, Context *on_finish)
-    : pool_replayer(pool_replayer), on_finish(on_finish) {
-  }
-
-  void finish(int r) override {
-    pool_replayer->handle_refresh_local_images(r, std::move(image_ids), on_finish);
-  }
-};
-
 PoolReplayer::PoolReplayer(Threads<librbd::ImageCtx> *threads,
                           std::shared_ptr<ImageDeleter> image_deleter,
                           ImageSyncThrottlerRef<> image_sync_throttler,
@@ -232,7 +217,8 @@ PoolReplayer::PoolReplayer(Threads<librbd::ImageCtx> *threads,
   m_peer(peer),
   m_args(args),
   m_local_pool_id(local_pool_id),
-  m_pool_watcher_listener(this),
+  m_local_pool_watcher_listener(this, true),
+  m_remote_pool_watcher_listener(this, false),
   m_asok_hook(nullptr),
   m_pool_replayer_thread(this),
   m_leader_listener(this)
@@ -261,7 +247,8 @@ PoolReplayer::~PoolReplayer()
     m_instance_replayer->shut_down();
   }
 
-  assert(!m_pool_watcher);
+  assert(!m_local_pool_watcher);
+  assert(!m_remote_pool_watcher);
 }
 
 bool PoolReplayer::is_blacklisted() const {
@@ -435,7 +422,8 @@ void PoolReplayer::run()
     }
 
     Mutex::Locker locker(m_lock);
-    if (m_pool_watcher && m_pool_watcher->is_blacklisted()) {
+    if ((m_local_pool_watcher && m_local_pool_watcher->is_blacklisted()) ||
+       (m_remote_pool_watcher && m_remote_pool_watcher->is_blacklisted())) {
       m_blacklisted = true;
       m_stopping.set(1);
       break;
@@ -553,42 +541,44 @@ void PoolReplayer::release_leader()
 void PoolReplayer::handle_update(const std::string &mirror_uuid,
                                 ImageIds &&added_image_ids,
                                 ImageIds &&removed_image_ids) {
-  assert(!mirror_uuid.empty());
   if (m_stopping.read()) {
     return;
   }
 
-  dout(10) << dendl;
+  dout(10) << "mirror_uuid=" << mirror_uuid << ", "
+           << "added_count=" << added_image_ids.size() << ", "
+           << "removed_count=" << removed_image_ids.size() << dendl;
   Mutex::Locker locker(m_lock);
   if (!m_leader_watcher->is_leader()) {
     return;
   }
 
-  if (m_peer.uuid != mirror_uuid) {
-    m_instance_replayer->remove_peer(m_peer.uuid);
-    m_instance_replayer->add_peer(mirror_uuid, m_remote_io_ctx);
-    m_peer.uuid = mirror_uuid;
-  }
+  if (m_initial_mirror_image_ids.find(mirror_uuid) ==
+        m_initial_mirror_image_ids.end() &&
+      m_initial_mirror_image_ids.size() < 2) {
+    m_initial_mirror_image_ids[mirror_uuid] = added_image_ids;
+
+    if (m_initial_mirror_image_ids.size() == 2) {
+      dout(10) << "local and remote pools refreshed" << dendl;
 
-  // first callback will be a full directory -- so see if we need to remove
-  // any local images that no longer exist on the remote side
-  if (!m_init_image_ids.empty()) {
-    dout(20) << "scanning initial local image set" << dendl;
-    for (auto &image_id : added_image_ids) {
-      auto it = m_init_image_ids.find(image_id);
-      if (it != m_init_image_ids.end()) {
-        m_init_image_ids.erase(it);
+      // both local and remote initial pool listing received. derive
+      // removal notifications for the remote pool
+      auto &local_image_ids = m_initial_mirror_image_ids.begin()->second;
+      auto &remote_image_ids = m_initial_mirror_image_ids.rbegin()->second;
+      for (auto &local_image_id : local_image_ids) {
+        if (remote_image_ids.find(local_image_id) == remote_image_ids.end()) {
+          removed_image_ids.emplace(local_image_id.global_id, "");
+        }
       }
+      local_image_ids.clear();
+      remote_image_ids.clear();
     }
+  }
 
-    // the remaining images in m_init_image_ids must be deleted
-    for (auto &image_id : m_init_image_ids) {
-      dout(20) << "scheduling the deletion of init image: "
-               << image_id.global_id << " (" << image_id.id << ")" << dendl;
-      m_image_deleter->schedule_image_delete(m_local_rados, m_local_pool_id,
-                                             image_id.global_id);
-    }
-    m_init_image_ids.clear();
+  if (!mirror_uuid.empty() && m_peer.uuid != mirror_uuid) {
+    m_instance_replayer->remove_peer(m_peer.uuid);
+    m_instance_replayer->add_peer(mirror_uuid, m_remote_io_ctx);
+    m_peer.uuid = mirror_uuid;
   }
 
   m_update_op_tracker.start_op();
@@ -599,19 +589,19 @@ void PoolReplayer::handle_update(const std::string &mirror_uuid,
 
   C_Gather *gather_ctx = new C_Gather(g_ceph_context, ctx);
 
-  for (auto &image_id : removed_image_ids) {
+  for (auto &image_id : added_image_ids) {
     // for now always send to myself (the leader)
     std::string &instance_id = m_instance_watcher->get_instance_id();
-    m_instance_watcher->notify_image_release(instance_id, image_id.global_id,
-                                             mirror_uuid, image_id.id, true,
+    m_instance_watcher->notify_image_acquire(instance_id, image_id.global_id,
+                                             mirror_uuid, image_id.id,
                                              gather_ctx->new_sub());
   }
 
-  for (auto &image_id : added_image_ids) {
+  for (auto &image_id : removed_image_ids) {
     // for now always send to myself (the leader)
     std::string &instance_id = m_instance_watcher->get_instance_id();
-    m_instance_watcher->notify_image_acquire(instance_id, image_id.global_id,
-                                             mirror_uuid, image_id.id,
+    m_instance_watcher->notify_image_release(instance_id, image_id.global_id,
+                                             mirror_uuid, image_id.id, true,
                                              gather_ctx->new_sub());
   }
 
@@ -620,68 +610,73 @@ void PoolReplayer::handle_update(const std::string &mirror_uuid,
 
 void PoolReplayer::handle_post_acquire_leader(Context *on_finish) {
   dout(20) << dendl;
-  refresh_local_images(on_finish);
+  init_local_pool_watcher(on_finish);
 }
 
 void PoolReplayer::handle_pre_release_leader(Context *on_finish) {
   dout(20) << dendl;
-  shut_down_pool_watcher(on_finish);
+  shut_down_pool_watchers(on_finish);
 }
 
-void PoolReplayer::refresh_local_images(Context *on_finish) {
+void PoolReplayer::init_local_pool_watcher(Context *on_finish) {
   dout(20) << dendl;
 
+  Mutex::Locker locker(m_lock);
+  assert(!m_local_pool_watcher);
+  m_local_pool_watcher.reset(new PoolWatcher<>(
+    m_threads, m_local_io_ctx, m_local_pool_watcher_listener));
+  m_initial_mirror_image_ids.clear();
+
   // ensure the initial set of local images is up-to-date
   // after acquiring the leader role
-  auto ctx = new C_RefreshLocalImages(this, on_finish);
-  auto req = pool_watcher::RefreshImagesRequest<>::create(
-    m_local_io_ctx, &ctx->image_ids, ctx);
-  req->send();
+  auto ctx = new FunctionContext([this, on_finish](int r) {
+      handle_init_local_pool_watcher(r, on_finish);
+    });
+  m_local_pool_watcher->init(create_async_context_callback(
+    m_threads->work_queue, ctx));
 }
 
-void PoolReplayer::handle_refresh_local_images(int r, ImageIds &&image_ids,
-                                              Context *on_finish) {
+void PoolReplayer::handle_init_local_pool_watcher(int r, Context *on_finish) {
   dout(20) << "r=" << r << dendl;
-
-  {
-    Mutex::Locker locker(m_lock);
-    m_init_image_ids = std::move(image_ids);
-  }
-
   if (r < 0) {
     derr << "failed to retrieve local images: " << cpp_strerror(r) << dendl;
     on_finish->complete(r);
     return;
   }
 
-  init_pool_watcher(on_finish);
+  init_remote_pool_watcher(on_finish);
 }
 
-void PoolReplayer::init_pool_watcher(Context *on_finish) {
+void PoolReplayer::init_remote_pool_watcher(Context *on_finish) {
   dout(20) << dendl;
 
   Mutex::Locker locker(m_lock);
-  assert(!m_pool_watcher);
-  m_pool_watcher.reset(new PoolWatcher<>(
-    m_threads, m_remote_io_ctx, m_pool_watcher_listener));
-  m_pool_watcher->init(create_async_context_callback(
+  assert(!m_remote_pool_watcher);
+  m_remote_pool_watcher.reset(new PoolWatcher<>(
+    m_threads, m_remote_io_ctx, m_remote_pool_watcher_listener));
+  m_remote_pool_watcher->init(create_async_context_callback(
     m_threads->work_queue, on_finish));
 
   m_cond.Signal();
 }
 
-void PoolReplayer::shut_down_pool_watcher(Context *on_finish) {
+void PoolReplayer::shut_down_pool_watchers(Context *on_finish) {
   dout(20) << dendl;
 
   {
     Mutex::Locker locker(m_lock);
-    if (m_pool_watcher) {
+    if (m_local_pool_watcher) { 
       Context *ctx = new FunctionContext([this, on_finish](int r) {
-          handle_shut_down_pool_watcher(r, on_finish);
-      });
+          handle_shut_down_pool_watchers(r, on_finish);
+       });
       ctx = create_async_context_callback(m_threads->work_queue, ctx);
 
-      m_pool_watcher->shut_down(ctx);
+      auto gather_ctx = new C_Gather(g_ceph_context, ctx);
+      m_local_pool_watcher->shut_down(gather_ctx->new_sub());
+      if (m_remote_pool_watcher) {
+       m_remote_pool_watcher->shut_down(gather_ctx->new_sub());
+      }
+      gather_ctx->activate();
       return;
     }
   }
@@ -689,13 +684,17 @@ void PoolReplayer::shut_down_pool_watcher(Context *on_finish) {
   on_finish->complete(0);
 }
 
-void PoolReplayer::handle_shut_down_pool_watcher(int r, Context *on_finish) {
+void PoolReplayer::handle_shut_down_pool_watchers(int r, Context *on_finish) {
   dout(20) << "r=" << r << dendl;
 
   {
     Mutex::Locker locker(m_lock);
-    assert(m_pool_watcher);
-    m_pool_watcher.reset();
+    assert(m_local_pool_watcher);
+    m_local_pool_watcher.reset();
+
+    if (m_remote_pool_watcher) {
+      m_remote_pool_watcher.reset();
+    }
   }
   wait_for_update_ops(on_finish);
 }
index 526ed536c0fa3265a34ee3a38bbcb81bc9d953ea..620b6db9a5e7ca0df5c584ab3429503961db40fe 100644 (file)
@@ -63,21 +63,21 @@ public:
 private:
   struct PoolWatcherListener : public PoolWatcher<>::Listener {
     PoolReplayer *pool_replayer;
+    bool local;
 
-    PoolWatcherListener(PoolReplayer *pool_replayer)
-      : pool_replayer(pool_replayer) {
+    PoolWatcherListener(PoolReplayer *pool_replayer, bool local)
+      : pool_replayer(pool_replayer), local(local) {
     }
 
     void handle_update(const std::string &mirror_uuid,
                        ImageIds &&added_image_ids,
                        ImageIds &&removed_image_ids) override {
-      pool_replayer->handle_update(mirror_uuid, std::move(added_image_ids),
-                                  std::move(removed_image_ids));
+      pool_replayer->handle_update((local ? "" : mirror_uuid),
+                                  std::move(added_image_ids),
+                                   std::move(removed_image_ids));
     }
   };
 
-  struct C_RefreshLocalImages;
-
   void handle_update(const std::string &mirror_uuid,
                      ImageIds &&added_image_ids,
                      ImageIds &&removed_image_ids);
@@ -89,13 +89,13 @@ private:
   void handle_post_acquire_leader(Context *on_finish);
   void handle_pre_release_leader(Context *on_finish);
 
-  void refresh_local_images(Context *on_finish);
-  void handle_refresh_local_images(int r, ImageIds &&image_ids,
-                                   Context *on_finish);
+  void init_local_pool_watcher(Context *on_finish);
+  void handle_init_local_pool_watcher(int r, Context *on_finish);
+
+  void init_remote_pool_watcher(Context *on_finish);
 
-  void init_pool_watcher(Context *on_finish);
-  void shut_down_pool_watcher(Context *on_finish);
-  void handle_shut_down_pool_watcher(int r, Context *on_finish);
+  void shut_down_pool_watchers(Context *on_finish);
+  void handle_shut_down_pool_watchers(int r, Context *on_finish);
 
   void wait_for_update_ops(Context *on_finish);
   void handle_wait_for_update_ops(int r, Context *on_finish);
@@ -119,15 +119,18 @@ private:
 
   int64_t m_local_pool_id = -1;
 
-  PoolWatcherListener m_pool_watcher_listener;
-  std::unique_ptr<PoolWatcher<> > m_pool_watcher;
+  PoolWatcherListener m_local_pool_watcher_listener;
+  std::unique_ptr<PoolWatcher<> > m_local_pool_watcher;
+
+  PoolWatcherListener m_remote_pool_watcher_listener;
+  std::unique_ptr<PoolWatcher<> > m_remote_pool_watcher;
 
   std::unique_ptr<InstanceReplayer<librbd::ImageCtx>> m_instance_replayer;
 
   std::string m_asok_hook_name;
   AdminSocketHook *m_asok_hook;
 
-  std::set<ImageId> m_init_image_ids;
+  std::map<std::string, ImageIds> m_initial_mirror_image_ids;
 
   class PoolReplayerThread : public Thread {
     PoolReplayer *m_pool_replayer;