From: Ricardo Dias Date: Thu, 12 May 2016 17:10:38 +0000 (+0100) Subject: rbd: journal: Support for listening updates on client metadata X-Git-Tag: v11.0.0~69^2~2 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=0dd85739e9034912b86250ced2834dd7fb3d92fd;p=ceph.git rbd: journal: Support for listening updates on client metadata Currently we only support listening for image resync requests. Signed-off-by: Ricardo Dias --- diff --git a/src/librbd/Journal.cc b/src/librbd/Journal.cc index b874005843ee..268abb57c2cb 100644 --- a/src/librbd/Journal.cc +++ b/src/librbd/Journal.cc @@ -300,7 +300,8 @@ Journal::Journal(I &image_ctx) m_lock("Journal::m_lock"), m_state(STATE_UNINITIALIZED), m_error_result(0), m_replay_handler(this), m_close_pending(false), m_event_lock("Journal::m_event_lock"), m_event_tid(0), - m_blocking_writes(false), m_journal_replay(NULL) { + m_blocking_writes(false), m_journal_replay(NULL), + m_metadata_listener(this) { CephContext *cct = m_image_ctx.cct; ldout(cct, 5) << this << ": ictx=" << &m_image_ctx << dendl; @@ -1157,6 +1158,8 @@ void Journal::destroy_journaler(int r) { delete m_journal_replay; m_journal_replay = NULL; + m_journaler->remove_listener(&m_metadata_listener); + transition_state(STATE_CLOSING, r); m_journaler->shut_down(create_async_context_callback( m_image_ctx, create_context_callback< @@ -1175,6 +1178,8 @@ void Journal::recreate_journaler(int r) { delete m_journal_replay; m_journal_replay = NULL; + m_journaler->remove_listener(&m_metadata_listener); + transition_state(STATE_RESTARTING_REPLAY, r); m_journaler->shut_down(create_async_context_callback( m_image_ctx, create_context_callback< @@ -1276,6 +1281,8 @@ void Journal::handle_initialized(int r) { m_image_ctx, create_context_callback< Journal, &Journal::handle_get_tags>(this))); m_journaler->get_tags(m_tag_class, &tags_ctx->tags, tags_ctx); + + m_journaler->add_listener(&m_metadata_listener); } template @@ -1652,6 +1659,99 @@ void Journal::wait_for_steady_state(Context *on_state) { m_wait_for_state_contexts.push_back(on_state); } +template +int Journal::check_resync_requested(bool *do_resync) { + Mutex::Locker l(m_lock); + return check_resync_requested_internal(do_resync); +} + +template +int Journal::check_resync_requested_internal(bool *do_resync) { + CephContext *cct = m_image_ctx.cct; + ldout(cct, 20) << this << " " << __func__ << dendl; + + assert(m_lock.is_locked()); + assert(do_resync != nullptr); + + cls::journal::Client client; + int r = m_journaler->get_cached_client(IMAGE_CLIENT_ID, &client); + if (r < 0) { + lderr(cct) << "failed to retrieve client: " << cpp_strerror(r) << dendl; + return r; + } + + librbd::journal::ClientData client_data; + bufferlist::iterator bl_it = client.data.begin(); + try { + ::decode(client_data, bl_it); + } catch (const buffer::error &err) { + lderr(cct) << "failed to decode client data: " << err << dendl; + return -EINVAL; + } + + journal::ImageClientMeta *image_client_meta = + boost::get(&client_data.client_meta); + if (image_client_meta == nullptr) { + lderr(cct) << "failed to access image client meta struct" << dendl; + return -EINVAL; + } + + *do_resync = image_client_meta->resync_requested; + + return 0; +} + +template +void Journal::handle_metadata_updated() { + CephContext *cct = m_image_ctx.cct; + ldout(cct, 20) << this << " " << __func__ << dendl; + + std::list resync_private_list; + + { + Mutex::Locker l(m_lock); + + if (m_state == STATE_CLOSING || m_state == STATE_CLOSED || + m_state == STATE_UNINITIALIZED || m_state == STATE_STOPPING) { + return; + } + + bool do_resync = false; + int r = check_resync_requested_internal(&do_resync); + if (r < 0) { + lderr(cct) << "failed to check if a resync was requested" << dendl; + return; + } + + if (do_resync) { + for (const auto& listener : + m_listener_map[journal::ListenerType::RESYNC]) { + journal::ResyncListener *rsync_listener = + boost::get(listener); + resync_private_list.push_back(rsync_listener); + } + } + } + + for (const auto& listener : resync_private_list) { + listener->handle_resync(); + } +} + +template +void Journal::add_listener(journal::ListenerType type, + journal::JournalListenerPtr listener) { + Mutex::Locker l(m_lock); + m_listener_map[type].push_back(listener); +} + +template +void Journal::remove_listener(journal::ListenerType type, + journal::JournalListenerPtr listener) { + Mutex::Locker l(m_lock); + m_listener_map[type].remove(listener); +} + } // namespace librbd template class librbd::Journal; diff --git a/src/librbd/Journal.h b/src/librbd/Journal.h index d77d50ed2868..c5cee0f3f2cf 100644 --- a/src/librbd/Journal.h +++ b/src/librbd/Journal.h @@ -11,6 +11,7 @@ #include "include/rados/librados.hpp" #include "common/Mutex.h" #include "journal/Future.h" +#include "journal/JournalMetadataListener.h" #include "journal/ReplayEntry.h" #include "journal/ReplayHandler.h" #include "librbd/journal/Types.h" @@ -158,6 +159,13 @@ public: Context *on_finish); void stop_external_replay(); + void add_listener(journal::ListenerType type, + journal::JournalListenerPtr listener); + void remove_listener(journal::ListenerType type, + journal::JournalListenerPtr listener); + + int check_resync_requested(bool *do_resync); + private: ImageCtxT &m_image_ctx; @@ -287,6 +295,23 @@ private: journal::Replay *m_journal_replay; + struct MetadataListener : public ::journal::JournalMetadataListener { + Journal *journal; + + MetadataListener(Journal *journal) : journal(journal) { } + + void handle_update(::journal::JournalMetadata *) { + FunctionContext *ctx = new FunctionContext([this](int r) { + journal->handle_metadata_updated(); + }); + journal->m_work_queue->queue(ctx, 0); + } + } m_metadata_listener; + + typedef std::map > ListenerMap; + ListenerMap m_listener_map; + uint64_t append_io_events(journal::EventType event_type, const Bufferlists &bufferlists, const AioObjectRequests &requests, @@ -330,6 +355,10 @@ private: bool is_steady_state() const; void wait_for_steady_state(Context *on_state); + + int check_resync_requested_internal(bool *do_resync); + + void handle_metadata_updated(); }; } // namespace librbd diff --git a/src/librbd/journal/Types.h b/src/librbd/journal/Types.h index 4008a0f15bfa..4748aeb2ebb9 100644 --- a/src/librbd/journal/Types.h +++ b/src/librbd/journal/Types.h @@ -483,6 +483,18 @@ std::ostream &operator<<(std::ostream &out, const MirrorPeerState &meta); std::ostream &operator<<(std::ostream &out, const MirrorPeerClientMeta &meta); std::ostream &operator<<(std::ostream &out, const TagData &tag_data); +enum class ListenerType : int8_t { + RESYNC +}; + +struct ResyncListener { + virtual ~ResyncListener() {} + virtual void handle_resync() = 0; +}; + +typedef boost::variant JournalListenerPtr; + + } // namespace journal } // namespace librbd diff --git a/src/test/librbd/mock/MockJournal.h b/src/test/librbd/mock/MockJournal.h index a80eead3dd3d..f8ef75ad8641 100644 --- a/src/test/librbd/mock/MockJournal.h +++ b/src/test/librbd/mock/MockJournal.h @@ -57,6 +57,13 @@ struct MockJournal { MOCK_METHOD2(commit_op_event, void(uint64_t, int)); MOCK_METHOD2(replay_op_ready, void(uint64_t, Context *)); + + MOCK_METHOD2(add_listener, void(journal::ListenerType, + journal::JournalListenerPtr)); + MOCK_METHOD2(remove_listener, void(journal::ListenerType, + journal::JournalListenerPtr)); + + MOCK_METHOD1(check_resync_requested, int(bool *)); }; } // namespace librbd