git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rbd-mirror: separate image replayer periodic flushing from status updates
author: Jason Dillaman <dillaman@redhat.com>
Mon, 9 Sep 2019 14:15:24 +0000 (10:15 -0400)
committer: Jason Dillaman <dillaman@redhat.com>
Tue, 8 Oct 2019 15:16:46 +0000 (11:16 -0400)
This is in preparation for pulling the periodic status update logic out of
the image replayer.

Signed-off-by: Jason Dillaman <dillaman@redhat.com>
src/librbd/Utils.h
src/tools/rbd_mirror/ImageReplayer.cc
src/tools/rbd_mirror/ImageReplayer.h

index 9aa01edb2dff30a1e617140108bb239edbc208d6..4695a10e60861fcd747acfa524c79e62e929334a 100644 (file)
@@ -89,8 +89,12 @@ struct C_AsyncCallback : public Context {
   C_AsyncCallback(WQ *op_work_queue, Context *on_finish)
     : op_work_queue(op_work_queue), on_finish(on_finish) {
   }
+  ~C_AsyncCallback() override {
+    delete on_finish;
+  }
   void finish(int r) override {
     op_work_queue->queue(on_finish, r);
+    on_finish = nullptr;
   }
 };
 
index f11cf783783a22f0cac3bce88d5ce9b4f412a970..3663949fcff6c519025d5796d3eaa9eaec4f56ee 100644 (file)
@@ -48,6 +48,7 @@ extern PerfCounters *g_perf_counters;
 namespace rbd {
 namespace mirror {
 
+using librbd::util::create_async_context_callback;
 using librbd::util::create_context_callback;
 using librbd::util::create_rados_callback;
 using namespace rbd::mirror::image_replayer;
@@ -288,6 +289,7 @@ ImageReplayer<I>::~ImageReplayer()
   ceph_assert(m_on_stop_finish == nullptr);
   ceph_assert(m_bootstrap_request == nullptr);
   ceph_assert(m_in_flight_status_updates == 0);
+  ceph_assert(m_flush_local_replay_task == nullptr);
 
   delete m_journal_listener;
 }
@@ -770,8 +772,10 @@ void ImageReplayer<I>::on_stop_journal_replay(int r, const std::string &desc)
       // might be invoked multiple times while stopping
       return;
     }
+
     m_stop_requested = true;
     m_state = STATE_STOPPING;
+    cancel_flush_local_replay_task();
   }
 
   set_state_description(r, desc);
@@ -836,6 +840,50 @@ void ImageReplayer<I>::flush()
   update_mirror_image_status(false, boost::none);
 }
 
+template <typename I>  // Arms a single pending timer event that will run handle_flush_local_replay_task().
+void ImageReplayer<I>::schedule_flush_local_replay_task() {
+  ceph_assert(ceph_mutex_is_locked(m_lock));  // caller must already hold m_lock
+
+  std::lock_guard timer_locker{m_threads->timer_lock};  // guards m_flush_local_replay_task and the timer call below
+  if (m_state != STATE_REPLAYING || m_flush_local_replay_task != nullptr) {  // no-op unless replaying with no task pending
+    return;
+  }
+
+  dout(15) << dendl;
+  m_flush_local_replay_task = create_async_context_callback(  // presumably re-queues the handler onto the work queue when fired -- confirm
+    m_threads->work_queue, create_context_callback<
+      ImageReplayer<I>,
+      &ImageReplayer<I>::handle_flush_local_replay_task>(this));
+  m_threads->timer->add_event_after(30, m_flush_local_replay_task);  // delay of 30 -- presumably seconds; confirm against the timer API
+}
+
+template <typename I>  // Cancels the pending flush timer event, if one is recorded, and clears the stored pointer.
+void ImageReplayer<I>::cancel_flush_local_replay_task() {
+  ceph_assert(ceph_mutex_is_locked(m_lock));  // caller must already hold m_lock
+  std::lock_guard timer_locker{m_threads->timer_lock};  // guards m_flush_local_replay_task and the timer call below
+  if (m_flush_local_replay_task != nullptr) {
+    auto canceled = m_threads->timer->cancel_event(m_flush_local_replay_task);
+    m_flush_local_replay_task = nullptr;  // cleared before the assert, so the pointer is reset either way
+    ceph_assert(canceled);  // NOTE(review): the pointer stays set after the event fires until the flush completes; cancel_event presumably fails in that window -- confirm
+  }
+}
+
+template <typename I>  // Timer handler: starts a local replay flush and clears the pending-task pointer when it completes.
+void ImageReplayer<I>::handle_flush_local_replay_task(int) {  // the int result argument is ignored
+  dout(15) << dendl;
+
+  m_in_flight_op_tracker.start_op();  // tracked so handle_shut_down() can wait on this tracker before finishing
+  auto on_finish = new LambdaContext([this](int) {  // completion passed to flush_local_replay(); its result is ignored
+      {
+        std::lock_guard timer_locker{m_threads->timer_lock};  // same lock schedule/cancel take around this pointer
+        m_flush_local_replay_task = nullptr;  // lets schedule_flush_local_replay_task() arm a new event
+      }
+
+      m_in_flight_op_tracker.finish_op();  // pairs with start_op() above, after timer_lock is released
+    });
+  flush_local_replay(on_finish);
+}
+
 template <typename I>
 void ImageReplayer<I>::flush_local_replay(Context* on_flush)
 {
@@ -1238,6 +1286,8 @@ void ImageReplayer<I>::handle_process_entry_safe(const ReplayEntry &replay_entry
   auto ctx = new LambdaContext(
     [this, bytes, latency](int r) {
       std::lock_guard locker{m_lock};
+      schedule_flush_local_replay_task();
+
       if (m_perf_counters) {
         m_perf_counters->inc(l_rbd_mirror_replay);
         m_perf_counters->inc(l_rbd_mirror_replay_bytes, bytes);
@@ -1315,14 +1365,7 @@ void ImageReplayer<I>::queue_mirror_image_status_update(const OptionalState &sta
     [this, state](int r) {
       send_mirror_status_update(state);
     });
-
-  // ensure pending IO is flushed and the commit position is updated
-  // prior to updating the mirror status
-  auto ctx2 = new LambdaContext(
-    [this, ctx=std::move(ctx)](int r) {
-      flush_local_replay(ctx);
-    });
-  m_threads->work_queue->queue(ctx2, 0);
+  m_threads->work_queue->queue(ctx, 0);
 }
 
 template <typename I>
@@ -1700,6 +1743,14 @@ void ImageReplayer<I>::handle_shut_down(int r) {
     return;
   }
 
+  if (!m_in_flight_op_tracker.empty()) {
+    dout(15) << "waiting for in-flight operations to complete" << dendl;
+    m_in_flight_op_tracker.wait_for_ops(new LambdaContext([this, r](int) {
+        handle_shut_down(r);
+      }));
+    return;
+  }
+
   dout(10) << "stop complete" << dendl;
   ReplayStatusFormatter<I>::destroy(m_replay_status_formatter);
   m_replay_status_formatter = nullptr;
index 33bd8e811ee994a37ed55294d5b91b7ea0d75f83..22507de162fd98265a4a11c1737dfa239d59d8ae 100644 (file)
@@ -341,6 +341,9 @@ private:
   AsyncOpTracker m_event_replay_tracker;
   Context *m_delayed_preprocess_task = nullptr;
 
+  AsyncOpTracker m_in_flight_op_tracker;
+  Context *m_flush_local_replay_task = nullptr;
+
   struct RemoteJournalerListener : public ::journal::JournalMetadataListener {
     ImageReplayer *replayer;
 
@@ -378,6 +381,10 @@ private:
             m_state == STATE_REPLAY_FLUSHING);
   }
 
+  void schedule_flush_local_replay_task();
+  void cancel_flush_local_replay_task();
+  void handle_flush_local_replay_task(int r);
+
   void flush_local_replay(Context* on_flush);
   void handle_flush_local_replay(Context* on_flush, int r);