]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
librbd: unify journal event callbacks into single interface
authorJason Dillaman <dillaman@redhat.com>
Tue, 13 Sep 2016 16:37:53 +0000 (12:37 -0400)
committerJason Dillaman <dillaman@redhat.com>
Tue, 11 Oct 2016 16:51:09 +0000 (12:51 -0400)
Signed-off-by: Jason Dillaman <dillaman@redhat.com>
(cherry picked from commit dbbcecf4a289ca36b734b7bda9530cc0a59f84ac)

src/librbd/Journal.cc
src/librbd/Journal.h
src/librbd/journal/Types.h
src/test/librbd/mock/MockJournal.h
src/test/librbd/test_mock_Journal.cc
src/tools/rbd_mirror/ImageReplayer.cc
src/tools/rbd_mirror/ImageReplayer.h

index a0f7285c2aff04f2547e8e8afdcd9834187d3121..745f2f6a991a3fc8cd58840a3cdf5252994791f2 100644 (file)
@@ -335,7 +335,6 @@ Journal<I>::~Journal() {
   assert(m_state == STATE_UNINITIALIZED || m_state == STATE_CLOSED);
   assert(m_journaler == NULL);
   assert(m_journal_replay == NULL);
-  assert(m_on_replay_close_request == nullptr);
   assert(m_wait_for_state_contexts.empty());
 }
 
@@ -632,6 +631,12 @@ bool Journal<I>::is_journal_ready() const {
 template <typename I>
 bool Journal<I>::is_journal_replaying() const {
   Mutex::Locker locker(m_lock);
+  return is_journal_replaying(m_lock);
+}
+
+template <typename I>
+bool Journal<I>::is_journal_replaying(const Mutex &) const {
+  assert(m_lock.is_locked());
   return (m_state == STATE_REPLAYING ||
           m_state == STATE_FLUSHING_REPLAY ||
           m_state == STATE_FLUSHING_RESTART ||
@@ -679,6 +684,21 @@ void Journal<I>::close(Context *on_finish) {
   on_finish = create_async_context_callback(m_image_ctx, on_finish);
 
   Mutex::Locker locker(m_lock);
+  while (m_listener_notify) {
+    m_listener_cond.Wait(m_lock);
+  }
+
+  Listeners listeners(m_listeners);
+  m_listener_notify = true;
+  m_lock.Unlock();
+  for (auto listener : listeners) {
+    listener->handle_close();
+  }
+
+  m_lock.Lock();
+  m_listener_notify = false;
+  m_listener_cond.Signal();
+
   assert(m_state != STATE_UNINITIALIZED);
   if (m_state == STATE_CLOSED) {
     on_finish->complete(m_error_result);
@@ -689,12 +709,6 @@ void Journal<I>::close(Context *on_finish) {
     stop_recording();
   }
 
-  // interrupt external replay if active
-  if (m_on_replay_close_request != nullptr) {
-    m_on_replay_close_request->complete(0);
-    m_on_replay_close_request = nullptr;
-  }
-
   m_close_pending = true;
   wait_for_steady_state(on_finish);
 }
@@ -1127,16 +1141,13 @@ typename Journal<I>::Future Journal<I>::wait_event(Mutex &lock, uint64_t tid,
 
 template <typename I>
 void Journal<I>::start_external_replay(journal::Replay<I> **journal_replay,
-                                       Context *on_start,
-                                       Context *on_close_request) {
+                                       Context *on_start) {
   CephContext *cct = m_image_ctx.cct;
   ldout(cct, 20) << this << " " << __func__ << dendl;
 
   Mutex::Locker locker(m_lock);
   assert(m_state == STATE_READY);
   assert(m_journal_replay == nullptr);
-  assert(m_on_replay_close_request == nullptr);
-  m_on_replay_close_request = on_close_request;
 
   on_start = util::create_async_context_callback(m_image_ctx, on_start);
   on_start = new FunctionContext(
@@ -1165,11 +1176,6 @@ void Journal<I>::handle_start_external_replay(int r,
                << "failed to stop recording: " << cpp_strerror(r) << dendl;
     *journal_replay = nullptr;
 
-    if (m_on_replay_close_request != nullptr) {
-      m_on_replay_close_request->complete(r);
-      m_on_replay_close_request = nullptr;
-    }
-
     // get back to a sane-state
     start_append();
     on_finish->complete(r);
@@ -1184,15 +1190,13 @@ void Journal<I>::handle_start_external_replay(int r,
 
 template <typename I>
 void Journal<I>::stop_external_replay() {
+  CephContext *cct = m_image_ctx.cct;
+  ldout(cct, 20) << this << " " << __func__ << dendl;
+
   Mutex::Locker locker(m_lock);
   assert(m_journal_replay != nullptr);
   assert(m_state == STATE_REPLAYING);
 
-  if (m_on_replay_close_request != nullptr) {
-    m_on_replay_close_request->complete(-ECANCELED);
-    m_on_replay_close_request = nullptr;
-  }
-
   delete m_journal_replay;
   m_journal_replay = nullptr;
 
@@ -1762,13 +1766,13 @@ void Journal<I>::wait_for_steady_state(Context *on_state) {
 }
 
 template <typename I>
-int Journal<I>::check_resync_requested(bool *do_resync) {
+int Journal<I>::is_resync_requested(bool *do_resync) {
   Mutex::Locker l(m_lock);
-  return check_resync_requested_internal(do_resync);
+  return check_resync_requested(do_resync);
 }
 
 template <typename I>
-int Journal<I>::check_resync_requested_internal(bool *do_resync) {
+int Journal<I>::check_resync_requested(bool *do_resync) {
   CephContext *cct = m_image_ctx.cct;
   ldout(cct, 20) << this << " " << __func__ << dendl;
 
@@ -1811,51 +1815,51 @@ void Journal<I>::handle_metadata_updated() {
   CephContext *cct = m_image_ctx.cct;
   ldout(cct, 20) << this << " " << __func__ << dendl;
 
-  std::list<journal::ResyncListener *> resync_private_list;
+  Mutex::Locker locker(m_lock);
+  if (m_state != STATE_READY && !is_journal_replaying(m_lock)) {
+    return;
+  }
 
-  {
-    Mutex::Locker l(m_lock);
+  while (m_listener_notify) {
+    m_listener_cond.Wait(m_lock);
+  }
 
-    if (m_state == STATE_CLOSING || m_state == STATE_CLOSED ||
-        m_state == STATE_UNINITIALIZED || m_state == STATE_STOPPING) {
-      return;
-    }
+  bool resync_requested = false;
+  int r = check_resync_requested(&resync_requested);
+  if (r < 0) {
+    lderr(cct) << this << " " << __func__ << ": "
+               << "failed to check if a resync was requested" << dendl;
+    return;
+  }
 
-    bool do_resync = false;
-    int r = check_resync_requested_internal(&do_resync);
-    if (r < 0) {
-      lderr(cct) << this << " " << __func__ << ": "
-                 << "failed to check if a resync was requested" << dendl;
-      return;
-    }
+  Listeners listeners(m_listeners);
+  m_listener_notify = true;
+  m_lock.Unlock();
 
-    if (do_resync) {
-      for (const auto& listener :
-                              m_listener_map[journal::ListenerType::RESYNC]) {
-        journal::ResyncListener *rsync_listener =
-                        boost::get<journal::ResyncListener *>(listener);
-        resync_private_list.push_back(rsync_listener);
-      }
+  if (resync_requested) {
+    for (auto listener : listeners) {
+      listener->handle_resync();
     }
   }
 
-  for (const auto& listener : resync_private_list) {
-    listener->handle_resync();
-  }
+  m_lock.Lock();
+  m_listener_notify = false;
+  m_listener_cond.Signal();
 }
 
 template <typename I>
-void Journal<I>::add_listener(journal::ListenerType type,
-                              journal::JournalListenerPtr listener) {
-  Mutex::Locker l(m_lock);
-  m_listener_map[type].push_back(listener);
+void Journal<I>::add_listener(journal::Listener *listener) {
+  Mutex::Locker locker(m_lock);
+  m_listeners.insert(listener);
 }
 
 template <typename I>
-void Journal<I>::remove_listener(journal::ListenerType type,
-                                 journal::JournalListenerPtr listener) {
-  Mutex::Locker l(m_lock);
-  m_listener_map[type].remove(listener);
+void Journal<I>::remove_listener(journal::Listener *listener) {
+  Mutex::Locker locker(m_lock);
+  while (m_listener_notify) {
+    m_listener_cond.Wait(m_lock);
+  }
+  m_listeners.erase(listener);
 }
 
 } // namespace librbd
index 1680587224d386fe546992c460df63dbeaff6296..a7a51fbd46d0bf41811e2948a0809d04e3792efa 100644 (file)
@@ -8,6 +8,7 @@
 #include "include/atomic.h"
 #include "include/Context.h"
 #include "include/interval_set.h"
+#include "common/Cond.h"
 #include "common/Mutex.h"
 #include "journal/Future.h"
 #include "journal/JournalMetadataListener.h"
@@ -157,15 +158,13 @@ public:
   }
 
   void start_external_replay(journal::Replay<ImageCtxT> **journal_replay,
-                             Context *on_start, Context *on_close_request);
+                             Context *on_start);
   void stop_external_replay();
 
-  void add_listener(journal::ListenerType type,
-                    journal::JournalListenerPtr listener);
-  void remove_listener(journal::ListenerType type,
-                       journal::JournalListenerPtr listener);
+  void add_listener(journal::Listener *listener);
+  void remove_listener(journal::Listener *listener);
 
-  int check_resync_requested(bool *do_resync);
+  int is_resync_requested(bool *do_resync);
 
 private:
   ImageCtxT &m_image_ctx;
@@ -297,7 +296,6 @@ private:
   bool m_blocking_writes;
 
   journal::Replay<ImageCtxT> *m_journal_replay;
-  Context *m_on_replay_close_request = nullptr;
 
   struct MetadataListener : public ::journal::JournalMetadataListener {
     Journal<ImageCtxT> *journal;
@@ -312,9 +310,12 @@ private:
     }
   } m_metadata_listener;
 
-  typedef std::map<journal::ListenerType,
-                   std::list<journal::JournalListenerPtr> > ListenerMap;
-  ListenerMap m_listener_map;
+  typedef std::set<journal::Listener *> Listeners;
+  Listeners m_listeners;
+  Cond m_listener_cond;
+  bool m_listener_notify = false;
+
+  bool is_journal_replaying(const Mutex &) const;
 
   uint64_t append_io_events(journal::EventType event_type,
                             const Bufferlists &bufferlists,
@@ -360,7 +361,7 @@ private:
   bool is_steady_state() const;
   void wait_for_steady_state(Context *on_state);
 
-  int check_resync_requested_internal(bool *do_resync);
+  int check_resync_requested(bool *do_resync);
 
   void handle_metadata_updated();
 };
index 7368b8b33f204ff9fcfa6e3ae601eca2ff910e4c..c8c13a9beeaed959dac1d43e2f75a6f092b3376b 100644 (file)
@@ -508,17 +508,19 @@ std::ostream &operator<<(std::ostream &out, const MirrorPeerClientMeta &meta);
 std::ostream &operator<<(std::ostream &out, const TagPredecessor &predecessor);
 std::ostream &operator<<(std::ostream &out, const TagData &tag_data);
 
-enum class ListenerType : int8_t {
-  RESYNC
-};
+struct Listener {
+  virtual ~Listener() {
+  }
 
-struct ResyncListener {
-  virtual ~ResyncListener() {}
-  virtual void handle_resync() = 0;
-};
+  /// invoked when journal close is requested
+  virtual void handle_close() = 0;
 
-typedef boost::variant<ResyncListener *> JournalListenerPtr;
+  /// invoked when journal is promoted to primary
+  virtual void handle_promoted() = 0;
 
+  /// invoked when journal resync is requested
+  virtual void handle_resync() = 0;
+};
 
 } // namespace journal
 } // namespace librbd
index 0cd86393c818d1bd7af3360e4458f8cfbde16e4e..56b32804c41b4ea50178040316611916b0356d21 100644 (file)
@@ -81,12 +81,10 @@ struct MockJournal {
   MOCK_METHOD3(commit_op_event, void(uint64_t, int, Context *));
   MOCK_METHOD2(replay_op_ready, void(uint64_t, Context *));
 
-  MOCK_METHOD2(add_listener, void(journal::ListenerType,
-                                  journal::JournalListenerPtr));
-  MOCK_METHOD2(remove_listener, void(journal::ListenerType,
-                                     journal::JournalListenerPtr));
+  MOCK_METHOD1(add_listener, void(journal::Listener *));
+  MOCK_METHOD1(remove_listener, void(journal::Listener *));
 
-  MOCK_METHOD1(check_resync_requested, int(bool *));
+  MOCK_METHOD1(is_resync_requested, int(bool *));
 };
 
 } // namespace librbd
index 3864cdbfec211719d611d54900bfbed3aa6736e5..70a025a8e204c18990ae78786255900a92f3f732 100644 (file)
@@ -170,12 +170,16 @@ public:
   }
 
   void expect_get_journaler_cached_client(::journal::MockJournaler &mock_journaler, int r) {
-
     journal::ImageClientMeta image_client_meta;
     image_client_meta.tag_class = 0;
+    expect_get_journaler_cached_client(mock_journaler, image_client_meta, r);
+  }
 
+  void expect_get_journaler_cached_client(::journal::MockJournaler &mock_journaler,
+                                          const journal::ImageClientMeta &client_meta,
+                                          int r) {
     journal::ClientData client_data;
-    client_data.client_meta = image_client_meta;
+    client_data.client_meta = client_meta;
 
     cls::journal::Client client;
     ::encode(client_data, client.data);
@@ -197,7 +201,8 @@ public:
     EXPECT_CALL(mock_journaler, get_tags(0, _, _))
                   .WillOnce(DoAll(SetArgPointee<1>(tags),
                                   WithArg<2>(CompleteContext(r, mock_image_ctx.image_ctx->op_work_queue))));
-    EXPECT_CALL(mock_journaler, add_listener(_));
+    EXPECT_CALL(mock_journaler, add_listener(_))
+                  .WillOnce(SaveArg<0>(&m_listener));
   }
 
   void expect_start_replay(MockJournalImageCtx &mock_image_ctx,
@@ -393,6 +398,7 @@ public:
   }
 
   ::journal::ReplayHandler *m_replay_handler = nullptr;
+  ::journal::JournalMetadataListener *m_listener = nullptr;
 };
 
 TEST_F(TestMockJournal, StateTransitions) {
@@ -1078,15 +1084,12 @@ TEST_F(TestMockJournal, ExternalReplay) {
   expect_shut_down_journaler(mock_journaler);
 
   C_SaferCond start_ctx;
-  C_SaferCond close_request_ctx;
 
   journal::Replay<MockJournalImageCtx> *journal_replay = nullptr;
-  mock_journal.start_external_replay(&journal_replay, &start_ctx,
-                                     &close_request_ctx);
+  mock_journal.start_external_replay(&journal_replay, &start_ctx);
   ASSERT_EQ(0, start_ctx.wait());
 
   mock_journal.stop_external_replay();
-  ASSERT_EQ(-ECANCELED, close_request_ctx.wait());
 }
 
 TEST_F(TestMockJournal, ExternalReplayFailure) {
@@ -1109,16 +1112,13 @@ TEST_F(TestMockJournal, ExternalReplayFailure) {
   expect_shut_down_journaler(mock_journaler);
 
   C_SaferCond start_ctx;
-  C_SaferCond close_request_ctx;
 
   journal::Replay<MockJournalImageCtx> *journal_replay = nullptr;
-  mock_journal.start_external_replay(&journal_replay, &start_ctx,
-                                     &close_request_ctx);
+  mock_journal.start_external_replay(&journal_replay, &start_ctx);
   ASSERT_EQ(-EINVAL, start_ctx.wait());
-  ASSERT_EQ(-EINVAL, close_request_ctx.wait());
 }
 
-TEST_F(TestMockJournal, ExternalReplayCloseRequest) {
+TEST_F(TestMockJournal, AppendDisabled) {
   REQUIRE_FEATURE(RBD_FEATURE_JOURNALING);
 
   librbd::ImageCtx *ictx;
@@ -1126,31 +1126,61 @@ TEST_F(TestMockJournal, ExternalReplayCloseRequest) {
 
   MockJournalImageCtx mock_image_ctx(*ictx);
   MockJournal mock_journal(mock_image_ctx);
+  MockJournalPolicy mock_journal_policy;
+
   ::journal::MockJournaler mock_journaler;
   open_journal(mock_image_ctx, mock_journal, mock_journaler);
+  BOOST_SCOPE_EXIT_ALL(&) {
+    close_journal(mock_journal, mock_journaler);
+  };
 
   InSequence seq;
-  expect_stop_append(mock_journaler, 0);
+  RWLock::RLocker snap_locker(mock_image_ctx.snap_lock);
+  EXPECT_CALL(mock_image_ctx, get_journal_policy()).WillOnce(
+    Return(ictx->get_journal_policy()));
+  ASSERT_TRUE(mock_journal.is_journal_appending());
+
+  EXPECT_CALL(mock_image_ctx, get_journal_policy()).WillOnce(
+    Return(&mock_journal_policy));
+  EXPECT_CALL(mock_journal_policy, append_disabled()).WillOnce(Return(true));
+  ASSERT_FALSE(mock_journal.is_journal_appending());
+
   expect_shut_down_journaler(mock_journaler);
+}
 
-  C_SaferCond start_ctx;
-  C_SaferCond close_request_ctx;
+TEST_F(TestMockJournal, CloseListenerEvent) {
+  REQUIRE_FEATURE(RBD_FEATURE_JOURNALING);
 
-  journal::Replay<MockJournalImageCtx> *journal_replay = nullptr;
-  mock_journal.start_external_replay(&journal_replay, &start_ctx,
-                                     &close_request_ctx);
-  ASSERT_EQ(0, start_ctx.wait());
+  librbd::ImageCtx *ictx;
+  ASSERT_EQ(0, open_image(m_image_name, &ictx));
 
-  C_SaferCond close_ctx;
-  mock_journal.close(&close_ctx);
+  MockJournalImageCtx mock_image_ctx(*ictx);
+  MockJournal mock_journal(mock_image_ctx);
+  ::journal::MockJournaler mock_journaler;
+  open_journal(mock_image_ctx, mock_journal, mock_journaler);
 
-  ASSERT_EQ(0, close_request_ctx.wait());
-  mock_journal.stop_external_replay();
+  struct Listener : public journal::Listener {
+    C_SaferCond ctx;
+    virtual void handle_close() {
+      ctx.complete(0);
+    }
+    virtual void handle_resync() {
+      ADD_FAILURE() << "unexpected resync request";
+    }
+    virtual void handle_promoted() {
+      ADD_FAILURE() << "unexpected promotion event";
+    }
+  } listener;
+  mock_journal.add_listener(&listener);
+
+  expect_shut_down_journaler(mock_journaler);
+  close_journal(mock_journal, mock_journaler);
 
-  ASSERT_EQ(0, close_ctx.wait());
+  ASSERT_EQ(0, listener.ctx.wait());
+  mock_journal.remove_listener(&listener);
 }
 
-TEST_F(TestMockJournal, AppendDisabled) {
+TEST_F(TestMockJournal, ResyncRequested) {
   REQUIRE_FEATURE(RBD_FEATURE_JOURNALING);
 
   librbd::ImageCtx *ictx;
@@ -1158,26 +1188,37 @@ TEST_F(TestMockJournal, AppendDisabled) {
 
   MockJournalImageCtx mock_image_ctx(*ictx);
   MockJournal mock_journal(mock_image_ctx);
-  MockJournalPolicy mock_journal_policy;
-
   ::journal::MockJournaler mock_journaler;
   open_journal(mock_image_ctx, mock_journal, mock_journaler);
+
+  struct Listener : public journal::Listener {
+    C_SaferCond ctx;
+    virtual void handle_close() {
+      ADD_FAILURE() << "unexpected close action";
+    }
+    virtual void handle_resync() {
+      ctx.complete(0);
+    }
+    virtual void handle_promoted() {
+      ADD_FAILURE() << "unexpected promotion event";
+    }
+  } listener;
+  mock_journal.add_listener(&listener);
+
   BOOST_SCOPE_EXIT_ALL(&) {
+    mock_journal.remove_listener(&listener);
     close_journal(mock_journal, mock_journaler);
   };
 
   InSequence seq;
-  RWLock::RLocker snap_locker(mock_image_ctx.snap_lock);
-  EXPECT_CALL(mock_image_ctx, get_journal_policy()).WillOnce(
-    Return(ictx->get_journal_policy()));
-  ASSERT_TRUE(mock_journal.is_journal_appending());
-
-  EXPECT_CALL(mock_image_ctx, get_journal_policy()).WillOnce(
-    Return(&mock_journal_policy));
-  EXPECT_CALL(mock_journal_policy, append_disabled()).WillOnce(Return(true));
-  ASSERT_FALSE(mock_journal.is_journal_appending());
-
+  journal::ImageClientMeta image_client_meta;
+  image_client_meta.tag_class = 0;
+  image_client_meta.resync_requested = true;
+  expect_get_journaler_cached_client(mock_journaler, image_client_meta, 0);
   expect_shut_down_journaler(mock_journaler);
+
+  m_listener->handle_update(nullptr);
+  ASSERT_EQ(0, listener.ctx.wait());
 }
 
 } // namespace librbd
index 256216336c52b7d6eb4753f1f43d4e4c22f26762..2e7b6b9f0a2d23f3ec4e79e30f051e7370ee884b 100644 (file)
@@ -224,19 +224,6 @@ private:
   Commands commands;
 };
 
-template <typename I>
-struct ResyncListener : public librbd::journal::ResyncListener {
-  ImageReplayer<I> *img_replayer;
-
-  ResyncListener(ImageReplayer<I> *img_replayer)
-    : img_replayer(img_replayer) {
-  }
-
-  virtual void handle_resync() {
-    img_replayer->resync_image();
-  }
-};
-
 } // anonymous namespace
 
 template <typename I>
@@ -285,7 +272,7 @@ ImageReplayer<I>::ImageReplayer(Threads *threads,
   m_lock("rbd::mirror::ImageReplayer " + stringify(remote_pool_id) + " " +
         remote_image_id),
   m_progress_cxt(this),
-  m_resync_listener(new ResyncListener<I>(this)),
+  m_journal_listener(new JournalListener(this)),
   m_remote_listener(this)
 {
   // Register asok commands using a temporary "remote_pool_name/global_image_id"
@@ -319,7 +306,7 @@ ImageReplayer<I>::~ImageReplayer()
   assert(m_bootstrap_request == nullptr);
   assert(m_in_flight_status_updates == 0);
 
-  delete m_resync_listener;
+  delete m_journal_listener;
   delete m_asok_hook;
 }
 
@@ -427,7 +414,6 @@ void ImageReplayer<I>::bootstrap() {
 template <typename I>
 void ImageReplayer<I>::handle_bootstrap(int r) {
   dout(20) << "r=" << r << dendl;
-
   {
     Mutex::Locker locker(m_lock);
     m_bootstrap_request->put();
@@ -449,15 +435,25 @@ void ImageReplayer<I>::handle_bootstrap(int r) {
     return;
   }
 
+
+  assert(m_local_journal == nullptr);
   {
-    Mutex::Locker locker(m_lock);
+    RWLock::RLocker snap_locker(m_local_image_ctx->snap_lock);
+    if (m_local_image_ctx->journal != nullptr) {
+      m_local_journal = m_local_image_ctx->journal;
+      m_local_journal->add_listener(m_journal_listener);
+    }
+  }
 
-    m_local_image_ctx->journal->add_listener(
-                                    librbd::journal::ListenerType::RESYNC,
-                                    m_resync_listener);
+  if (m_local_journal == nullptr) {
+    on_start_fail(-EINVAL, "error accessing local journal");
+    return;
+  }
 
+  {
+    Mutex::Locker locker(m_lock);
     bool do_resync = false;
-    r = m_local_image_ctx->journal->check_resync_requested(&do_resync);
+    r = m_local_image_ctx->journal->is_resync_requested(&do_resync);
     if (r < 0) {
       derr << "failed to check if a resync was requested" << dendl;
     }
@@ -478,7 +474,7 @@ void ImageReplayer<I>::handle_bootstrap(int r) {
     }
 
     std::string name = m_local_ioctx.get_pool_name() + "/" +
-      m_local_image_ctx->name;
+                       m_local_image_ctx->name;
     if (m_name != name) {
       m_name = name;
       if (m_asok_hook) {
@@ -546,23 +542,9 @@ template <typename I>
 void ImageReplayer<I>::start_replay() {
   dout(20) << dendl;
 
-  assert(m_local_journal == nullptr);
-  {
-    RWLock::RLocker snap_locker(m_local_image_ctx->snap_lock);
-    if (m_local_image_ctx->journal != nullptr) {
-      m_local_journal = m_local_image_ctx->journal;
-
-      Context *start_ctx = create_context_callback<
-        ImageReplayer, &ImageReplayer<I>::handle_start_replay>(this);
-      Context *stop_ctx = create_context_callback<
-        ImageReplayer, &ImageReplayer<I>::handle_stop_replay_request>(this);
-      m_local_journal->start_external_replay(&m_local_replay, start_ctx,
-                                             stop_ctx);
-      return;
-    }
-  }
-
-  on_start_fail(-EINVAL, "error starting journal replay");
+  Context *start_ctx = create_context_callback<
+    ImageReplayer, &ImageReplayer<I>::handle_start_replay>(this);
+  m_local_journal->start_external_replay(&m_local_replay, start_ctx);
 }
 
 template <typename I>
@@ -570,7 +552,7 @@ void ImageReplayer<I>::handle_start_replay(int r) {
   dout(20) << "r=" << r << dendl;
 
   if (r < 0) {
-    m_local_journal = nullptr;
+    assert(m_local_replay == nullptr);
     derr << "error starting external replay on local image "
         <<  m_local_image_id << ": " << cpp_strerror(r) << dendl;
     on_start_fail(r, "error starting replay on local image");
@@ -618,19 +600,6 @@ void ImageReplayer<I>::handle_start_replay(int r) {
 
 }
 
-template <typename I>
-void ImageReplayer<I>::handle_stop_replay_request(int r) {
-  if (r < 0) {
-    // error starting or we requested the stop -- ignore
-    return;
-  }
-
-  // journal close has been requested, stop replay so the journal
-  // can be closed (since it will wait on replay to finish)
-  dout(20) << dendl;
-  on_stop_journal_replay();
-}
-
 template <typename I>
 void ImageReplayer<I>::on_start_fail(int r, const std::string &desc)
 {
@@ -931,9 +900,7 @@ void ImageReplayer<I>::replay_flush() {
         return;
       }
 
-      Context *stop_ctx = create_context_callback<
-        ImageReplayer, &ImageReplayer<I>::handle_stop_replay_request>(this);
-      m_local_journal->start_external_replay(&m_local_replay, ctx, stop_ctx);
+      m_local_journal->start_external_replay(&m_local_replay, ctx);
     });
   m_local_replay->shut_down(false, ctx);
 }
@@ -1386,25 +1353,35 @@ void ImageReplayer<I>::shut_down(int r) {
         });
     }
   }
-  if (m_local_replay != nullptr) {
+  if (m_local_journal != nullptr) {
+    ctx = new FunctionContext([this, ctx](int r) {
+        m_local_journal = nullptr;
+        ctx->complete(0);
+      });
+    if (m_local_replay != nullptr) {
+      ctx = new FunctionContext([this, ctx](int r) {
+          m_local_journal->stop_external_replay();
+          m_local_replay = nullptr;
+
+          delete m_event_preprocessor;
+          m_event_preprocessor = nullptr;
+          ctx->complete(0);
+        });
+    }
     ctx = new FunctionContext([this, ctx](int r) {
         if (r < 0) {
           derr << "error flushing journal replay: " << cpp_strerror(r) << dendl;
         }
-        m_local_journal->stop_external_replay();
-        m_local_journal = nullptr;
-        m_local_replay = nullptr;
-
-        delete m_event_preprocessor;
-        m_event_preprocessor = nullptr;
 
+        // blocks if listener notification is in-progress
+        m_local_journal->remove_listener(m_journal_listener);
         ctx->complete(0);
       });
-    ctx = new FunctionContext([this, ctx](int r) {
-        m_local_journal->remove_listener(
-            librbd::journal::ListenerType::RESYNC, m_resync_listener);
-        m_local_replay->shut_down(true, ctx);
-      });
+    if (m_local_replay != nullptr) {
+      ctx = new FunctionContext([this, ctx](int r) {
+          m_local_replay->shut_down(true, ctx);
+        });
+    }
   }
   if (m_replay_handler != nullptr) {
     ctx = new FunctionContext([this, ctx](int r) {
index ba81deaa1ee5893f96a30cfca6ff6730f4044f9a..4baef7f315fd2e3915ec5bff12a33e7120abda7f 100644 (file)
@@ -205,6 +205,26 @@ private:
   typedef typename librbd::journal::TypeTraits<ImageCtxT>::Journaler Journaler;
   typedef boost::optional<State> OptionalState;
 
+  struct JournalListener : public librbd::journal::Listener {
+    ImageReplayer *img_replayer;
+
+    JournalListener(ImageReplayer *img_replayer)
+      : img_replayer(img_replayer) {
+    }
+
+    virtual void handle_close() {
+      img_replayer->on_stop_journal_replay();
+    }
+
+    virtual void handle_promoted() {
+      // TODO
+    }
+
+    virtual void handle_resync() {
+      img_replayer->resync_image();
+    }
+  };
+
   class BootstrapProgressContext : public ProgressContext {
   public:
     BootstrapProgressContext(ImageReplayer<ImageCtxT> *replayer) :
@@ -242,7 +262,7 @@ private:
   librbd::journal::Replay<ImageCtxT> *m_local_replay = nullptr;
   Journaler* m_remote_journaler = nullptr;
   ::journal::ReplayHandler *m_replay_handler = nullptr;
-  librbd::journal::ResyncListener *m_resync_listener;
+  librbd::journal::Listener *m_journal_listener;
   bool m_stopping_for_resync = false;
 
   Context *m_on_start_finish = nullptr;
@@ -327,7 +347,6 @@ private:
 
   void start_replay();
   void handle_start_replay(int r);
-  void handle_stop_replay_request(int r);
 
   void replay_flush();
   void handle_replay_flush(int r);