]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rbd-mirror: new add/remove instances listener
authorJason Dillaman <dillaman@redhat.com>
Fri, 9 Mar 2018 02:46:17 +0000 (21:46 -0500)
committerJason Dillaman <dillaman@redhat.com>
Tue, 10 Apr 2018 20:31:32 +0000 (16:31 -0400)
Signed-off-by: Jason Dillaman <dillaman@redhat.com>
src/test/rbd_mirror/test_Instances.cc
src/test/rbd_mirror/test_mock_LeaderWatcher.cc
src/tools/rbd_mirror/Instances.cc
src/tools/rbd_mirror/Instances.h
src/tools/rbd_mirror/LeaderWatcher.cc
src/tools/rbd_mirror/LeaderWatcher.h
src/tools/rbd_mirror/instances/Types.h [new file with mode: 0644]

index eaf86d25f2b35ee018955a60129a8efc925d072e..e233f5d79078e5a698c5781516db174d9874072f 100644 (file)
@@ -10,6 +10,7 @@
 
 #include "test/librados/test.h"
 #include "gtest/gtest.h"
+#include <vector>
 
 using rbd::mirror::InstanceWatcher;
 using rbd::mirror::Instances;
@@ -19,16 +20,53 @@ void register_test_instances() {
 
 class TestInstances : public ::rbd::mirror::TestFixture {
 public:
+  struct Listener : public rbd::mirror::instances::Listener {
+    std::mutex lock;
+
+    struct Instance {
+      uint32_t count = 0;
+      std::set<std::string> ids;
+      C_SaferCond ctx;
+    };
+
+    Instance add;
+    Instance remove;
+
+    void handle(const InstanceIds& instance_ids, Instance* instance) {
+      std::unique_lock<std::mutex> locker(lock);
+      for (auto& instance_id : instance_ids) {
+        assert(instance->count > 0);
+        --instance->count;
+
+        instance->ids.insert(instance_id);
+        if (instance->count == 0) {
+          instance->ctx.complete(0);
+        }
+      }
+    }
+
+    void handle_added(const InstanceIds& instance_ids) override {
+      handle(instance_ids, &add);
+    }
+
+    void handle_removed(const InstanceIds& instance_ids) override {
+      handle(instance_ids, &remove);
+    }
+  };
+
   virtual void SetUp() {
     TestFixture::SetUp();
     m_local_io_ctx.remove(RBD_MIRROR_LEADER);
     EXPECT_EQ(0, m_local_io_ctx.create(RBD_MIRROR_LEADER, true));
   }
+
+  Listener m_listener;
 };
 
 TEST_F(TestInstances, InitShutdown)
 {
-  Instances<> instances(m_threads, m_local_io_ctx);
+  m_listener.add.count = 1;
+  Instances<> instances(m_threads, m_local_io_ctx, m_listener);
 
   std::string instance_id = "instance_id";
   ASSERT_EQ(0, librbd::cls_client::mirror_instances_add(&m_local_io_ctx,
@@ -38,6 +76,9 @@ TEST_F(TestInstances, InitShutdown)
   instances.init(&on_init);
   ASSERT_EQ(0, on_init.wait());
 
+  ASSERT_EQ(0, m_listener.add.ctx.wait());
+  ASSERT_EQ(std::set<std::string>({instance_id}), m_listener.add.ids);
+
   C_SaferCond on_shut_down;
   instances.shut_down(&on_shut_down);
   ASSERT_EQ(0, on_shut_down.wait());
@@ -45,7 +86,7 @@ TEST_F(TestInstances, InitShutdown)
 
 TEST_F(TestInstances, InitEnoent)
 {
-  Instances<> instances(m_threads, m_local_io_ctx);
+  Instances<> instances(m_threads, m_local_io_ctx, m_listener);
 
   m_local_io_ctx.remove(RBD_MIRROR_LEADER);
 
@@ -64,35 +105,48 @@ TEST_F(TestInstances, NotifyRemove)
   EXPECT_EQ(0, _rados->conf_set("rbd_mirror_leader_heartbeat_interval", "1"));
   EXPECT_EQ(0, _rados->conf_set("rbd_mirror_leader_max_missed_heartbeats",
                                 "2"));
+  EXPECT_EQ(0, _rados->conf_set("rbd_mirror_leader_max_acquire_attempts_before_break",
+                                "0"));
 
-  Instances<> instances(m_threads, m_local_io_ctx);
+  m_listener.add.count = 2;
+  m_listener.remove.count = 1;
+  Instances<> instances(m_threads, m_local_io_ctx, m_listener);
 
   std::string instance_id1 = "instance_id1";
   std::string instance_id2 = "instance_id2";
 
   ASSERT_EQ(0, librbd::cls_client::mirror_instances_add(&m_local_io_ctx,
                                                         instance_id1));
-  ASSERT_EQ(0, librbd::cls_client::mirror_instances_add(&m_local_io_ctx,
-                                                        instance_id2));
+
 
   C_SaferCond on_init;
   instances.init(&on_init);
   ASSERT_EQ(0, on_init.wait());
 
-  std::vector<std::string> instance_ids;
+  instances.acked({instance_id1, instance_id2});
 
-  for (int i = 0; i < 10; i++) {
+  ASSERT_EQ(0, m_listener.add.ctx.wait());
+  ASSERT_EQ(std::set<std::string>({instance_id1, instance_id2}),
+            m_listener.add.ids);
+
+  std::vector<std::string> instance_ids;
+  for (int i = 0; i < 100; i++) {
     instances.acked({instance_id1});
-    sleep(1);
-    C_SaferCond on_get;
-    InstanceWatcher<>::get_instances(m_local_io_ctx, &instance_ids, &on_get);
-    EXPECT_EQ(0, on_get.wait());
-    if (instance_ids.size() <= 1U) {
-      break;
+    if (m_listener.remove.count > 0) {
+      usleep(250000);
     }
   }
 
-  ASSERT_EQ(1U, instance_ids.size());
+  instances.acked({instance_id1});
+  ASSERT_EQ(0, m_listener.remove.ctx.wait());
+  ASSERT_EQ(std::set<std::string>({instance_id2}),
+           m_listener.remove.ids);
+
+  C_SaferCond on_get;
+  instances.acked({instance_id1});
+  InstanceWatcher<>::get_instances(m_local_io_ctx, &instance_ids, &on_get);
+  EXPECT_EQ(0, on_get.wait());
+  EXPECT_EQ(1U, instance_ids.size());
   ASSERT_EQ(instance_ids[0], instance_id1);
 
   C_SaferCond on_shut_down;
index f39667d6dd6b1eb495865c4516d57d03c3f5b0c6..2eae5fd2d7bb0edae690cba7aee24f5b0a51d827 100644 (file)
@@ -212,7 +212,7 @@ struct Instances<librbd::MockTestImageCtx> {
   static Instances* s_instance;
 
   static Instances *create(Threads<librbd::MockTestImageCtx> *threads,
-                           librados::IoCtx &ioctx) {
+                           librados::IoCtx &ioctx, instances::Listener&) {
     assert(s_instance != nullptr);
     return s_instance;
   }
index 1c24d5d7a3bc798b7cb8a571f7ae605f9a3a33ca..bb8fa2c95daa39291a7b8beb2fe4c00f3743b359 100644 (file)
@@ -25,8 +25,9 @@ using librbd::util::create_context_callback;
 using librbd::util::create_rados_callback;
 
 template <typename I>
-Instances<I>::Instances(Threads<I> *threads, librados::IoCtx &ioctx) :
-  m_threads(threads), m_ioctx(ioctx),
+Instances<I>::Instances(Threads<I> *threads, librados::IoCtx &ioctx,
+                        instances::Listener& listener) :
+  m_threads(threads), m_ioctx(ioctx), m_listener(listener),
   m_cct(reinterpret_cast<CephContext *>(ioctx.cct())),
   m_lock("rbd::mirror::Instances " + ioctx.get_pool_name()) {
 }
@@ -89,14 +90,61 @@ void Instances<I>::handle_acked(const InstanceIds& instance_ids) {
     return;
   }
 
+  InstanceIds added_instance_ids;
   auto time = ceph_clock_now();
   for (auto& instance_id : instance_ids) {
     auto &instance = m_instances.insert(
       std::make_pair(instance_id, Instance{})).first->second;
     instance.acked_time = time;
+    if (instance.state == INSTANCE_STATE_ADDING) {
+      added_instance_ids.push_back(instance_id);
+    }
   }
 
   schedule_remove_task(time);
+  if (!added_instance_ids.empty()) {
+    m_threads->work_queue->queue(
+      new C_NotifyInstancesAdded(this, added_instance_ids), 0);
+  }
+}
+
+template <typename I>
+void Instances<I>::notify_instances_added(const InstanceIds& instance_ids) {
+  Mutex::Locker locker(m_lock);
+  InstanceIds added_instance_ids;
+  for (auto& instance_id : instance_ids) {
+    auto it = m_instances.find(instance_id);
+    if (it != m_instances.end() && it->second.state == INSTANCE_STATE_ADDING) {
+      added_instance_ids.push_back(instance_id);
+    }
+  }
+
+  if (added_instance_ids.empty()) {
+    return;
+  }
+
+  dout(5) << "instance_ids=" << added_instance_ids << dendl;
+  m_lock.Unlock();
+  m_listener.handle_added(added_instance_ids);
+  m_lock.Lock();
+
+  for (auto& instance_id : added_instance_ids) {
+    auto it = m_instances.find(instance_id);
+    if (it != m_instances.end() && it->second.state == INSTANCE_STATE_ADDING) {
+      it->second.state = INSTANCE_STATE_IDLE;
+    }
+  }
+}
+
+template <typename I>
+void Instances<I>::notify_instances_removed(const InstanceIds& instance_ids) {
+  dout(5) << "instance_ids=" << instance_ids << dendl;
+  m_listener.handle_removed(instance_ids);
+
+  Mutex::Locker locker(m_lock);
+  for (auto& instance_id : instance_ids) {
+    m_instances.erase(instance_id);
+  }
 }
 
 template <typename I>
@@ -181,6 +229,7 @@ void Instances<I>::remove_instances(const utime_t& time) {
       instance_ids.push_back(instance_pair.first);
     }
   }
+  assert(!instance_ids.empty());
 
   dout(20) << "instance_ids=" << instance_ids << dendl;
   Context* ctx = new FunctionContext([this, instance_ids](int r) {
@@ -207,9 +256,9 @@ void Instances<I>::handle_remove_instances(
   dout(20) << "r=" << r << ", instance_ids=" << instance_ids << dendl;
   assert(r == 0);
 
-  for (auto& instance_id : instance_ids) {
-    m_instances.erase(instance_id);
-  }
+  // fire removed notification now that instaces have been blacklisted
+  m_threads->work_queue->queue(
+    new C_NotifyInstancesRemoved(this, instance_ids), 0);
 
   // reschedule the timer for the next batch
   schedule_remove_task(ceph_clock_now());
@@ -240,7 +289,6 @@ void Instances<I>::schedule_remove_task(const utime_t& time) {
     return;
   }
 
-  dout(20) << dendl;
   int after = m_cct->_conf->get_val<int64_t>("rbd_mirror_leader_heartbeat_interval") *
     (1 + m_cct->_conf->get_val<int64_t>("rbd_mirror_leader_max_missed_heartbeats") +
      m_cct->_conf->get_val<int64_t>("rbd_mirror_leader_max_acquire_attempts_before_break"));
@@ -249,6 +297,7 @@ void Instances<I>::schedule_remove_task(const utime_t& time) {
   utime_t oldest_time = time;
   for (auto& instance : m_instances) {
     if (instance.second.state == INSTANCE_STATE_REMOVING) {
+      // removal is already in-flight
       continue;
     }
 
@@ -260,6 +309,8 @@ void Instances<I>::schedule_remove_task(const utime_t& time) {
     return;
   }
 
+  dout(20) << dendl;
+
   // schedule a time to fire when the oldest instance should be removed
   m_timer_task = new FunctionContext(
     [this, oldest_time](int r) {
index 875e041744caa61ffb37f5e95033707ace89c1d0..827b2eedf293915b1e1e6e9f31e4a12464b0e364 100644 (file)
@@ -11,6 +11,7 @@
 #include "common/AsyncOpTracker.h"
 #include "common/Mutex.h"
 #include "librbd/Watcher.h"
+#include "tools/rbd_mirror/instances/Types.h"
 
 namespace librados { class IoCtx; }
 namespace librbd { class ImageCtx; }
@@ -26,14 +27,16 @@ public:
   typedef std::vector<std::string> InstanceIds;
 
   static Instances *create(Threads<ImageCtxT> *threads,
-                           librados::IoCtx &ioctx) {
-    return new Instances(threads, ioctx);
+                           librados::IoCtx &ioctx,
+                           instances::Listener& listener) {
+    return new Instances(threads, ioctx, listener);
   }
   void destroy() {
     delete this;
   }
 
-  Instances(Threads<ImageCtxT> *threads, librados::IoCtx &ioctx);
+  Instances(Threads<ImageCtxT> *threads, librados::IoCtx &ioctx,
+            instances::Listener& listener);
   virtual ~Instances();
 
   void init(Context *on_finish);
@@ -63,13 +66,14 @@ private:
    */
 
   enum InstanceState {
+    INSTANCE_STATE_ADDING,
     INSTANCE_STATE_IDLE,
     INSTANCE_STATE_REMOVING
   };
 
   struct Instance {
     utime_t acked_time{};
-    InstanceState state = INSTANCE_STATE_IDLE;
+    InstanceState state = INSTANCE_STATE_ADDING;
   };
 
   struct C_NotifyBase : public Context {
@@ -99,8 +103,31 @@ private:
     }
   };
 
+  struct C_NotifyInstancesAdded : public C_NotifyBase {
+    C_NotifyInstancesAdded(Instances *instances,
+                           const InstanceIds& instance_ids)
+      : C_NotifyBase(instances, instance_ids) {
+    }
+
+    void execute() override {
+      this->instances->notify_instances_added(this->instance_ids);
+    }
+  };
+
+  struct C_NotifyInstancesRemoved : public C_NotifyBase {
+    C_NotifyInstancesRemoved(Instances *instances,
+                            const InstanceIds& instance_ids)
+      : C_NotifyBase(instances, instance_ids) {
+    }
+
+    void execute() override {
+      this->instances->notify_instances_removed(this->instance_ids);
+    }
+  };
+
   Threads<ImageCtxT> *m_threads;
   librados::IoCtx &m_ioctx;
+  instances::Listener& m_listener;
   CephContext *m_cct;
 
   Mutex m_lock;
@@ -112,6 +139,8 @@ private:
   Context *m_timer_task = nullptr;
 
   void handle_acked(const InstanceIds& instance_ids);
+  void notify_instances_added(const InstanceIds& instance_ids);
+  void notify_instances_removed(const InstanceIds& instance_ids);
 
   void get_instances();
   void handle_get_instances(int r);
index 1b2867e96273adb0b2c9882add8ffc13d6a3c72d..01e997f8e8cf2e95ead265e916be3c6c9f63dc6a 100644 (file)
@@ -29,7 +29,7 @@ template <typename I>
 LeaderWatcher<I>::LeaderWatcher(Threads<I> *threads, librados::IoCtx &io_ctx,
                                 leader_watcher::Listener *listener)
   : Watcher(io_ctx, threads->work_queue, RBD_MIRROR_LEADER),
-    m_threads(threads), m_listener(listener),
+    m_threads(threads), m_listener(listener), m_instances_listener(this),
     m_lock("rbd::mirror::LeaderWatcher " + io_ctx.get_pool_name()),
     m_notifier_id(librados::Rados(io_ctx).get_instance_id()),
     m_leader_lock(new LeaderLock(m_ioctx, m_work_queue, m_oid, this, true,
@@ -765,7 +765,7 @@ void LeaderWatcher<I>::init_instances() {
   assert(m_lock.is_locked());
   assert(m_instances == nullptr);
 
-  m_instances = Instances<I>::create(m_threads, m_ioctx);
+  m_instances = Instances<I>::create(m_threads, m_ioctx, m_instances_listener);
 
   Context *ctx = create_context_callback<
     LeaderWatcher<I>, &LeaderWatcher<I>::handle_init_instances>(this);
index 922c76e2291747cd459cc32d64784f20ae3c2c0a..4f1e12127bdd3d80eadbbf44e906d6751da20bbf 100644 (file)
@@ -15,6 +15,7 @@
 #include "librbd/watcher/Types.h"
 #include "Instances.h"
 #include "MirrorStatusWatcher.h"
+#include "tools/rbd_mirror/instances/Types.h"
 #include "tools/rbd_mirror/leader_watcher/Types.h"
 
 namespace librbd { class ImageCtx; }
@@ -92,6 +93,22 @@ private:
    * @endverbatim
    */
 
+  struct InstancesListener : public instances::Listener {
+    LeaderWatcher* leader_watcher;
+
+    InstancesListener(LeaderWatcher* leader_watcher)
+      : leader_watcher(leader_watcher) {
+    }
+
+    void handle_added(const InstanceIds& instance_ids) override {
+      // TODO
+    }
+
+    void handle_removed(const InstanceIds& instance_ids) override {
+      // TODO
+    }
+  };
+
   class LeaderLock : public librbd::ManagedLock<ImageCtxT> {
   public:
     typedef librbd::ManagedLock<ImageCtxT> Parent;
@@ -184,6 +201,7 @@ private:
   Threads<ImageCtxT> *m_threads;
   leader_watcher::Listener *m_listener;
 
+  InstancesListener m_instances_listener;
   mutable Mutex m_lock;
   uint64_t m_notifier_id;
   LeaderLock *m_leader_lock;
diff --git a/src/tools/rbd_mirror/instances/Types.h b/src/tools/rbd_mirror/instances/Types.h
new file mode 100644 (file)
index 0000000..8b0a68f
--- /dev/null
@@ -0,0 +1,28 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_RBD_MIRROR_INSTANCES_TYPES_H
+#define CEPH_RBD_MIRROR_INSTANCES_TYPES_H
+
+#include <string>
+#include <vector>
+
+namespace rbd {
+namespace mirror {
+namespace instances {
+
+struct Listener {
+  typedef std::vector<std::string> InstanceIds;
+
+  virtual ~Listener() {
+  }
+
+  virtual void handle_added(const InstanceIds& instance_ids) = 0;
+  virtual void handle_removed(const InstanceIds& instance_ids) = 0;
+};
+
+} // namespace instances
+} // namespace mirror
+} // namespace rbd
+
+#endif // CEPH_RBD_MIRROR_INSTANCES_TYPES_H