]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rbd: journal: Support for listening updates on client metadata
authorRicardo Dias <rdias@suse.com>
Thu, 12 May 2016 17:10:38 +0000 (18:10 +0100)
committerJason Dillaman <dillaman@redhat.com>
Thu, 11 Aug 2016 15:28:19 +0000 (11:28 -0400)
Currently we only support listening for image resync requests.

Signed-off-by: Ricardo Dias <rdias@suse.com>
(cherry picked from commit 0dd85739e9034912b86250ced2834dd7fb3d92fd)

src/librbd/Journal.cc
src/librbd/Journal.h
src/librbd/journal/Types.h
src/test/librbd/mock/MockJournal.h

index e9a6bd03d90df7c0c73dabdfd1fdef158b0af7dd..96f9321e614d730d131f6ac7a315fed369935099 100644 (file)
@@ -300,7 +300,8 @@ Journal<I>::Journal(I &image_ctx)
     m_lock("Journal<I>::m_lock"), m_state(STATE_UNINITIALIZED),
     m_error_result(0), m_replay_handler(this), m_close_pending(false),
     m_event_lock("Journal<I>::m_event_lock"), m_event_tid(0),
-    m_blocking_writes(false), m_journal_replay(NULL) {
+    m_blocking_writes(false), m_journal_replay(NULL),
+    m_metadata_listener(this) {
 
   CephContext *cct = m_image_ctx.cct;
   ldout(cct, 5) << this << ": ictx=" << &m_image_ctx << dendl;
@@ -1182,6 +1183,8 @@ void Journal<I>::destroy_journaler(int r) {
   delete m_journal_replay;
   m_journal_replay = NULL;
 
+  m_journaler->remove_listener(&m_metadata_listener);
+
   transition_state(STATE_CLOSING, r);
   m_journaler->shut_down(create_async_context_callback(
     m_image_ctx, create_context_callback<
@@ -1200,6 +1203,8 @@ void Journal<I>::recreate_journaler(int r) {
   delete m_journal_replay;
   m_journal_replay = NULL;
 
+  m_journaler->remove_listener(&m_metadata_listener);
+
   transition_state(STATE_RESTARTING_REPLAY, r);
   m_journaler->shut_down(create_async_context_callback(
     m_image_ctx, create_context_callback<
@@ -1301,6 +1306,8 @@ void Journal<I>::handle_initialized(int r) {
       m_image_ctx, create_context_callback<
         Journal<I>, &Journal<I>::handle_get_tags>(this)));
   m_journaler->get_tags(m_tag_class, &tags_ctx->tags, tags_ctx);
+
+  m_journaler->add_listener(&m_metadata_listener);
 }
 
 template <typename I>
@@ -1677,6 +1684,99 @@ void Journal<I>::wait_for_steady_state(Context *on_state) {
   m_wait_for_state_contexts.push_back(on_state);
 }
 
+template <typename I>
+int Journal<I>::check_resync_requested(bool *do_resync) {
+  Mutex::Locker l(m_lock);
+  return check_resync_requested_internal(do_resync);
+}
+
+template <typename I>
+int Journal<I>::check_resync_requested_internal(bool *do_resync) {
+  CephContext *cct = m_image_ctx.cct;
+  ldout(cct, 20) << this << " " << __func__ << dendl;
+
+  assert(m_lock.is_locked());
+  assert(do_resync != nullptr);
+
+  cls::journal::Client client;
+  int r = m_journaler->get_cached_client(IMAGE_CLIENT_ID, &client);
+  if (r < 0) {
+     lderr(cct) << "failed to retrieve client: " << cpp_strerror(r) << dendl;
+     return r;
+  }
+
+  librbd::journal::ClientData client_data;
+  bufferlist::iterator bl_it = client.data.begin();
+  try {
+    ::decode(client_data, bl_it);
+  } catch (const buffer::error &err) {
+    lderr(cct) << "failed to decode client data: " << err << dendl;
+    return -EINVAL;
+  }
+
+  journal::ImageClientMeta *image_client_meta =
+    boost::get<journal::ImageClientMeta>(&client_data.client_meta);
+  if (image_client_meta == nullptr) {
+    lderr(cct) << "failed to access image client meta struct" << dendl;
+    return -EINVAL;
+  }
+
+  *do_resync = image_client_meta->resync_requested;
+
+  return 0;
+}
+
+template <typename I>
+void Journal<I>::handle_metadata_updated() {
+  CephContext *cct = m_image_ctx.cct;
+  ldout(cct, 20) << this << " " << __func__ << dendl;
+
+  std::list<journal::ResyncListener *> resync_private_list;
+
+  {
+    Mutex::Locker l(m_lock);
+
+    if (m_state == STATE_CLOSING || m_state == STATE_CLOSED ||
+        m_state == STATE_UNINITIALIZED || m_state == STATE_STOPPING) {
+      return;
+    }
+
+    bool do_resync = false;
+    int r = check_resync_requested_internal(&do_resync);
+    if (r < 0) {
+      lderr(cct) << "failed to check if a resync was requested" << dendl;
+      return;
+    }
+
+    if (do_resync) {
+      for (const auto& listener :
+                              m_listener_map[journal::ListenerType::RESYNC]) {
+        journal::ResyncListener *rsync_listener =
+                        boost::get<journal::ResyncListener *>(listener);
+        resync_private_list.push_back(rsync_listener);
+      }
+    }
+  }
+
+  for (const auto& listener : resync_private_list) {
+    listener->handle_resync();
+  }
+}
+
+template <typename I>
+void Journal<I>::add_listener(journal::ListenerType type,
+                              journal::JournalListenerPtr listener) {
+  Mutex::Locker l(m_lock);
+  m_listener_map[type].push_back(listener);
+}
+
+template <typename I>
+void Journal<I>::remove_listener(journal::ListenerType type,
+                                 journal::JournalListenerPtr listener) {
+  Mutex::Locker l(m_lock);
+  m_listener_map[type].remove(listener);
+}
+
 } // namespace librbd
 
 template class librbd::Journal<librbd::ImageCtx>;
index 083aef564ae1eb31c9d2dc79171fbd061b60e1ce..7f085dfc6c4ab3314267a04d032d983996cb994c 100644 (file)
@@ -11,6 +11,7 @@
 #include "include/rados/librados.hpp"
 #include "common/Mutex.h"
 #include "journal/Future.h"
+#include "journal/JournalMetadataListener.h"
 #include "journal/ReplayEntry.h"
 #include "journal/ReplayHandler.h"
 #include "librbd/journal/Types.h"
@@ -158,6 +159,13 @@ public:
                              Context *on_start, Context *on_close_request);
   void stop_external_replay();
 
+  void add_listener(journal::ListenerType type,
+                    journal::JournalListenerPtr listener);
+  void remove_listener(journal::ListenerType type,
+                       journal::JournalListenerPtr listener);
+
+  int check_resync_requested(bool *do_resync);
+
 private:
   ImageCtxT &m_image_ctx;
 
@@ -288,6 +296,23 @@ private:
   journal::Replay<ImageCtxT> *m_journal_replay;
   Context *m_on_replay_close_request = nullptr;
 
+  struct MetadataListener : public ::journal::JournalMetadataListener {
+    Journal<ImageCtxT> *journal;
+
+    MetadataListener(Journal<ImageCtxT> *journal) : journal(journal) { }
+
+    void handle_update(::journal::JournalMetadata *) {
+      FunctionContext *ctx = new FunctionContext([this](int r) {
+        journal->handle_metadata_updated();
+      });
+      journal->m_work_queue->queue(ctx, 0);
+    }
+  } m_metadata_listener;
+
+  typedef std::map<journal::ListenerType,
+                   std::list<journal::JournalListenerPtr> > ListenerMap;
+  ListenerMap m_listener_map;
+
   uint64_t append_io_events(journal::EventType event_type,
                             const Bufferlists &bufferlists,
                             const AioObjectRequests &requests,
@@ -331,6 +356,10 @@ private:
 
   bool is_steady_state() const;
   void wait_for_steady_state(Context *on_state);
+
+  int check_resync_requested_internal(bool *do_resync);
+
+  void handle_metadata_updated();
 };
 
 } // namespace librbd
index 4008a0f15bfaf719adb5a1380ad067b5ad7b5de6..4748aeb2ebb938a20703a642c747ad6d7aefcad6 100644 (file)
@@ -483,6 +483,18 @@ std::ostream &operator<<(std::ostream &out, const MirrorPeerState &meta);
 std::ostream &operator<<(std::ostream &out, const MirrorPeerClientMeta &meta);
 std::ostream &operator<<(std::ostream &out, const TagData &tag_data);
 
+enum class ListenerType : int8_t {
+  RESYNC
+};
+
+struct ResyncListener {
+  virtual ~ResyncListener() {}
+  virtual void handle_resync() = 0;
+};
+
+typedef boost::variant<ResyncListener *> JournalListenerPtr;
+
+
 } // namespace journal
 } // namespace librbd
 
index a80eead3dd3db862f1ff590cc705dd0ab800a124..f8ef75ad86419e482850729f7ad0d30adec7ba80 100644 (file)
@@ -57,6 +57,13 @@ struct MockJournal {
 
   MOCK_METHOD2(commit_op_event, void(uint64_t, int));
   MOCK_METHOD2(replay_op_ready, void(uint64_t, Context *));
+
+  MOCK_METHOD2(add_listener, void(journal::ListenerType,
+                                  journal::JournalListenerPtr));
+  MOCK_METHOD2(remove_listener, void(journal::ListenerType,
+                                     journal::JournalListenerPtr));
+
+  MOCK_METHOD1(check_resync_requested, int(bool *));
 };
 
 } // namespace librbd