git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
librbd: new journal listener event for force promotion
author Jason Dillaman <dillaman@redhat.com>
Wed, 14 Sep 2016 01:54:46 +0000 (21:54 -0400)
committer Jason Dillaman <dillaman@redhat.com>
Tue, 11 Oct 2016 16:51:15 +0000 (12:51 -0400)
Fixes: http://tracker.ceph.com/issues/16974
Signed-off-by: Jason Dillaman <dillaman@redhat.com>
(cherry picked from commit fd005490e95d7fca85be4cad34344a58986f64d6)

src/librbd/Journal.cc
src/librbd/Journal.h
src/test/librbd/test_mock_Journal.cc
src/tools/rbd_mirror/ImageReplayer.h

index 745f2f6a991a3fc8cd58840a3cdf5252994791f2..b5c13a01cf3cc66adc72c2fa77110c6eb31349c5 100644 (file)
@@ -7,7 +7,6 @@
 #include "librbd/ExclusiveLock.h"
 #include "librbd/ImageCtx.h"
 #include "librbd/journal/Replay.h"
-#include "librbd/Utils.h"
 #include "cls/journal/cls_journal_types.h"
 #include "journal/Journaler.h"
 #include "journal/Policy.h"
@@ -715,16 +714,25 @@ void Journal<I>::close(Context *on_finish) {
 
 template <typename I>
 bool Journal<I>::is_tag_owner() const {
+  Mutex::Locker locker(m_lock);  // take m_lock for the duration of the check
+  return is_tag_owner(m_lock);   // the Mutex overload asserts that m_lock is held
+}
+
+template <typename I>
+bool Journal<I>::is_tag_owner(const Mutex &) const {  // variant for callers already holding m_lock
+  assert(m_lock.is_locked());  // precondition: m_lock is held by the caller
   return (m_tag_data.mirror_uuid == LOCAL_MIRROR_UUID);
 }
 
 template <typename I>
 uint64_t Journal<I>::get_tag_tid() const {
+  Mutex::Locker locker(m_lock);  // m_tag_tid is rewritten under m_lock by handle_refresh_metadata()
   return m_tag_tid;
 }
 
 template <typename I>
 journal::TagData Journal<I>::get_tag_data() const {
+  Mutex::Locker locker(m_lock);  // m_tag_data is rewritten under m_lock by handle_refresh_metadata()
   return m_tag_data;
 }
 
@@ -734,7 +742,7 @@ int Journal<I>::demote() {
   ldout(cct, 20) << __func__ << dendl;
 
   Mutex::Locker locker(m_lock);
-  assert(m_journaler != nullptr && is_tag_owner());
+  assert(m_journaler != nullptr && is_tag_owner(m_lock));
 
   cls::journal::Client client;
   int r = m_journaler->get_cached_client(IMAGE_CLIENT_ID, &client);
@@ -810,7 +818,7 @@ void Journal<I>::allocate_local_tag(Context *on_finish) {
   predecessor.mirror_uuid = LOCAL_MIRROR_UUID;
   {
     Mutex::Locker locker(m_lock);
-    assert(m_journaler != nullptr && is_tag_owner());
+    assert(m_journaler != nullptr && is_tag_owner(m_lock));
 
     cls::journal::Client client;
     int r = m_journaler->get_cached_client(IMAGE_CLIENT_ID, &client);
@@ -1248,9 +1256,16 @@ void Journal<I>::destroy_journaler(int r) {
   m_journaler->remove_listener(&m_metadata_listener);
 
   transition_state(STATE_CLOSING, r);
-  m_journaler->shut_down(create_async_context_callback(
+
+  Context *ctx = create_async_context_callback(
     m_image_ctx, create_context_callback<
-      Journal<I>, &Journal<I>::handle_journal_destroyed>(this)));
+      Journal<I>, &Journal<I>::handle_journal_destroyed>(this));
+  ctx = new FunctionContext(
+    [this, ctx](int r) {
+      Mutex::Locker locker(m_lock);
+      m_journaler->shut_down(ctx);
+    });
+  m_async_journal_op_tracker.wait(m_image_ctx, ctx);
 }
 
 template <typename I>
@@ -1810,22 +1825,97 @@ int Journal<I>::check_resync_requested(bool *do_resync) {
   return 0;
 }
 
+struct C_RefreshTags : public Context {  // carries the tag state filled in for a metadata refresh
+  util::AsyncOpTracker &async_op_tracker;
+  Context *on_finish = nullptr;  // assigned by the creator; finish() forwards the result to it
+  // output slots below are passed by address to C_DecodeTags in handle_metadata_updated()
+  Mutex lock;
+  uint64_t tag_tid = 0;  // zero-initialized: the completion callback reads it even when r < 0
+  journal::TagData tag_data;
+
+  C_RefreshTags(util::AsyncOpTracker &async_op_tracker)
+    : async_op_tracker(async_op_tracker),
+      lock("librbd::Journal::C_RefreshTags::lock") {
+    async_op_tracker.start_op();  // op stays registered for the lifetime of this context
+  }
+  virtual ~C_RefreshTags() {
+    async_op_tracker.finish_op();  // balances the start_op() made by the constructor
+  }
+
+  virtual void finish(int r) {
+    on_finish->complete(r);
+  }
+};
+
 template <typename I>
 void Journal<I>::handle_metadata_updated() {
   CephContext *cct = m_image_ctx.cct;
-  ldout(cct, 20) << this << " " << __func__ << dendl;
-
   Mutex::Locker locker(m_lock);
+  // only refresh when ready/replaying, not the tag owner, and listeners exist
   if (m_state != STATE_READY && !is_journal_replaying(m_lock)) {
     return;
+  } else if (is_tag_owner(m_lock)) {
+    ldout(cct, 20) << this << " " << __func__ << ": primary image" << dendl;
+    return;
+  } else if (m_listeners.empty()) {
+    ldout(cct, 20) << this << " " << __func__ << ": no listeners" << dendl;
+    return;
+  }
+
+  uint64_t refresh_sequence = ++m_refresh_sequence;  // handle_refresh_metadata() drops superseded sequences
+  ldout(cct, 20) << this << " " << __func__ << ": "
+                 << "refresh_sequence=" << refresh_sequence << dendl;
+
+  // pull the most recent tags from the journal, decode, and
+  // update the internal tag state
+  C_RefreshTags *refresh_ctx = new C_RefreshTags(m_async_journal_op_tracker);  // tracked as an async op until destroyed
+  refresh_ctx->on_finish = new FunctionContext(
+    [this, refresh_sequence, refresh_ctx](int r) {
+      handle_refresh_metadata(refresh_sequence, refresh_ctx->tag_tid,
+                              refresh_ctx->tag_data, r);
+    });
+  C_DecodeTags *decode_tags_ctx = new C_DecodeTags(
+      cct, &refresh_ctx->lock, &refresh_ctx->tag_tid,
+      &refresh_ctx->tag_data, refresh_ctx);  // decoded results land in refresh_ctx's fields
+  m_journaler->get_tags(m_tag_tid == 0 ? 0 : m_tag_tid - 1, m_tag_class,  // guard keeps the tid from wrapping below 0
+                        &decode_tags_ctx->tags, decode_tags_ctx);
+}
+
+template <typename I>
+void Journal<I>::handle_refresh_metadata(uint64_t refresh_sequence,
+                                         uint64_t tag_tid,
+                                         journal::TagData tag_data, int r) {
+  CephContext *cct = m_image_ctx.cct;
+  Mutex::Locker locker(m_lock);
+
+  if (r < 0) {
+    lderr(cct) << this << " " << __func__ << ": failed to refresh metadata: "
+               << cpp_strerror(r) << dendl;
+    return;
+  } else if (m_state != STATE_READY && !is_journal_replaying(m_lock)) {
+    return;
+  } else if (refresh_sequence != m_refresh_sequence) {
+    // another, more up-to-date refresh is in-flight
+    return;
   }
 
+  ldout(cct, 20) << this << " " << __func__ << ": "
+                 << "refresh_sequence=" << refresh_sequence << ", "
+                 << "tag_tid=" << tag_tid << ", "
+                 << "tag_data=" << tag_data << dendl;
   while (m_listener_notify) {
     m_listener_cond.Wait(m_lock);
   }
 
+  bool was_tag_owner = is_tag_owner(m_lock);
+  if (m_tag_tid < tag_tid) {
+    m_tag_tid = tag_tid;
+    m_tag_data = tag_data;
+  }
+  bool promoted_to_primary = (!was_tag_owner && is_tag_owner(m_lock));
+
   bool resync_requested = false;
-  int r = check_resync_requested(&resync_requested);
+  r = check_resync_requested(&resync_requested);
   if (r < 0) {
     lderr(cct) << this << " " << __func__ << ": "
                << "failed to check if a resync was requested" << dendl;
@@ -1836,7 +1926,11 @@ void Journal<I>::handle_metadata_updated() {
   m_listener_notify = true;
   m_lock.Unlock();
 
-  if (resync_requested) {
+  if (promoted_to_primary) {
+    for (auto listener : listeners) {
+      listener->handle_promoted();
+    }
+  } else if (resync_requested) {
     for (auto listener : listeners) {
       listener->handle_resync();
     }
index a7a51fbd46d0bf41811e2948a0809d04e3792efa..ab3079ac2cdf075512fbc5c0a372d69492946b2c 100644 (file)
@@ -14,6 +14,7 @@
 #include "journal/JournalMetadataListener.h"
 #include "journal/ReplayEntry.h"
 #include "journal/ReplayHandler.h"
+#include "librbd/Utils.h"
 #include "librbd/journal/Types.h"
 #include "librbd/journal/TypeTraits.h"
 #include <algorithm>
@@ -297,6 +298,8 @@ private:
 
   journal::Replay<ImageCtxT> *m_journal_replay;
 
+  util::AsyncOpTracker m_async_journal_op_tracker;
+
   struct MetadataListener : public ::journal::JournalMetadataListener {
     Journal<ImageCtxT> *journal;
 
@@ -315,7 +318,10 @@ private:
   Cond m_listener_cond;
   bool m_listener_notify = false;
 
+  uint64_t m_refresh_sequence = 0;
+
   bool is_journal_replaying(const Mutex &) const;
+  bool is_tag_owner(const Mutex &) const;
 
   uint64_t append_io_events(journal::EventType event_type,
                             const Bufferlists &bufferlists,
@@ -364,6 +370,9 @@ private:
   int check_resync_requested(bool *do_resync);
 
   void handle_metadata_updated();
+  void handle_refresh_metadata(uint64_t refresh_sequence, uint64_t tag_tid,
+                               journal::TagData tag_data, int r);
+
 };
 
 } // namespace librbd
index 70a025a8e204c18990ae78786255900a92f3f732..b05f2b8706d4fbc8ceed1209e1ee804525e0103d 100644 (file)
@@ -191,8 +191,11 @@ public:
 
   void expect_get_journaler_tags(MockImageCtx &mock_image_ctx,
                                  ::journal::MockJournaler &mock_journaler,
-                                 int r) {
+                                 bool primary, int r) {
     journal::TagData tag_data;
+    if (!primary) {
+      tag_data.mirror_uuid = "remote mirror uuid";
+    }
 
     bufferlist tag_data_bl;
     ::encode(tag_data, tag_data_bl);
@@ -205,6 +208,15 @@ public:
                   .WillOnce(SaveArg<0>(&m_listener));
   }
 
+  void expect_get_journaler_tags(MockImageCtx &mock_image_ctx,  // expects a single get_tags call
+                                 ::journal::MockJournaler &mock_journaler,
+                                 uint64_t start_after_tag_tid,
+                                 ::journal::Journaler::Tags &&tags, int r) {
+    EXPECT_CALL(mock_journaler, get_tags(start_after_tag_tid, 0, _, _))  // second argument pinned to 0
+                  .WillOnce(DoAll(SetArgPointee<2>(tags),  // hand `tags` back through argument 2
+                                  WithArg<3>(CompleteContext(r, mock_image_ctx.image_ctx->op_work_queue))));
+  }
+
   void expect_start_replay(MockJournalImageCtx &mock_image_ctx,
                            ::journal::MockJournaler &mock_journaler,
                            const ReplayAction &action) {
@@ -362,7 +374,8 @@ public:
 
   void open_journal(MockJournalImageCtx &mock_image_ctx,
                     MockJournal &mock_journal,
-                    ::journal::MockJournaler &mock_journaler) {
+                    ::journal::MockJournaler &mock_journaler,
+                    bool primary = true) {
     expect_op_work_queue(mock_image_ctx);
 
     InSequence seq;
@@ -370,7 +383,7 @@ public:
     expect_init_journaler(mock_journaler, 0);
     expect_get_max_append_size(mock_journaler, 1 << 16);
     expect_get_journaler_cached_client(mock_journaler, 0);
-    expect_get_journaler_tags(mock_image_ctx, mock_journaler, 0);
+    expect_get_journaler_tags(mock_image_ctx, mock_journaler, primary, 0);
     expect_start_replay(
       mock_image_ctx, mock_journaler,
       std::bind(&invoke_replay_complete, _1, 0));
@@ -418,7 +431,7 @@ TEST_F(TestMockJournal, StateTransitions) {
   expect_init_journaler(mock_journaler, 0);
   expect_get_max_append_size(mock_journaler, 1 << 16);
   expect_get_journaler_cached_client(mock_journaler, 0);
-  expect_get_journaler_tags(mock_image_ctx, mock_journaler, 0);
+  expect_get_journaler_tags(mock_image_ctx, mock_journaler, true, 0);
   expect_start_replay(
     mock_image_ctx, mock_journaler,
     std::bind(&invoke_replay_ready, _1));
@@ -506,7 +519,7 @@ TEST_F(TestMockJournal, GetTagsError) {
   expect_init_journaler(mock_journaler, 0);
   expect_get_max_append_size(mock_journaler, 1 << 16);
   expect_get_journaler_cached_client(mock_journaler, 0);
-  expect_get_journaler_tags(mock_image_ctx, mock_journaler, -EBADMSG);
+  expect_get_journaler_tags(mock_image_ctx, mock_journaler, true, -EBADMSG);
   expect_shut_down_journaler(mock_journaler);
   ASSERT_EQ(-EBADMSG, when_open(mock_journal));
 }
@@ -528,7 +541,7 @@ TEST_F(TestMockJournal, ReplayCompleteError) {
   expect_init_journaler(mock_journaler, 0);
   expect_get_max_append_size(mock_journaler, 1 << 16);
   expect_get_journaler_cached_client(mock_journaler, 0);
-  expect_get_journaler_tags(mock_image_ctx, mock_journaler, 0);
+  expect_get_journaler_tags(mock_image_ctx, mock_journaler, true, 0);
   expect_start_replay(
     mock_image_ctx, mock_journaler,
     std::bind(&invoke_replay_complete, _1, -EINVAL));
@@ -543,7 +556,7 @@ TEST_F(TestMockJournal, ReplayCompleteError) {
   expect_init_journaler(mock_journaler, 0);
   expect_get_max_append_size(mock_journaler, 1 << 16);
   expect_get_journaler_cached_client(mock_journaler, 0);
-  expect_get_journaler_tags(mock_image_ctx, mock_journaler, 0);
+  expect_get_journaler_tags(mock_image_ctx, mock_journaler, true, 0);
   expect_start_replay(
     mock_image_ctx, mock_journaler,
     std::bind(&invoke_replay_complete, _1, 0));
@@ -575,7 +588,7 @@ TEST_F(TestMockJournal, FlushReplayError) {
   expect_init_journaler(mock_journaler, 0);
   expect_get_max_append_size(mock_journaler, 1 << 16);
   expect_get_journaler_cached_client(mock_journaler, 0);
-  expect_get_journaler_tags(mock_image_ctx, mock_journaler, 0);
+  expect_get_journaler_tags(mock_image_ctx, mock_journaler, true, 0);
   expect_start_replay(
     mock_image_ctx, mock_journaler,
     std::bind(&invoke_replay_ready, _1));
@@ -595,7 +608,7 @@ TEST_F(TestMockJournal, FlushReplayError) {
   expect_init_journaler(mock_journaler, 0);
   expect_get_max_append_size(mock_journaler, 1 << 16);
   expect_get_journaler_cached_client(mock_journaler, 0);
-  expect_get_journaler_tags(mock_image_ctx, mock_journaler, 0);
+  expect_get_journaler_tags(mock_image_ctx, mock_journaler, true, 0);
   expect_start_replay(
     mock_image_ctx, mock_journaler,
     std::bind(&invoke_replay_complete, _1, 0));
@@ -627,7 +640,7 @@ TEST_F(TestMockJournal, CorruptEntry) {
   expect_init_journaler(mock_journaler, 0);
   expect_get_max_append_size(mock_journaler, 1 << 16);
   expect_get_journaler_cached_client(mock_journaler, 0);
-  expect_get_journaler_tags(mock_image_ctx, mock_journaler, 0);
+  expect_get_journaler_tags(mock_image_ctx, mock_journaler, true, 0);
   expect_start_replay(
     mock_image_ctx, mock_journaler,
     std::bind(&invoke_replay_ready, _1));
@@ -645,7 +658,7 @@ TEST_F(TestMockJournal, CorruptEntry) {
   expect_init_journaler(mock_journaler, 0);
   expect_get_max_append_size(mock_journaler, 1 << 16);
   expect_get_journaler_cached_client(mock_journaler, 0);
-  expect_get_journaler_tags(mock_image_ctx, mock_journaler, 0);
+  expect_get_journaler_tags(mock_image_ctx, mock_journaler, true, 0);
   expect_start_replay(
     mock_image_ctx, mock_journaler,
     std::bind(&invoke_replay_complete, _1, 0));
@@ -676,7 +689,7 @@ TEST_F(TestMockJournal, StopError) {
   expect_init_journaler(mock_journaler, 0);
   expect_get_max_append_size(mock_journaler, 1 << 16);
   expect_get_journaler_cached_client(mock_journaler, 0);
-  expect_get_journaler_tags(mock_image_ctx, mock_journaler, 0);
+  expect_get_journaler_tags(mock_image_ctx, mock_journaler, true, 0);
   expect_start_replay(
     mock_image_ctx, mock_journaler,
     std::bind(&invoke_replay_complete, _1, 0));
@@ -708,7 +721,7 @@ TEST_F(TestMockJournal, ReplayOnDiskPreFlushError) {
   expect_init_journaler(mock_journaler, 0);
   expect_get_max_append_size(mock_journaler, 1 << 16);
   expect_get_journaler_cached_client(mock_journaler, 0);
-  expect_get_journaler_tags(mock_image_ctx, mock_journaler, 0);
+  expect_get_journaler_tags(mock_image_ctx, mock_journaler, true, 0);
 
   expect_start_replay(
     mock_image_ctx, mock_journaler,
@@ -736,7 +749,7 @@ TEST_F(TestMockJournal, ReplayOnDiskPreFlushError) {
   expect_init_journaler(mock_journaler, 0);
   expect_get_max_append_size(mock_journaler, 1 << 16);
   expect_get_journaler_cached_client(mock_journaler, 0);
-  expect_get_journaler_tags(mock_image_ctx, mock_journaler, 0);
+  expect_get_journaler_tags(mock_image_ctx, mock_journaler, true, 0);
   expect_start_replay(
     mock_image_ctx, mock_journaler, {
       std::bind(&invoke_replay_complete, _1, 0)
@@ -790,7 +803,7 @@ TEST_F(TestMockJournal, ReplayOnDiskPostFlushError) {
   expect_init_journaler(mock_journaler, 0);
   expect_get_max_append_size(mock_journaler, 1 << 16);
   expect_get_journaler_cached_client(mock_journaler, 0);
-  expect_get_journaler_tags(mock_image_ctx, mock_journaler, 0);
+  expect_get_journaler_tags(mock_image_ctx, mock_journaler, true, 0);
   expect_start_replay(
     mock_image_ctx, mock_journaler,
     std::bind(&invoke_replay_ready, _1));
@@ -814,7 +827,7 @@ TEST_F(TestMockJournal, ReplayOnDiskPostFlushError) {
   expect_init_journaler(mock_journaler, 0);
   expect_get_max_append_size(mock_journaler, 1 << 16);
   expect_get_journaler_cached_client(mock_journaler, 0);
-  expect_get_journaler_tags(mock_image_ctx, mock_journaler, 0);
+  expect_get_journaler_tags(mock_image_ctx, mock_journaler, true, 0);
   expect_start_replay(
     mock_image_ctx, mock_journaler,
     std::bind(&invoke_replay_complete, _1, 0));
@@ -1189,7 +1202,7 @@ TEST_F(TestMockJournal, ResyncRequested) {
   MockJournalImageCtx mock_image_ctx(*ictx);
   MockJournal mock_journal(mock_image_ctx);
   ::journal::MockJournaler mock_journaler;
-  open_journal(mock_image_ctx, mock_journal, mock_journaler);
+  open_journal(mock_image_ctx, mock_journal, mock_journaler, false);
 
   struct Listener : public journal::Listener {
     C_SaferCond ctx;
@@ -1211,6 +1224,15 @@ TEST_F(TestMockJournal, ResyncRequested) {
   };
 
   InSequence seq;
+
+  journal::TagData tag_data;
+  tag_data.mirror_uuid == Journal<>::LOCAL_MIRROR_UUID;
+
+  bufferlist tag_data_bl;
+  ::encode(tag_data, tag_data_bl);
+  expect_get_journaler_tags(mock_image_ctx, mock_journaler, 0,
+                            {{0, 0, tag_data_bl}}, 0);
+
   journal::ImageClientMeta image_client_meta;
   image_client_meta.tag_class = 0;
   image_client_meta.resync_requested = true;
@@ -1221,4 +1243,53 @@ TEST_F(TestMockJournal, ResyncRequested) {
   ASSERT_EQ(0, listener.ctx.wait());
 }
 
+TEST_F(TestMockJournal, ForcePromoted) {  // a refreshed locally-owned tag must fire handle_promoted()
+  REQUIRE_FEATURE(RBD_FEATURE_JOURNALING);
+
+  librbd::ImageCtx *ictx;
+  ASSERT_EQ(0, open_image(m_image_name, &ictx));
+
+  MockJournalImageCtx mock_image_ctx(*ictx);
+  MockJournal mock_journal(mock_image_ctx);
+  ::journal::MockJournaler mock_journaler;
+  open_journal(mock_image_ctx, mock_journal, mock_journaler, false);  // false: open as non-primary
+
+  struct Listener : public journal::Listener {
+    C_SaferCond ctx;
+    virtual void handle_close() {
+      ADD_FAILURE() << "unexpected close action";
+    }
+    virtual void handle_resync() {
+      ADD_FAILURE() << "unexpected resync event";
+    }
+    virtual void handle_promoted() {
+      ctx.complete(0);  // unblocks the wait() at the end of the test
+    }
+  } listener;
+  mock_journal.add_listener(&listener);
+
+  BOOST_SCOPE_EXIT_ALL(&) {
+    mock_journal.remove_listener(&listener);
+    close_journal(mock_journal, mock_journaler);
+  };
+
+  InSequence seq;
+  // the refreshed tag is encoded with the local mirror uuid as its owner
+  journal::TagData tag_data;
+  tag_data.mirror_uuid = Journal<>::LOCAL_MIRROR_UUID;  // assign the owner explicitly
+
+  bufferlist tag_data_bl;
+  ::encode(tag_data, tag_data_bl);
+  expect_get_journaler_tags(mock_image_ctx, mock_journaler, 0,
+                            {{100, 0, tag_data_bl}}, 0);
+
+  journal::ImageClientMeta image_client_meta;
+  image_client_meta.tag_class = 0;
+  expect_get_journaler_cached_client(mock_journaler, image_client_meta, 0);
+  expect_shut_down_journaler(mock_journaler);
+  // simulate a journal metadata update notification
+  m_listener->handle_update(nullptr);
+  ASSERT_EQ(0, listener.ctx.wait());
+}
+
 } // namespace librbd
index 4baef7f315fd2e3915ec5bff12a33e7120abda7f..3765dc0867064fe517b546b3ae3801bc231827d4 100644 (file)
@@ -217,7 +217,7 @@ private:
     }
 
     virtual void handle_promoted() {
-      // TODO
+      img_replayer->on_stop_journal_replay(0, "force promoted");  // stop journal replay with r=0 on promotion
     }
 
     virtual void handle_resync() {