From f9aae06152e281c271f50201a8dd1852a132447f Mon Sep 17 00:00:00 2001 From: Jason Dillaman Date: Tue, 13 Sep 2016 21:54:46 -0400 Subject: [PATCH] librbd: new journal listener event for force promotion Fixes: http://tracker.ceph.com/issues/16974 Signed-off-by: Jason Dillaman (cherry picked from commit fd005490e95d7fca85be4cad34344a58986f64d6) --- src/librbd/Journal.cc | 112 ++++++++++++++++++++++++--- src/librbd/Journal.h | 9 +++ src/test/librbd/test_mock_Journal.cc | 105 +++++++++++++++++++++---- src/tools/rbd_mirror/ImageReplayer.h | 2 +- 4 files changed, 201 insertions(+), 27 deletions(-) diff --git a/src/librbd/Journal.cc b/src/librbd/Journal.cc index 745f2f6a991a3..b5c13a01cf3cc 100644 --- a/src/librbd/Journal.cc +++ b/src/librbd/Journal.cc @@ -7,7 +7,6 @@ #include "librbd/ExclusiveLock.h" #include "librbd/ImageCtx.h" #include "librbd/journal/Replay.h" -#include "librbd/Utils.h" #include "cls/journal/cls_journal_types.h" #include "journal/Journaler.h" #include "journal/Policy.h" @@ -715,16 +714,25 @@ void Journal::close(Context *on_finish) { template bool Journal::is_tag_owner() const { + Mutex::Locker locker(m_lock); + return is_tag_owner(m_lock); +} + +template +bool Journal::is_tag_owner(const Mutex &) const { + assert(m_lock.is_locked()); return (m_tag_data.mirror_uuid == LOCAL_MIRROR_UUID); } template uint64_t Journal::get_tag_tid() const { + Mutex::Locker locker(m_lock); return m_tag_tid; } template journal::TagData Journal::get_tag_data() const { + Mutex::Locker locker(m_lock); return m_tag_data; } @@ -734,7 +742,7 @@ int Journal::demote() { ldout(cct, 20) << __func__ << dendl; Mutex::Locker locker(m_lock); - assert(m_journaler != nullptr && is_tag_owner()); + assert(m_journaler != nullptr && is_tag_owner(m_lock)); cls::journal::Client client; int r = m_journaler->get_cached_client(IMAGE_CLIENT_ID, &client); @@ -810,7 +818,7 @@ void Journal::allocate_local_tag(Context *on_finish) { predecessor.mirror_uuid = LOCAL_MIRROR_UUID; { Mutex::Locker locker(m_lock); - assert(m_journaler != nullptr && is_tag_owner()); + assert(m_journaler != nullptr && is_tag_owner(m_lock)); cls::journal::Client client; int r = m_journaler->get_cached_client(IMAGE_CLIENT_ID, &client); @@ -1248,9 +1256,16 @@ void Journal::destroy_journaler(int r) { m_journaler->remove_listener(&m_metadata_listener); transition_state(STATE_CLOSING, r); - m_journaler->shut_down(create_async_context_callback( + + Context *ctx = create_async_context_callback( m_image_ctx, create_context_callback< - Journal, &Journal::handle_journal_destroyed>(this))); + Journal, &Journal::handle_journal_destroyed>(this)); + ctx = new FunctionContext( + [this, ctx](int r) { + Mutex::Locker locker(m_lock); + m_journaler->shut_down(ctx); + }); + m_async_journal_op_tracker.wait(m_image_ctx, ctx); } template @@ -1810,22 +1825,97 @@ int Journal::check_resync_requested(bool *do_resync) { return 0; } +struct C_RefreshTags : public Context { + util::AsyncOpTracker &async_op_tracker; + Context *on_finish = nullptr; + + Mutex lock; + uint64_t tag_tid; + journal::TagData tag_data; + + C_RefreshTags(util::AsyncOpTracker &async_op_tracker) + : async_op_tracker(async_op_tracker), + lock("librbd::Journal::C_RefreshTags::lock") { + async_op_tracker.start_op(); + } + virtual ~C_RefreshTags() { + async_op_tracker.finish_op(); + } + + virtual void finish(int r) { + on_finish->complete(r); + } +}; + template void Journal::handle_metadata_updated() { CephContext *cct = m_image_ctx.cct; - ldout(cct, 20) << this << " " << __func__ << dendl; - Mutex::Locker locker(m_lock); + if (m_state != STATE_READY && !is_journal_replaying(m_lock)) { return; + } else if (is_tag_owner(m_lock)) { + ldout(cct, 20) << this << " " << __func__ << ": primary image" << dendl; + return; + } else if (m_listeners.empty()) { + ldout(cct, 20) << this << " " << __func__ << ": no listeners" << dendl; + return; + } + + uint64_t refresh_sequence = ++m_refresh_sequence; + ldout(cct, 20) << this << " " << __func__ << ": " + << "refresh_sequence=" << refresh_sequence << dendl; + + // pull the most recent tags from the journal, decode, and + // update the internal tag state + C_RefreshTags *refresh_ctx = new C_RefreshTags(m_async_journal_op_tracker); + refresh_ctx->on_finish = new FunctionContext( + [this, refresh_sequence, refresh_ctx](int r) { + handle_refresh_metadata(refresh_sequence, refresh_ctx->tag_tid, + refresh_ctx->tag_data, r); + }); + C_DecodeTags *decode_tags_ctx = new C_DecodeTags( + cct, &refresh_ctx->lock, &refresh_ctx->tag_tid, + &refresh_ctx->tag_data, refresh_ctx); + m_journaler->get_tags(m_tag_tid == 0 ? 0 : m_tag_tid - 1, m_tag_class, + &decode_tags_ctx->tags, decode_tags_ctx); +} + +template +void Journal::handle_refresh_metadata(uint64_t refresh_sequence, + uint64_t tag_tid, + journal::TagData tag_data, int r) { + CephContext *cct = m_image_ctx.cct; + Mutex::Locker locker(m_lock); + + if (r < 0) { + lderr(cct) << this << " " << __func__ << ": failed to refresh metadata: " + << cpp_strerror(r) << dendl; + return; + } else if (m_state != STATE_READY && !is_journal_replaying(m_lock)) { + return; + } else if (refresh_sequence != m_refresh_sequence) { + // another, more up-to-date refresh is in-flight + return; } + ldout(cct, 20) << this << " " << __func__ << ": " + << "refresh_sequence=" << refresh_sequence << ", " + << "tag_tid=" << tag_tid << ", " + << "tag_data=" << tag_data << dendl; while (m_listener_notify) { m_listener_cond.Wait(m_lock); } + bool was_tag_owner = is_tag_owner(m_lock); + if (m_tag_tid < tag_tid) { + m_tag_tid = tag_tid; + m_tag_data = tag_data; + } + bool promoted_to_primary = (!was_tag_owner && is_tag_owner(m_lock)); + bool resync_requested = false; - int r = check_resync_requested(&resync_requested); + r = check_resync_requested(&resync_requested); if (r < 0) { lderr(cct) << this << " " << __func__ << ": " << "failed to check if a resync was requested" << dendl; @@ -1836,7 +1926,11 @@ void Journal::handle_metadata_updated() { m_listener_notify = true; m_lock.Unlock(); - if (resync_requested) { + if (promoted_to_primary) { + for (auto listener : listeners) { + listener->handle_promoted(); + } + } else if (resync_requested) { for (auto listener : listeners) { listener->handle_resync(); } diff --git a/src/librbd/Journal.h b/src/librbd/Journal.h index a7a51fbd46d0b..ab3079ac2cdf0 100644 --- a/src/librbd/Journal.h +++ b/src/librbd/Journal.h @@ -14,6 +14,7 @@ #include "journal/JournalMetadataListener.h" #include "journal/ReplayEntry.h" #include "journal/ReplayHandler.h" +#include "librbd/Utils.h" #include "librbd/journal/Types.h" #include "librbd/journal/TypeTraits.h" #include @@ -297,6 +298,8 @@ private: journal::Replay *m_journal_replay; + util::AsyncOpTracker m_async_journal_op_tracker; + struct MetadataListener : public ::journal::JournalMetadataListener { Journal *journal; @@ -315,7 +318,10 @@ private: Cond m_listener_cond; bool m_listener_notify = false; + uint64_t m_refresh_sequence = 0; + bool is_journal_replaying(const Mutex &) const; + bool is_tag_owner(const Mutex &) const; uint64_t append_io_events(journal::EventType event_type, const Bufferlists &bufferlists, @@ -364,6 +370,9 @@ private: int check_resync_requested(bool *do_resync); void handle_metadata_updated(); + void handle_refresh_metadata(uint64_t refresh_sequence, uint64_t tag_tid, + journal::TagData tag_data, int r); + }; } // namespace librbd diff --git a/src/test/librbd/test_mock_Journal.cc b/src/test/librbd/test_mock_Journal.cc index 70a025a8e204c..b05f2b8706d4f 100644 --- a/src/test/librbd/test_mock_Journal.cc +++ b/src/test/librbd/test_mock_Journal.cc @@ -191,8 +191,11 @@ public: void expect_get_journaler_tags(MockImageCtx &mock_image_ctx, ::journal::MockJournaler &mock_journaler, - int r) { + bool primary, int r) { journal::TagData tag_data; + if (!primary) { + tag_data.mirror_uuid = "remote mirror uuid"; + } bufferlist tag_data_bl; ::encode(tag_data, tag_data_bl); @@ -205,6 +208,15 @@ public: .WillOnce(SaveArg<0>(&m_listener)); } + void expect_get_journaler_tags(MockImageCtx &mock_image_ctx, + ::journal::MockJournaler &mock_journaler, + uint64_t start_after_tag_tid, + ::journal::Journaler::Tags &&tags, int r) { + EXPECT_CALL(mock_journaler, get_tags(start_after_tag_tid, 0, _, _)) + .WillOnce(DoAll(SetArgPointee<2>(tags), + WithArg<3>(CompleteContext(r, mock_image_ctx.image_ctx->op_work_queue)))); + } + void expect_start_replay(MockJournalImageCtx &mock_image_ctx, ::journal::MockJournaler &mock_journaler, const ReplayAction &action) { @@ -362,7 +374,8 @@ public: void open_journal(MockJournalImageCtx &mock_image_ctx, MockJournal &mock_journal, - ::journal::MockJournaler &mock_journaler) { + ::journal::MockJournaler &mock_journaler, + bool primary = true) { expect_op_work_queue(mock_image_ctx); InSequence seq; @@ -370,7 +383,7 @@ public: expect_init_journaler(mock_journaler, 0); expect_get_max_append_size(mock_journaler, 1 << 16); expect_get_journaler_cached_client(mock_journaler, 0); - expect_get_journaler_tags(mock_image_ctx, mock_journaler, 0); + expect_get_journaler_tags(mock_image_ctx, mock_journaler, primary, 0); expect_start_replay( mock_image_ctx, mock_journaler, std::bind(&invoke_replay_complete, _1, 0)); @@ -418,7 +431,7 @@ TEST_F(TestMockJournal, StateTransitions) { expect_init_journaler(mock_journaler, 0); expect_get_max_append_size(mock_journaler, 1 << 16); expect_get_journaler_cached_client(mock_journaler, 0); - expect_get_journaler_tags(mock_image_ctx, mock_journaler, 0); + expect_get_journaler_tags(mock_image_ctx, mock_journaler, true, 0); expect_start_replay( mock_image_ctx, mock_journaler, std::bind(&invoke_replay_ready, _1)); @@ -506,7 +519,7 @@ TEST_F(TestMockJournal, GetTagsError) { expect_init_journaler(mock_journaler, 0); expect_get_max_append_size(mock_journaler, 1 << 16); expect_get_journaler_cached_client(mock_journaler, 0); - expect_get_journaler_tags(mock_image_ctx, mock_journaler, -EBADMSG); + expect_get_journaler_tags(mock_image_ctx, mock_journaler, true, -EBADMSG); expect_shut_down_journaler(mock_journaler); ASSERT_EQ(-EBADMSG, when_open(mock_journal)); } @@ -528,7 +541,7 @@ TEST_F(TestMockJournal, ReplayCompleteError) { expect_init_journaler(mock_journaler, 0); expect_get_max_append_size(mock_journaler, 1 << 16); expect_get_journaler_cached_client(mock_journaler, 0); - expect_get_journaler_tags(mock_image_ctx, mock_journaler, 0); + expect_get_journaler_tags(mock_image_ctx, mock_journaler, true, 0); expect_start_replay( mock_image_ctx, mock_journaler, std::bind(&invoke_replay_complete, _1, -EINVAL)); @@ -543,7 +556,7 @@ TEST_F(TestMockJournal, ReplayCompleteError) { expect_init_journaler(mock_journaler, 0); expect_get_max_append_size(mock_journaler, 1 << 16); expect_get_journaler_cached_client(mock_journaler, 0); - expect_get_journaler_tags(mock_image_ctx, mock_journaler, 0); + expect_get_journaler_tags(mock_image_ctx, mock_journaler, true, 0); expect_start_replay( mock_image_ctx, mock_journaler, std::bind(&invoke_replay_complete, _1, 0)); @@ -575,7 +588,7 @@ TEST_F(TestMockJournal, FlushReplayError) { expect_init_journaler(mock_journaler, 0); expect_get_max_append_size(mock_journaler, 1 << 16); expect_get_journaler_cached_client(mock_journaler, 0); - expect_get_journaler_tags(mock_image_ctx, mock_journaler, 0); + expect_get_journaler_tags(mock_image_ctx, mock_journaler, true, 0); expect_start_replay( mock_image_ctx, mock_journaler, std::bind(&invoke_replay_ready, _1)); @@ -595,7 +608,7 @@ TEST_F(TestMockJournal, FlushReplayError) { expect_init_journaler(mock_journaler, 0); expect_get_max_append_size(mock_journaler, 1 << 16); expect_get_journaler_cached_client(mock_journaler, 0); - expect_get_journaler_tags(mock_image_ctx, mock_journaler, 0); + expect_get_journaler_tags(mock_image_ctx, mock_journaler, true, 0); expect_start_replay( mock_image_ctx, mock_journaler, std::bind(&invoke_replay_complete, _1, 0)); @@ -627,7 +640,7 @@ TEST_F(TestMockJournal, CorruptEntry) { expect_init_journaler(mock_journaler, 0); expect_get_max_append_size(mock_journaler, 1 << 16); expect_get_journaler_cached_client(mock_journaler, 0); - expect_get_journaler_tags(mock_image_ctx, mock_journaler, 0); + expect_get_journaler_tags(mock_image_ctx, mock_journaler, true, 0); expect_start_replay( mock_image_ctx, mock_journaler, std::bind(&invoke_replay_ready, _1)); @@ -645,7 +658,7 @@ TEST_F(TestMockJournal, CorruptEntry) { expect_init_journaler(mock_journaler, 0); expect_get_max_append_size(mock_journaler, 1 << 16); expect_get_journaler_cached_client(mock_journaler, 0); - expect_get_journaler_tags(mock_image_ctx, mock_journaler, 0); + expect_get_journaler_tags(mock_image_ctx, mock_journaler, true, 0); expect_start_replay( mock_image_ctx, mock_journaler, std::bind(&invoke_replay_complete, _1, 0)); @@ -676,7 +689,7 @@ TEST_F(TestMockJournal, StopError) { expect_init_journaler(mock_journaler, 0); expect_get_max_append_size(mock_journaler, 1 << 16); expect_get_journaler_cached_client(mock_journaler, 0); - expect_get_journaler_tags(mock_image_ctx, mock_journaler, 0); + expect_get_journaler_tags(mock_image_ctx, mock_journaler, true, 0); expect_start_replay( mock_image_ctx, mock_journaler, std::bind(&invoke_replay_complete, _1, 0)); @@ -708,7 +721,7 @@ TEST_F(TestMockJournal, ReplayOnDiskPreFlushError) { expect_init_journaler(mock_journaler, 0); expect_get_max_append_size(mock_journaler, 1 << 16); expect_get_journaler_cached_client(mock_journaler, 0); - expect_get_journaler_tags(mock_image_ctx, mock_journaler, 0); + expect_get_journaler_tags(mock_image_ctx, mock_journaler, true, 0); expect_start_replay( mock_image_ctx, mock_journaler, @@ -736,7 +749,7 @@ TEST_F(TestMockJournal, ReplayOnDiskPreFlushError) { expect_init_journaler(mock_journaler, 0); expect_get_max_append_size(mock_journaler, 1 << 16); expect_get_journaler_cached_client(mock_journaler, 0); - expect_get_journaler_tags(mock_image_ctx, mock_journaler, 0); + expect_get_journaler_tags(mock_image_ctx, mock_journaler, true, 0); expect_start_replay( mock_image_ctx, mock_journaler, { std::bind(&invoke_replay_complete, _1, 0) @@ -790,7 +803,7 @@ TEST_F(TestMockJournal, ReplayOnDiskPostFlushError) { expect_init_journaler(mock_journaler, 0); expect_get_max_append_size(mock_journaler, 1 << 16); expect_get_journaler_cached_client(mock_journaler, 0); - expect_get_journaler_tags(mock_image_ctx, mock_journaler, 0); + expect_get_journaler_tags(mock_image_ctx, mock_journaler, true, 0); expect_start_replay( mock_image_ctx, mock_journaler, std::bind(&invoke_replay_ready, _1)); @@ -814,7 +827,7 @@ TEST_F(TestMockJournal, ReplayOnDiskPostFlushError) { expect_init_journaler(mock_journaler, 0); expect_get_max_append_size(mock_journaler, 1 << 16); expect_get_journaler_cached_client(mock_journaler, 0); - expect_get_journaler_tags(mock_image_ctx, mock_journaler, 0); + expect_get_journaler_tags(mock_image_ctx, mock_journaler, true, 0); expect_start_replay( mock_image_ctx, mock_journaler, std::bind(&invoke_replay_complete, _1, 0)); @@ -1189,7 +1202,7 @@ TEST_F(TestMockJournal, ResyncRequested) { MockJournalImageCtx mock_image_ctx(*ictx); MockJournal mock_journal(mock_image_ctx); ::journal::MockJournaler mock_journaler; - open_journal(mock_image_ctx, mock_journal, mock_journaler); + open_journal(mock_image_ctx, mock_journal, mock_journaler, false); struct Listener : public journal::Listener { C_SaferCond ctx; @@ -1211,6 +1224,15 @@ TEST_F(TestMockJournal, ResyncRequested) { }; InSequence seq; + + journal::TagData tag_data; + tag_data.mirror_uuid == Journal<>::LOCAL_MIRROR_UUID; + + bufferlist tag_data_bl; + ::encode(tag_data, tag_data_bl); + expect_get_journaler_tags(mock_image_ctx, mock_journaler, 0, + {{0, 0, tag_data_bl}}, 0); + journal::ImageClientMeta image_client_meta; image_client_meta.tag_class = 0; image_client_meta.resync_requested = true; @@ -1221,4 +1243,53 @@ TEST_F(TestMockJournal, ResyncRequested) { ASSERT_EQ(0, listener.ctx.wait()); } +TEST_F(TestMockJournal, ForcePromoted) { + REQUIRE_FEATURE(RBD_FEATURE_JOURNALING); + + librbd::ImageCtx *ictx; + ASSERT_EQ(0, open_image(m_image_name, &ictx)); + + MockJournalImageCtx mock_image_ctx(*ictx); + MockJournal mock_journal(mock_image_ctx); + ::journal::MockJournaler mock_journaler; + open_journal(mock_image_ctx, mock_journal, mock_journaler, false); + + struct Listener : public journal::Listener { + C_SaferCond ctx; + virtual void handle_close() { + ADD_FAILURE() << "unexpected close action"; + } + virtual void handle_resync() { + ADD_FAILURE() << "unexpected resync event"; + } + virtual void handle_promoted() { + ctx.complete(0); + } + } listener; + mock_journal.add_listener(&listener); + + BOOST_SCOPE_EXIT_ALL(&) { + mock_journal.remove_listener(&listener); + close_journal(mock_journal, mock_journaler); + }; + + InSequence seq; + + journal::TagData tag_data; + tag_data.mirror_uuid == Journal<>::LOCAL_MIRROR_UUID; + + bufferlist tag_data_bl; + ::encode(tag_data, tag_data_bl); + expect_get_journaler_tags(mock_image_ctx, mock_journaler, 0, + {{100, 0, tag_data_bl}}, 0); + + journal::ImageClientMeta image_client_meta; + image_client_meta.tag_class = 0; + expect_get_journaler_cached_client(mock_journaler, image_client_meta, 0); + expect_shut_down_journaler(mock_journaler); + + m_listener->handle_update(nullptr); + ASSERT_EQ(0, listener.ctx.wait()); +} + } // namespace librbd diff --git a/src/tools/rbd_mirror/ImageReplayer.h b/src/tools/rbd_mirror/ImageReplayer.h index 4baef7f315fd2..3765dc0867064 100644 --- a/src/tools/rbd_mirror/ImageReplayer.h +++ b/src/tools/rbd_mirror/ImageReplayer.h @@ -217,7 +217,7 @@ private: } virtual void handle_promoted() { - // TODO + img_replayer->on_stop_journal_replay(0, "force promoted"); } virtual void handle_resync() { -- 2.39.5