From: Jason Dillaman Date: Tue, 13 Sep 2016 16:37:53 +0000 (-0400) Subject: librbd: unify journal event callbacks into single interface X-Git-Tag: v10.2.4~61^2~22 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=829ff8c5fa9d9470f1e5370cf601509809b39674;p=ceph.git librbd: unify journal event callbacks into single interface Signed-off-by: Jason Dillaman (cherry picked from commit dbbcecf4a289ca36b734b7bda9530cc0a59f84ac) --- diff --git a/src/librbd/Journal.cc b/src/librbd/Journal.cc index a0f7285c2aff0..745f2f6a991a3 100644 --- a/src/librbd/Journal.cc +++ b/src/librbd/Journal.cc @@ -335,7 +335,6 @@ Journal::~Journal() { assert(m_state == STATE_UNINITIALIZED || m_state == STATE_CLOSED); assert(m_journaler == NULL); assert(m_journal_replay == NULL); - assert(m_on_replay_close_request == nullptr); assert(m_wait_for_state_contexts.empty()); } @@ -632,6 +631,12 @@ bool Journal::is_journal_ready() const { template bool Journal::is_journal_replaying() const { Mutex::Locker locker(m_lock); + return is_journal_replaying(m_lock); +} + +template +bool Journal::is_journal_replaying(const Mutex &) const { + assert(m_lock.is_locked()); return (m_state == STATE_REPLAYING || m_state == STATE_FLUSHING_REPLAY || m_state == STATE_FLUSHING_RESTART || @@ -679,6 +684,21 @@ void Journal::close(Context *on_finish) { on_finish = create_async_context_callback(m_image_ctx, on_finish); Mutex::Locker locker(m_lock); + while (m_listener_notify) { + m_listener_cond.Wait(m_lock); + } + + Listeners listeners(m_listeners); + m_listener_notify = true; + m_lock.Unlock(); + for (auto listener : listeners) { + listener->handle_close(); + } + + m_lock.Lock(); + m_listener_notify = false; + m_listener_cond.Signal(); + assert(m_state != STATE_UNINITIALIZED); if (m_state == STATE_CLOSED) { on_finish->complete(m_error_result); @@ -689,12 +709,6 @@ void Journal::close(Context *on_finish) { stop_recording(); } - // interrupt external replay if active - if (m_on_replay_close_request != nullptr) { - m_on_replay_close_request->complete(0); - m_on_replay_close_request = nullptr; - } - m_close_pending = true; wait_for_steady_state(on_finish); } @@ -1127,16 +1141,13 @@ typename Journal::Future Journal::wait_event(Mutex &lock, uint64_t tid, template void Journal::start_external_replay(journal::Replay **journal_replay, - Context *on_start, - Context *on_close_request) { + Context *on_start) { CephContext *cct = m_image_ctx.cct; ldout(cct, 20) << this << " " << __func__ << dendl; Mutex::Locker locker(m_lock); assert(m_state == STATE_READY); assert(m_journal_replay == nullptr); - assert(m_on_replay_close_request == nullptr); - m_on_replay_close_request = on_close_request; on_start = util::create_async_context_callback(m_image_ctx, on_start); on_start = new FunctionContext( @@ -1165,11 +1176,6 @@ void Journal::handle_start_external_replay(int r, << "failed to stop recording: " << cpp_strerror(r) << dendl; *journal_replay = nullptr; - if (m_on_replay_close_request != nullptr) { - m_on_replay_close_request->complete(r); - m_on_replay_close_request = nullptr; - } - // get back to a sane-state start_append(); on_finish->complete(r); @@ -1184,15 +1190,13 @@ void Journal::handle_start_external_replay(int r, template void Journal::stop_external_replay() { + CephContext *cct = m_image_ctx.cct; + ldout(cct, 20) << this << " " << __func__ << dendl; + Mutex::Locker locker(m_lock); assert(m_journal_replay != nullptr); assert(m_state == STATE_REPLAYING); - if (m_on_replay_close_request != nullptr) { - m_on_replay_close_request->complete(-ECANCELED); - m_on_replay_close_request = nullptr; - } - delete m_journal_replay; m_journal_replay = nullptr; @@ -1762,13 +1766,13 @@ void Journal::wait_for_steady_state(Context *on_state) { } template -int Journal::check_resync_requested(bool *do_resync) { +int Journal::is_resync_requested(bool *do_resync) { Mutex::Locker l(m_lock); - return check_resync_requested_internal(do_resync); + return check_resync_requested(do_resync); } template -int Journal::check_resync_requested_internal(bool *do_resync) { +int Journal::check_resync_requested(bool *do_resync) { CephContext *cct = m_image_ctx.cct; ldout(cct, 20) << this << " " << __func__ << dendl; @@ -1811,51 +1815,51 @@ void Journal::handle_metadata_updated() { CephContext *cct = m_image_ctx.cct; ldout(cct, 20) << this << " " << __func__ << dendl; - std::list resync_private_list; + Mutex::Locker locker(m_lock); + if (m_state != STATE_READY && !is_journal_replaying(m_lock)) { + return; + } - { - Mutex::Locker l(m_lock); + while (m_listener_notify) { + m_listener_cond.Wait(m_lock); + } - if (m_state == STATE_CLOSING || m_state == STATE_CLOSED || - m_state == STATE_UNINITIALIZED || m_state == STATE_STOPPING) { - return; - } + bool resync_requested = false; + int r = check_resync_requested(&resync_requested); + if (r < 0) { + lderr(cct) << this << " " << __func__ << ": " + << "failed to check if a resync was requested" << dendl; + return; + } - bool do_resync = false; - int r = check_resync_requested_internal(&do_resync); - if (r < 0) { - lderr(cct) << this << " " << __func__ << ": " - << "failed to check if a resync was requested" << dendl; - return; - } + Listeners listeners(m_listeners); + m_listener_notify = true; + m_lock.Unlock(); - if (do_resync) { - for (const auto& listener : - m_listener_map[journal::ListenerType::RESYNC]) { - journal::ResyncListener *rsync_listener = - boost::get(listener); - resync_private_list.push_back(rsync_listener); - } + if (resync_requested) { + for (auto listener : listeners) { + listener->handle_resync(); } } - for (const auto& listener : resync_private_list) { - listener->handle_resync(); - } + m_lock.Lock(); + m_listener_notify = false; + m_listener_cond.Signal(); } template -void Journal::add_listener(journal::ListenerType type, - journal::JournalListenerPtr listener) { - Mutex::Locker l(m_lock); - m_listener_map[type].push_back(listener); +void Journal::add_listener(journal::Listener *listener) { + Mutex::Locker locker(m_lock); + m_listeners.insert(listener); } template -void Journal::remove_listener(journal::ListenerType type, - journal::JournalListenerPtr listener) { - Mutex::Locker l(m_lock); - m_listener_map[type].remove(listener); +void Journal::remove_listener(journal::Listener *listener) { + Mutex::Locker locker(m_lock); + while (m_listener_notify) { + m_listener_cond.Wait(m_lock); + } + m_listeners.erase(listener); } } // namespace librbd diff --git a/src/librbd/Journal.h b/src/librbd/Journal.h index 1680587224d38..a7a51fbd46d0b 100644 --- a/src/librbd/Journal.h +++ b/src/librbd/Journal.h @@ -8,6 +8,7 @@ #include "include/atomic.h" #include "include/Context.h" #include "include/interval_set.h" +#include "common/Cond.h" #include "common/Mutex.h" #include "journal/Future.h" #include "journal/JournalMetadataListener.h" @@ -157,15 +158,13 @@ public: } void start_external_replay(journal::Replay **journal_replay, - Context *on_start, Context *on_close_request); + Context *on_start); void stop_external_replay(); - void add_listener(journal::ListenerType type, - journal::JournalListenerPtr listener); - void remove_listener(journal::ListenerType type, - journal::JournalListenerPtr listener); + void add_listener(journal::Listener *listener); + void remove_listener(journal::Listener *listener); - int check_resync_requested(bool *do_resync); + int is_resync_requested(bool *do_resync); private: ImageCtxT &m_image_ctx; @@ -297,7 +296,6 @@ private: bool m_blocking_writes; journal::Replay *m_journal_replay; - Context *m_on_replay_close_request = nullptr; struct MetadataListener : public ::journal::JournalMetadataListener { Journal *journal; @@ -312,9 +310,12 @@ private: } } m_metadata_listener; - typedef std::map > ListenerMap; - ListenerMap m_listener_map; + typedef std::set Listeners; + Listeners m_listeners; + Cond m_listener_cond; + bool m_listener_notify = false; + + bool is_journal_replaying(const Mutex &) const; uint64_t append_io_events(journal::EventType event_type, const Bufferlists &bufferlists, @@ -360,7 +361,7 @@ private: bool is_steady_state() const; void wait_for_steady_state(Context *on_state); - int check_resync_requested_internal(bool *do_resync); + int check_resync_requested(bool *do_resync); void handle_metadata_updated(); }; diff --git a/src/librbd/journal/Types.h b/src/librbd/journal/Types.h index 7368b8b33f204..c8c13a9beeaed 100644 --- a/src/librbd/journal/Types.h +++ b/src/librbd/journal/Types.h @@ -508,17 +508,19 @@ std::ostream &operator<<(std::ostream &out, const MirrorPeerClientMeta &meta); std::ostream &operator<<(std::ostream &out, const TagPredecessor &predecessor); std::ostream &operator<<(std::ostream &out, const TagData &tag_data); -enum class ListenerType : int8_t { - RESYNC -}; +struct Listener { + virtual ~Listener() { + } -struct ResyncListener { - virtual ~ResyncListener() {} - virtual void handle_resync() = 0; -}; + /// invoked when journal close is requested + virtual void handle_close() = 0; -typedef boost::variant JournalListenerPtr; + /// invoked when journal is promoted to primary + virtual void handle_promoted() = 0; + /// invoked when journal resync is requested + virtual void handle_resync() = 0; +}; } // namespace journal } // namespace librbd diff --git a/src/test/librbd/mock/MockJournal.h b/src/test/librbd/mock/MockJournal.h index 0cd86393c818d..56b32804c41b4 100644 --- a/src/test/librbd/mock/MockJournal.h +++ b/src/test/librbd/mock/MockJournal.h @@ -81,12 +81,10 @@ struct MockJournal { MOCK_METHOD3(commit_op_event, void(uint64_t, int, Context *)); MOCK_METHOD2(replay_op_ready, void(uint64_t, Context *)); - MOCK_METHOD2(add_listener, void(journal::ListenerType, - journal::JournalListenerPtr)); - MOCK_METHOD2(remove_listener, void(journal::ListenerType, - journal::JournalListenerPtr)); + MOCK_METHOD1(add_listener, void(journal::Listener *)); + MOCK_METHOD1(remove_listener, void(journal::Listener *)); - MOCK_METHOD1(check_resync_requested, int(bool *)); + MOCK_METHOD1(is_resync_requested, int(bool *)); }; } // namespace librbd diff --git a/src/test/librbd/test_mock_Journal.cc b/src/test/librbd/test_mock_Journal.cc index 3864cdbfec211..70a025a8e204c 100644 --- a/src/test/librbd/test_mock_Journal.cc +++ b/src/test/librbd/test_mock_Journal.cc @@ -170,12 +170,16 @@ public: } void expect_get_journaler_cached_client(::journal::MockJournaler &mock_journaler, int r) { - journal::ImageClientMeta image_client_meta; image_client_meta.tag_class = 0; + expect_get_journaler_cached_client(mock_journaler, image_client_meta, r); + } + void expect_get_journaler_cached_client(::journal::MockJournaler &mock_journaler, + const journal::ImageClientMeta &client_meta, + int r) { journal::ClientData client_data; - client_data.client_meta = image_client_meta; + client_data.client_meta = client_meta; cls::journal::Client client; ::encode(client_data, client.data); @@ -197,7 +201,8 @@ public: EXPECT_CALL(mock_journaler, get_tags(0, _, _)) .WillOnce(DoAll(SetArgPointee<1>(tags), WithArg<2>(CompleteContext(r, mock_image_ctx.image_ctx->op_work_queue)))); - EXPECT_CALL(mock_journaler, add_listener(_)); + EXPECT_CALL(mock_journaler, add_listener(_)) + .WillOnce(SaveArg<0>(&m_listener)); } void expect_start_replay(MockJournalImageCtx &mock_image_ctx, @@ -393,6 +398,7 @@ public: } ::journal::ReplayHandler *m_replay_handler = nullptr; + ::journal::JournalMetadataListener *m_listener = nullptr; }; TEST_F(TestMockJournal, StateTransitions) { @@ -1078,15 +1084,12 @@ TEST_F(TestMockJournal, ExternalReplay) { expect_shut_down_journaler(mock_journaler); C_SaferCond start_ctx; - C_SaferCond close_request_ctx; journal::Replay *journal_replay = nullptr; - mock_journal.start_external_replay(&journal_replay, &start_ctx, - &close_request_ctx); + mock_journal.start_external_replay(&journal_replay, &start_ctx); ASSERT_EQ(0, start_ctx.wait()); mock_journal.stop_external_replay(); - ASSERT_EQ(-ECANCELED, close_request_ctx.wait()); } TEST_F(TestMockJournal, ExternalReplayFailure) { @@ -1109,16 +1112,13 @@ TEST_F(TestMockJournal, ExternalReplayFailure) { expect_shut_down_journaler(mock_journaler); C_SaferCond start_ctx; - C_SaferCond close_request_ctx; journal::Replay *journal_replay = nullptr; - mock_journal.start_external_replay(&journal_replay, &start_ctx, - &close_request_ctx); + mock_journal.start_external_replay(&journal_replay, &start_ctx); ASSERT_EQ(-EINVAL, start_ctx.wait()); - ASSERT_EQ(-EINVAL, close_request_ctx.wait()); } -TEST_F(TestMockJournal, ExternalReplayCloseRequest) { +TEST_F(TestMockJournal, AppendDisabled) { REQUIRE_FEATURE(RBD_FEATURE_JOURNALING); librbd::ImageCtx *ictx; @@ -1126,31 +1126,61 @@ TEST_F(TestMockJournal, ExternalReplayCloseRequest) { MockJournalImageCtx mock_image_ctx(*ictx); MockJournal mock_journal(mock_image_ctx); + MockJournalPolicy mock_journal_policy; + ::journal::MockJournaler mock_journaler; open_journal(mock_image_ctx, mock_journal, mock_journaler); + BOOST_SCOPE_EXIT_ALL(&) { + close_journal(mock_journal, mock_journaler); + }; InSequence seq; - expect_stop_append(mock_journaler, 0); + RWLock::RLocker snap_locker(mock_image_ctx.snap_lock); + EXPECT_CALL(mock_image_ctx, get_journal_policy()).WillOnce( + Return(ictx->get_journal_policy())); + ASSERT_TRUE(mock_journal.is_journal_appending()); + + EXPECT_CALL(mock_image_ctx, get_journal_policy()).WillOnce( + Return(&mock_journal_policy)); + EXPECT_CALL(mock_journal_policy, append_disabled()).WillOnce(Return(true)); + ASSERT_FALSE(mock_journal.is_journal_appending()); + expect_shut_down_journaler(mock_journaler); +} - C_SaferCond start_ctx; - C_SaferCond close_request_ctx; +TEST_F(TestMockJournal, CloseListenerEvent) { + REQUIRE_FEATURE(RBD_FEATURE_JOURNALING); - journal::Replay *journal_replay = nullptr; - mock_journal.start_external_replay(&journal_replay, &start_ctx, - &close_request_ctx); - ASSERT_EQ(0, start_ctx.wait()); + librbd::ImageCtx *ictx; + ASSERT_EQ(0, open_image(m_image_name, &ictx)); - C_SaferCond close_ctx; - mock_journal.close(&close_ctx); + MockJournalImageCtx mock_image_ctx(*ictx); + MockJournal mock_journal(mock_image_ctx); + ::journal::MockJournaler mock_journaler; + open_journal(mock_image_ctx, mock_journal, mock_journaler); - ASSERT_EQ(0, close_request_ctx.wait()); - mock_journal.stop_external_replay(); + struct Listener : public journal::Listener { + C_SaferCond ctx; + virtual void handle_close() { + ctx.complete(0); + } + virtual void handle_resync() { + ADD_FAILURE() << "unexpected resync request"; + } + virtual void handle_promoted() { + ADD_FAILURE() << "unexpected promotion event"; + } + } listener; + mock_journal.add_listener(&listener); + + expect_shut_down_journaler(mock_journaler); + close_journal(mock_journal, mock_journaler); - ASSERT_EQ(0, close_ctx.wait()); + ASSERT_EQ(0, listener.ctx.wait()); + mock_journal.remove_listener(&listener); } -TEST_F(TestMockJournal, AppendDisabled) { +TEST_F(TestMockJournal, ResyncRequested) { REQUIRE_FEATURE(RBD_FEATURE_JOURNALING); librbd::ImageCtx *ictx; @@ -1158,26 +1188,37 @@ TEST_F(TestMockJournal, AppendDisabled) { MockJournalImageCtx mock_image_ctx(*ictx); MockJournal mock_journal(mock_image_ctx); - MockJournalPolicy mock_journal_policy; - ::journal::MockJournaler mock_journaler; open_journal(mock_image_ctx, mock_journal, mock_journaler); + + struct Listener : public journal::Listener { + C_SaferCond ctx; + virtual void handle_close() { + ADD_FAILURE() << "unexpected close action"; + } + virtual void handle_resync() { + ctx.complete(0); + } + virtual void handle_promoted() { + ADD_FAILURE() << "unexpected promotion event"; + } + } listener; + mock_journal.add_listener(&listener); + BOOST_SCOPE_EXIT_ALL(&) { + mock_journal.remove_listener(&listener); close_journal(mock_journal, mock_journaler); }; InSequence seq; - RWLock::RLocker snap_locker(mock_image_ctx.snap_lock); - EXPECT_CALL(mock_image_ctx, get_journal_policy()).WillOnce( - Return(ictx->get_journal_policy())); - ASSERT_TRUE(mock_journal.is_journal_appending()); - - EXPECT_CALL(mock_image_ctx, get_journal_policy()).WillOnce( - Return(&mock_journal_policy)); - EXPECT_CALL(mock_journal_policy, append_disabled()).WillOnce(Return(true)); - ASSERT_FALSE(mock_journal.is_journal_appending()); - + journal::ImageClientMeta image_client_meta; + image_client_meta.tag_class = 0; + image_client_meta.resync_requested = true; + expect_get_journaler_cached_client(mock_journaler, image_client_meta, 0); expect_shut_down_journaler(mock_journaler); + + m_listener->handle_update(nullptr); + ASSERT_EQ(0, listener.ctx.wait()); } } // namespace librbd diff --git a/src/tools/rbd_mirror/ImageReplayer.cc b/src/tools/rbd_mirror/ImageReplayer.cc index 256216336c52b..2e7b6b9f0a2d2 100644 --- a/src/tools/rbd_mirror/ImageReplayer.cc +++ b/src/tools/rbd_mirror/ImageReplayer.cc @@ -224,19 +224,6 @@ private: Commands commands; }; -template -struct ResyncListener : public librbd::journal::ResyncListener { - ImageReplayer *img_replayer; - - ResyncListener(ImageReplayer *img_replayer) - : img_replayer(img_replayer) { - } - - virtual void handle_resync() { - img_replayer->resync_image(); - } -}; - } // anonymous namespace template @@ -285,7 +272,7 @@ ImageReplayer::ImageReplayer(Threads *threads, m_lock("rbd::mirror::ImageReplayer " + stringify(remote_pool_id) + " " + remote_image_id), m_progress_cxt(this), - m_resync_listener(new ResyncListener(this)), + m_journal_listener(new JournalListener(this)), m_remote_listener(this) { // Register asok commands using a temporary "remote_pool_name/global_image_id" @@ -319,7 +306,7 @@ ImageReplayer::~ImageReplayer() assert(m_bootstrap_request == nullptr); assert(m_in_flight_status_updates == 0); - delete m_resync_listener; + delete m_journal_listener; delete m_asok_hook; } @@ -427,7 +414,6 @@ void ImageReplayer::bootstrap() { template void ImageReplayer::handle_bootstrap(int r) { dout(20) << "r=" << r << dendl; - { Mutex::Locker locker(m_lock); m_bootstrap_request->put(); @@ -449,15 +435,25 @@ void ImageReplayer::handle_bootstrap(int r) { return; } + + assert(m_local_journal == nullptr); { - Mutex::Locker locker(m_lock); + RWLock::RLocker snap_locker(m_local_image_ctx->snap_lock); + if (m_local_image_ctx->journal != nullptr) { + m_local_journal = m_local_image_ctx->journal; + m_local_journal->add_listener(m_journal_listener); + } + } - m_local_image_ctx->journal->add_listener( - librbd::journal::ListenerType::RESYNC, - m_resync_listener); + if (m_local_journal == nullptr) { + on_start_fail(-EINVAL, "error accessing local journal"); + return; + } + { + Mutex::Locker locker(m_lock); bool do_resync = false; - r = m_local_image_ctx->journal->check_resync_requested(&do_resync); + r = m_local_image_ctx->journal->is_resync_requested(&do_resync); if (r < 0) { derr << "failed to check if a resync was requested" << dendl; } @@ -478,7 +474,7 @@ void ImageReplayer::handle_bootstrap(int r) { } std::string name = m_local_ioctx.get_pool_name() + "/" + - m_local_image_ctx->name; + m_local_image_ctx->name; if (m_name != name) { m_name = name; if (m_asok_hook) { @@ -546,23 +542,9 @@ template void ImageReplayer::start_replay() { dout(20) << dendl; - assert(m_local_journal == nullptr); - { - RWLock::RLocker snap_locker(m_local_image_ctx->snap_lock); - if (m_local_image_ctx->journal != nullptr) { - m_local_journal = m_local_image_ctx->journal; - - Context *start_ctx = create_context_callback< - ImageReplayer, &ImageReplayer::handle_start_replay>(this); - Context *stop_ctx = create_context_callback< - ImageReplayer, &ImageReplayer::handle_stop_replay_request>(this); - m_local_journal->start_external_replay(&m_local_replay, start_ctx, - stop_ctx); - return; - } - } - - on_start_fail(-EINVAL, "error starting journal replay"); + Context *start_ctx = create_context_callback< + ImageReplayer, &ImageReplayer::handle_start_replay>(this); + m_local_journal->start_external_replay(&m_local_replay, start_ctx); } template @@ -570,7 +552,7 @@ void ImageReplayer::handle_start_replay(int r) { dout(20) << "r=" << r << dendl; if (r < 0) { - m_local_journal = nullptr; + assert(m_local_replay == nullptr); derr << "error starting external replay on local image " << m_local_image_id << ": " << cpp_strerror(r) << dendl; on_start_fail(r, "error starting replay on local image"); @@ -618,19 +600,6 @@ void ImageReplayer::handle_start_replay(int r) { } -template -void ImageReplayer::handle_stop_replay_request(int r) { - if (r < 0) { - // error starting or we requested the stop -- ignore - return; - } - - // journal close has been requested, stop replay so the journal - // can be closed (since it will wait on replay to finish) - dout(20) << dendl; - on_stop_journal_replay(); -} - template void ImageReplayer::on_start_fail(int r, const std::string &desc) { @@ -931,9 +900,7 @@ void ImageReplayer::replay_flush() { return; } - Context *stop_ctx = create_context_callback< - ImageReplayer, &ImageReplayer::handle_stop_replay_request>(this); - m_local_journal->start_external_replay(&m_local_replay, ctx, stop_ctx); + m_local_journal->start_external_replay(&m_local_replay, ctx); }); m_local_replay->shut_down(false, ctx); } @@ -1386,25 +1353,35 @@ void ImageReplayer::shut_down(int r) { }); } } - if (m_local_replay != nullptr) { + if (m_local_journal != nullptr) { + ctx = new FunctionContext([this, ctx](int r) { + m_local_journal = nullptr; + ctx->complete(0); + }); + if (m_local_replay != nullptr) { + ctx = new FunctionContext([this, ctx](int r) { + m_local_journal->stop_external_replay(); + m_local_replay = nullptr; + + delete m_event_preprocessor; + m_event_preprocessor = nullptr; + ctx->complete(0); + }); + } ctx = new FunctionContext([this, ctx](int r) { if (r < 0) { derr << "error flushing journal replay: " << cpp_strerror(r) << dendl; } - m_local_journal->stop_external_replay(); - m_local_journal = nullptr; - m_local_replay = nullptr; - - delete m_event_preprocessor; - m_event_preprocessor = nullptr; + // blocks if listener notification is in-progress + m_local_journal->remove_listener(m_journal_listener); ctx->complete(0); }); - ctx = new FunctionContext([this, ctx](int r) { - m_local_journal->remove_listener( - librbd::journal::ListenerType::RESYNC, m_resync_listener); - m_local_replay->shut_down(true, ctx); - }); + if (m_local_replay != nullptr) { + ctx = new FunctionContext([this, ctx](int r) { + m_local_replay->shut_down(true, ctx); + }); + } } if (m_replay_handler != nullptr) { ctx = new FunctionContext([this, ctx](int r) { diff --git a/src/tools/rbd_mirror/ImageReplayer.h b/src/tools/rbd_mirror/ImageReplayer.h index ba81deaa1ee58..4baef7f315fd2 100644 --- a/src/tools/rbd_mirror/ImageReplayer.h +++ b/src/tools/rbd_mirror/ImageReplayer.h @@ -205,6 +205,26 @@ private: typedef typename librbd::journal::TypeTraits::Journaler Journaler; typedef boost::optional OptionalState; + struct JournalListener : public librbd::journal::Listener { + ImageReplayer *img_replayer; + + JournalListener(ImageReplayer *img_replayer) + : img_replayer(img_replayer) { + } + + virtual void handle_close() { + img_replayer->on_stop_journal_replay(); + } + + virtual void handle_promoted() { + // TODO + } + + virtual void handle_resync() { + img_replayer->resync_image(); + } + }; + class BootstrapProgressContext : public ProgressContext { public: BootstrapProgressContext(ImageReplayer *replayer) : @@ -242,7 +262,7 @@ private: librbd::journal::Replay *m_local_replay = nullptr; Journaler* m_remote_journaler = nullptr; ::journal::ReplayHandler *m_replay_handler = nullptr; - librbd::journal::ResyncListener *m_resync_listener; + librbd::journal::Listener *m_journal_listener; bool m_stopping_for_resync = false; Context *m_on_start_finish = nullptr; @@ -327,7 +347,6 @@ private: void start_replay(); void handle_start_replay(int r); - void handle_stop_replay_request(int r); void replay_flush(); void handle_replay_flush(int r);