]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rbd-mirror: batch peer instances ack and timeout handlers
authorJason Dillaman <dillaman@redhat.com>
Fri, 9 Mar 2018 04:18:35 +0000 (23:18 -0500)
committerJason Dillaman <dillaman@redhat.com>
Tue, 10 Apr 2018 20:31:32 +0000 (16:31 -0400)
This throttles the on-disk updates and also will eventually help to
throttle the shuffling of images between alive peer instances.

Signed-off-by: Jason Dillaman <dillaman@redhat.com>
src/test/rbd_mirror/test_Instances.cc
src/test/rbd_mirror/test_mock_LeaderWatcher.cc
src/tools/rbd_mirror/Instances.cc
src/tools/rbd_mirror/Instances.h
src/tools/rbd_mirror/LeaderWatcher.cc

index e2f8008e00eefad427c7eba6f46b4a4533d61e8a..eaf86d25f2b35ee018955a60129a8efc925d072e 100644 (file)
@@ -82,7 +82,7 @@ TEST_F(TestInstances, NotifyRemove)
   std::vector<std::string> instance_ids;
 
   for (int i = 0; i < 10; i++) {
-    instances.notify(instance_id1);
+    instances.acked({instance_id1});
     sleep(1);
     C_SaferCond on_get;
     InstanceWatcher<>::get_instances(m_local_io_ctx, &instance_ids, &on_get);
index a4ee8fecd4b4141d32f2b64c53620cad2fdad41e..f39667d6dd6b1eb495865c4516d57d03c3f5b0c6 100644 (file)
@@ -230,7 +230,7 @@ struct Instances<librbd::MockTestImageCtx> {
   MOCK_METHOD0(destroy, void());
   MOCK_METHOD1(init, void(Context *));
   MOCK_METHOD1(shut_down, void(Context *));
-  MOCK_METHOD1(notify, void(const std::string &));
+  MOCK_METHOD1(acked, void(const std::vector<std::string> &));
 };
 
 Instances<librbd::MockTestImageCtx> *Instances<librbd::MockTestImageCtx>::s_instance = nullptr;
index 56feb760ae3fb6d63df431c188b2dcc63f7cb0d8..1c24d5d7a3bc798b7cb8a571f7ae605f9a3a33ca 100644 (file)
@@ -57,10 +57,7 @@ void Instances<I>::shut_down(Context *on_finish) {
     [this](int r) {
       Mutex::Locker timer_locker(m_threads->timer_lock);
       Mutex::Locker locker(m_lock);
-
-      for (auto it : m_instances) {
-        cancel_remove_task(it.second);
-      }
+      cancel_remove_task();
       wait_for_ops();
     });
 
@@ -68,37 +65,38 @@ void Instances<I>::shut_down(Context *on_finish) {
 }
 
 template <typename I>
-void Instances<I>::notify(const std::string &instance_id) {
-  dout(20) << instance_id << dendl;
+void Instances<I>::acked(const InstanceIds& instance_ids) {
+  dout(20) << "instance_ids=" << instance_ids << dendl;
 
   Mutex::Locker locker(m_lock);
-
   if (m_on_finish != nullptr) {
     dout(20) << "received on shut down, ignoring" << dendl;
     return;
   }
 
-  Context *ctx = new C_Notify(this, instance_id);
-
+  Context *ctx = new C_HandleAcked(this, instance_ids);
   m_threads->work_queue->queue(ctx, 0);
 }
 
 template <typename I>
-void Instances<I>::handle_notify(const std::string &instance_id) {
-  dout(20) << instance_id << dendl;
+void Instances<I>::handle_acked(const InstanceIds& instance_ids) {
+  dout(5) << "instance_ids=" << instance_ids << dendl;
 
   Mutex::Locker timer_locker(m_threads->timer_lock);
   Mutex::Locker locker(m_lock);
-
   if (m_on_finish != nullptr) {
     dout(20) << "handled on shut down, ignoring" << dendl;
     return;
   }
 
-  auto &instance = m_instances.insert(
-    std::make_pair(instance_id, Instance(instance_id))).first->second;
+  auto time = ceph_clock_now();
+  for (auto& instance_id : instance_ids) {
+    auto &instance = m_instances.insert(
+      std::make_pair(instance_id, Instance{})).first->second;
+    instance.acked_time = time;
+  }
 
-  schedule_remove_task(instance);
+  schedule_remove_task(time);
 }
 
 template <typename I>
@@ -131,24 +129,15 @@ void Instances<I>::handle_get_instances(int r) {
 
   Context *on_finish = nullptr;
   {
-    Mutex::Locker timer_locker(m_threads->timer_lock);
     Mutex::Locker locker(m_lock);
-
-    if (r < 0) {
-      derr << "error retrieving instances: " << cpp_strerror(r) << dendl;
-    } else {
-      auto my_instance_id = stringify(m_ioctx.get_instance_id());
-      for (auto &instance_id : m_instance_ids) {
-        if (instance_id == my_instance_id) {
-          continue;
-        }
-        auto &instance = m_instances.insert(
-          std::make_pair(instance_id, Instance(instance_id))).first->second;
-        schedule_remove_task(instance);
-      }
-    }
     std::swap(on_finish, m_on_finish);
   }
+
+  if (r < 0) {
+    derr << "error retrieving instances: " << cpp_strerror(r) << dendl;
+  } else {
+    handle_acked(m_instance_ids);
+  }
   on_finish->complete(r);
 }
 
@@ -180,70 +169,109 @@ void Instances<I>::handle_wait_for_ops(int r) {
 }
 
 template <typename I>
-void Instances<I>::remove_instance(Instance &instance) {
+void Instances<I>::remove_instances(const utime_t& time) {
   assert(m_lock.is_locked());
 
-  dout(20) << instance.id << dendl;
+  InstanceIds instance_ids;
+  for (auto& instance_pair : m_instances) {
+    auto& instance = instance_pair.second;
+    if (instance.state != INSTANCE_STATE_REMOVING &&
+        instance.acked_time <= time) {
+      instance.state = INSTANCE_STATE_REMOVING;
+      instance_ids.push_back(instance_pair.first);
+    }
+  }
 
-  Context *ctx = create_async_context_callback(
-    m_threads->work_queue, create_context_callback<
-    Instances, &Instances<I>::handle_remove_instance>(this));
+  dout(20) << "instance_ids=" << instance_ids << dendl;
+  Context* ctx = new FunctionContext([this, instance_ids](int r) {
+      handle_remove_instances(r, instance_ids);
+    });
+  ctx = create_async_context_callback(m_threads->work_queue, ctx);
+
+  auto gather_ctx = new C_Gather(m_cct, ctx);
+  for (auto& instance_id : instance_ids) {
+    InstanceWatcher<I>::remove_instance(m_ioctx, m_threads->work_queue,
+                                        instance_id, gather_ctx->new_sub());
+  }
 
   m_async_op_tracker.start_op();
-  InstanceWatcher<I>::remove_instance(m_ioctx, m_threads->work_queue,
-                                      instance.id, ctx);
-  m_instances.erase(instance.id);
+  gather_ctx->activate();
 }
 
 template <typename I>
-void Instances<I>::handle_remove_instance(int r) {
+void Instances<I>::handle_remove_instances(
+    int r, const InstanceIds& instance_ids) {
+  Mutex::Locker timer_locker(m_threads->timer_lock);
   Mutex::Locker locker(m_lock);
 
-  dout(20) << " r=" << r << dendl;
-
+  dout(20) << "r=" << r << ", instance_ids=" << instance_ids << dendl;
   assert(r == 0);
 
+  for (auto& instance_id : instance_ids) {
+    m_instances.erase(instance_id);
+  }
+
+  // reschedule the timer for the next batch
+  schedule_remove_task(ceph_clock_now());
   m_async_op_tracker.finish_op();
 }
 
 template <typename I>
-void Instances<I>::cancel_remove_task(Instance &instance) {
+void Instances<I>::cancel_remove_task() {
   assert(m_threads->timer_lock.is_locked());
   assert(m_lock.is_locked());
 
-  if (instance.timer_task == nullptr) {
+  if (m_timer_task == nullptr) {
     return;
   }
 
-  dout(20) << instance.timer_task << dendl;
+  dout(20) << dendl;
 
-  bool canceled = m_threads->timer->cancel_event(instance.timer_task);
+  bool canceled = m_threads->timer->cancel_event(m_timer_task);
   assert(canceled);
-  instance.timer_task = nullptr;
+  m_timer_task = nullptr;
 }
 
 template <typename I>
-void Instances<I>::schedule_remove_task(Instance &instance) {
-  dout(20) << dendl;
-
-  cancel_remove_task(instance);
+void Instances<I>::schedule_remove_task(const utime_t& time) {
+  cancel_remove_task();
+  if (m_on_finish != nullptr) {
+    dout(20) << "received on shut down, ignoring" << dendl;
+    return;
+  }
 
+  dout(20) << dendl;
   int after = m_cct->_conf->get_val<int64_t>("rbd_mirror_leader_heartbeat_interval") *
     (1 + m_cct->_conf->get_val<int64_t>("rbd_mirror_leader_max_missed_heartbeats") +
      m_cct->_conf->get_val<int64_t>("rbd_mirror_leader_max_acquire_attempts_before_break"));
 
-  instance.timer_task = new FunctionContext(
-    [this, &instance](int r) {
+  bool schedule = false;
+  utime_t oldest_time = time;
+  for (auto& instance : m_instances) {
+    if (instance.second.state == INSTANCE_STATE_REMOVING) {
+      continue;
+    }
+
+    oldest_time = std::min(oldest_time, instance.second.acked_time);
+    schedule = true;
+  }
+
+  if (!schedule) {
+    return;
+  }
+
+  // schedule a time to fire when the oldest instance should be removed
+  m_timer_task = new FunctionContext(
+    [this, oldest_time](int r) {
       assert(m_threads->timer_lock.is_locked());
       Mutex::Locker locker(m_lock);
-      instance.timer_task = nullptr;
-      remove_instance(instance);
-    });
+      m_timer_task = nullptr;
 
-  dout(20) << "scheduling instance " << instance.id << " remove after " << after
-           << " sec (task " << instance.timer_task << ")" << dendl;
+      remove_instances(oldest_time);
+    });
 
-  m_threads->timer->add_event_after(after, instance.timer_task);
+  oldest_time += after;
+  m_threads->timer->add_event_at(oldest_time, m_timer_task);
 }
 
 } // namespace mirror
index 2aa4bcf721db3e8cb4768e7ffda72be3cc2219c7..875e041744caa61ffb37f5e95033707ace89c1d0 100644 (file)
@@ -23,6 +23,8 @@ template <typename> struct Threads;
 template <typename ImageCtxT = librbd::ImageCtx>
 class Instances {
 public:
+  typedef std::vector<std::string> InstanceIds;
+
   static Instances *create(Threads<ImageCtxT> *threads,
                            librados::IoCtx &ioctx) {
     return new Instances(threads, ioctx);
@@ -37,7 +39,8 @@ public:
   void init(Context *on_finish);
   void shut_down(Context *on_finish);
 
-  void notify(const std::string &instance_id);
+  void acked(const InstanceIds& instance_ids);
+
   void list(std::vector<std::string> *instance_ids);
 
 private:
@@ -59,27 +62,41 @@ private:
    * @endverbatim
    */
 
-  struct Instance {
-    std::string id;
-    Context *timer_task = nullptr;
+  enum InstanceState {
+    INSTANCE_STATE_IDLE,
+    INSTANCE_STATE_REMOVING
+  };
 
-    Instance(const std::string &instance_id) : id(instance_id) {
-    }
+  struct Instance {
+    utime_t acked_time{};
+    InstanceState state = INSTANCE_STATE_IDLE;
   };
 
-  struct C_Notify : Context {
+  struct C_NotifyBase : public Context {
     Instances *instances;
-    std::string instance_id;
+    InstanceIds instance_ids;
 
-    C_Notify(Instances *instances, const std::string &instance_id)
-      : instances(instances), instance_id(instance_id) {
+    C_NotifyBase(Instances *instances, const InstanceIds& instance_ids)
+      : instances(instances), instance_ids(instance_ids) {
       instances->m_async_op_tracker.start_op();
     }
 
     void finish(int r) override {
-      instances->handle_notify(instance_id);
+      execute();
       instances->m_async_op_tracker.finish_op();
     }
+
+    virtual void execute() = 0;
+  };
+
+  struct C_HandleAcked : public C_NotifyBase {
+    C_HandleAcked(Instances *instances, const InstanceIds& instance_ids)
+      : C_NotifyBase(instances, instance_ids) {
+    }
+
+    void execute() override {
+      this->instances->handle_acked(this->instance_ids);
+    }
   };
 
   Threads<ImageCtxT> *m_threads;
@@ -87,12 +104,14 @@ private:
   CephContext *m_cct;
 
   Mutex m_lock;
-  std::vector<std::string> m_instance_ids;
+  InstanceIds m_instance_ids;
   std::map<std::string, Instance> m_instances;
   Context *m_on_finish = nullptr;
   AsyncOpTracker m_async_op_tracker;
 
-  void handle_notify(const std::string &instance_id);
+  Context *m_timer_task = nullptr;
+
+  void handle_acked(const InstanceIds& instance_ids);
 
   void get_instances();
   void handle_get_instances(int r);
@@ -100,11 +119,11 @@ private:
   void wait_for_ops();
   void handle_wait_for_ops(int r);
 
-  void remove_instance(Instance &instance);
-  void handle_remove_instance(int r);
+  void remove_instances(const utime_t& time);
+  void handle_remove_instances(int r, const InstanceIds& instance_ids);
 
-  void cancel_remove_task(Instance &instance);
-  void schedule_remove_task(Instance &instance);
+  void cancel_remove_task();
+  void schedule_remove_task(const utime_t& time);
 };
 
 } // namespace mirror
index a376cc1a6b336510fe1591ec5741a5647e593827..1b2867e96273adb0b2c9882add8ffc13d6a3c72d 100644 (file)
@@ -986,14 +986,17 @@ void LeaderWatcher<I>::handle_notify_heartbeat(int r) {
   dout(20) << m_heartbeat_response.acks.size() << " acks received, "
            << m_heartbeat_response.timeouts.size() << " timed out" << dendl;
 
+  std::vector<std::string> instance_ids;
   for (auto &it: m_heartbeat_response.acks) {
     uint64_t notifier_id = it.first.gid;
     if (notifier_id == m_notifier_id) {
       continue;
     }
 
-    std::string instance_id = stringify(notifier_id);
-    m_instances->notify(instance_id);
+    instance_ids.push_back(stringify(notifier_id));
+  }
+  if (!instance_ids.empty()) {
+    m_instances->acked(instance_ids);
   }
 
   schedule_timer_task("heartbeat", 1, true,