]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
librbd: add tag handling to journal state machine
authorJason Dillaman <dillaman@redhat.com>
Fri, 26 Feb 2016 15:56:53 +0000 (10:56 -0500)
committerJason Dillaman <dillaman@redhat.com>
Tue, 8 Mar 2016 14:03:43 +0000 (09:03 -0500)
The journal will not retrieve the tag class for the image within
the journal in addition to the most recently allocated tag. Also
added helper methods to allocate new tags.

Signed-off-by: Jason Dillaman <dillaman@redhat.com>
src/librbd/Journal.cc
src/librbd/Journal.h

index af82c6cf5fb301accec5086b15f6d0a8e18f8d07..f9aff5bf23c1c6a84b778d82c6f43ccbb6db078a 100644 (file)
@@ -8,8 +8,8 @@
 #include "librbd/ExclusiveLock.h"
 #include "librbd/ImageCtx.h"
 #include "librbd/journal/Replay.h"
-#include "librbd/journal/Types.h"
 #include "librbd/Utils.h"
+#include "cls/journal/cls_journal_types.h"
 #include "journal/Journaler.h"
 #include "journal/ReplayEntry.h"
 #include "common/errno.h"
 
 namespace librbd {
 
+namespace {
+
+struct C_DecodeTag : public Context {
+  CephContext *cct;
+  Mutex *lock;
+  uint64_t *tag_tid;
+  journal::TagData *tag_data;
+  Context *on_finish;
+
+  cls::journal::Tag tag;
+
+  C_DecodeTag(CephContext *cct, Mutex *lock, uint64_t *tag_tid,
+              journal::TagData *tag_data, Context *on_finish)
+    : cct(cct), lock(lock), tag_tid(tag_tid), tag_data(tag_data),
+      on_finish(on_finish) {
+  }
+
+  virtual void complete(int r) override {
+    on_finish->complete(process(r));
+    Context::complete(0);
+  }
+  virtual void finish(int r) override {
+  }
+
+  int process(int r) {
+    if (r < 0) {
+      lderr(cct) << "failed to allocate tag: " << cpp_strerror(r) << dendl;
+      return r;
+    }
+
+    Mutex::Locker locker(*lock);
+    *tag_tid = tag.tid;
+
+    bufferlist::iterator data_it = tag.data.begin();
+    r = decode(&data_it, tag_data);
+    if (r < 0) {
+      lderr(cct) << "failed to decode allocated tag" << dendl;
+      return r;
+    }
+    return 0;
+  }
+
+  static int decode(bufferlist::iterator *it,
+                    journal::TagData *tag_data) {
+    try {
+      ::decode(*tag_data, *it);
+    } catch (const buffer::error &err) {
+      return -EBADMSG;
+    }
+    return 0;
+  }
+
+};
+
+struct C_DecodeTags : public Context {
+  CephContext *cct;
+  Mutex *lock;
+  uint64_t *tag_tid;
+  journal::TagData *tag_data;
+  Context *on_finish;
+
+  ::journal::Journaler::Tags tags;
+
+  C_DecodeTags(CephContext *cct, Mutex *lock, uint64_t *tag_tid,
+               journal::TagData *tag_data, Context *on_finish)
+    : cct(cct), lock(lock), tag_tid(tag_tid), tag_data(tag_data),
+      on_finish(on_finish) {
+  }
+
+  virtual void complete(int r) {
+    on_finish->complete(process(r));
+    Context::complete(0);
+  }
+  virtual void finish(int r) override {
+  }
+
+  int process(int r) {
+    if (r < 0) {
+      lderr(cct) << "failed to retrieve journal tags: " << cpp_strerror(r)
+                 << dendl;
+      return r;
+    }
+
+    if (tags.empty()) {
+      lderr(cct) << "no journal tags retrieved" << dendl;
+      return -ENOENT;
+    }
+
+    Mutex::Locker locker(*lock);
+    *tag_tid = tags.back().tid;
+
+    bufferlist::iterator data_it = tags.back().data.begin();
+    r = C_DecodeTag::decode(&data_it, tag_data);
+    if (r < 0) {
+      lderr(cct) << "failed to decode journal tag" << dendl;
+      return r;
+    }
+    return 0;
+  }
+};
+
+} // anonymous namespace
+
 using util::create_async_context_callback;
 using util::create_context_callback;
 
+// client id for local image
+template <typename I>
+const std::string Journal<I>::IMAGE_CLIENT_ID("");
+
+// mirror uuid to use for local images
+template <typename I>
+const std::string Journal<I>::LOCAL_MIRROR_UUID("");
+
 template <typename I>
 std::ostream &operator<<(std::ostream &os,
                          const typename Journal<I>::State &state) {
@@ -111,7 +222,8 @@ int Journal<I>::create(librados::IoCtx &io_ctx, const std::string &image_id,
     pool_id = data_io_ctx.get_id();
   }
 
-  Journaler journaler(io_ctx, image_id, "", cct->_conf->rbd_journal_commit_age);
+  Journaler journaler(io_ctx, image_id, IMAGE_CLIENT_ID,
+                      cct->_conf->rbd_journal_commit_age);
 
   int r = journaler.create(order, splay_width, pool_id);
   if (r < 0) {
@@ -150,7 +262,8 @@ int Journal<I>::remove(librados::IoCtx &io_ctx, const std::string &image_id) {
   CephContext *cct = reinterpret_cast<CephContext *>(io_ctx.cct());
   ldout(cct, 5) << __func__ << ": image=" << image_id << dendl;
 
-  Journaler journaler(io_ctx, image_id, "", cct->_conf->rbd_journal_commit_age);
+  Journaler journaler(io_ctx, image_id, IMAGE_CLIENT_ID,
+                      cct->_conf->rbd_journal_commit_age);
 
   bool journal_exists;
   int r = journaler.exists(&journal_exists);
@@ -185,7 +298,8 @@ int Journal<I>::reset(librados::IoCtx &io_ctx, const std::string &image_id) {
   CephContext *cct = reinterpret_cast<CephContext *>(io_ctx.cct());
   ldout(cct, 5) << __func__ << ": image=" << image_id << dendl;
 
-  Journaler journaler(io_ctx, image_id, "", cct->_conf->rbd_journal_commit_age);
+  Journaler journaler(io_ctx, image_id, IMAGE_CLIENT_ID,
+                      cct->_conf->rbd_journal_commit_age);
 
   C_SaferCond cond;
   journaler.init(&cond);
@@ -281,6 +395,38 @@ void Journal<I>::close(Context *on_finish) {
   wait_for_steady_state(on_finish);
 }
 
+template <typename I>
+bool Journal<I>::is_tag_owner() const {
+  return (m_tag_data.mirror_uuid == LOCAL_MIRROR_UUID);
+}
+
+template <typename I>
+void Journal<I>::allocate_tag(const std::string &mirror_uuid,
+                              Context *on_finish) {
+  CephContext *cct = m_image_ctx.cct;
+  ldout(cct, 20) << this << " " << __func__ << ":  mirror_uuid=" << mirror_uuid
+                 << dendl;
+
+  Mutex::Locker locker(m_lock);
+  assert(m_journaler != nullptr && is_tag_owner());
+
+  // NOTE: currently responsibility of caller to provide local mirror
+  // uuid constant or remote peer uuid
+  journal::TagData tag_data;
+  tag_data.mirror_uuid = mirror_uuid;
+
+  // TODO: inject current commit position into tag data (need updated journaler PR)
+  tag_data.predecessor_mirror_uuid = m_tag_data.mirror_uuid;
+
+  bufferlist tag_bl;
+  ::encode(tag_data, tag_bl);
+
+  C_DecodeTag *decode_tag_ctx = new C_DecodeTag(cct, &m_lock, &m_tag_tid,
+                                                &m_tag_data, on_finish);
+  m_journaler->allocate_tag(m_tag_class, tag_bl, &decode_tag_ctx->tag,
+                            decode_tag_ctx);
+}
+
 template <typename I>
 void Journal<I>::flush_commit_position(Context *on_finish) {
   CephContext *cct = m_image_ctx.cct;
@@ -312,8 +458,7 @@ uint64_t Journal<I>::append_io_event(AioCompletion *aio_comp,
     tid = ++m_event_tid;
     assert(tid != 0);
 
-    // TODO: use allocated tag_id
-    future = m_journaler->append(0, bl);
+    future = m_journaler->append(m_tag_tid, bl);
     m_events[tid] = Event(future, aio_comp, requests, offset, length);
   }
 
@@ -398,8 +543,7 @@ void Journal<I>::append_op_event(uint64_t op_tid,
     Mutex::Locker locker(m_lock);
     assert(m_state == STATE_READY);
 
-    // TODO: use allocated tag_id
-    future = m_journaler->append(0, bl);
+    future = m_journaler->append(m_tag_tid, bl);
 
     // delay committing op event to ensure consistent replay
     assert(m_op_futures.count(op_tid) == 0);
@@ -438,8 +582,7 @@ void Journal<I>::commit_op_event(uint64_t op_tid, int r) {
     op_start_future = it->second;
     m_op_futures.erase(it);
 
-    // TODO: use allocated tag_id
-    op_finish_future = m_journaler->append(0, bl);
+    op_finish_future = m_journaler->append(m_tag_tid, bl);
   }
 
   op_finish_future.flush(new C_OpEventSafe(this, op_tid, op_start_future,
@@ -553,8 +696,8 @@ void Journal<I>::create_journaler() {
   assert(m_journaler == NULL);
 
   transition_state(STATE_INITIALIZING, 0);
-  m_journaler = new Journaler(m_image_ctx.md_ctx, m_image_ctx.id, "",
-                              m_image_ctx.journal_commit_age);
+  m_journaler = new Journaler(m_image_ctx.md_ctx, m_image_ctx.id,
+                              IMAGE_CLIENT_ID, m_image_ctx.journal_commit_age);
   m_journaler->init(create_async_context_callback(
     m_image_ctx, create_context_callback<
       Journal<I>, &Journal<I>::handle_initialized>(this)));
@@ -625,6 +768,7 @@ void Journal<I>::handle_initialized(int r) {
   ldout(cct, 20) << this << " " << __func__ << ": r=" << r << dendl;
 
   Mutex::Locker locker(m_lock);
+  assert(m_state == STATE_INITIALIZING);
 
   if (r < 0) {
     lderr(cct) << this << " " << __func__
@@ -634,6 +778,56 @@ void Journal<I>::handle_initialized(int r) {
     return;
   }
 
+  // locate the master image client record
+  cls::journal::Client client;
+  r = m_journaler->get_cached_client(Journal<ImageCtx>::IMAGE_CLIENT_ID,
+                                     &client);
+  if (r < 0) {
+    lderr(cct) << "failed to locate master image client" << dendl;
+    destroy_journaler(r);
+    return;
+  }
+
+  librbd::journal::ClientData client_data;
+  bufferlist::iterator bl = client.data.begin();
+  try {
+    ::decode(client_data, bl);
+  } catch (const buffer::error &err) {
+    lderr(cct) << "failed to decode client meta data: " << err.what()
+               << dendl;
+    destroy_journaler(-EINVAL);
+    return;
+  }
+
+  librbd::journal::ImageClientMeta *image_client_meta =
+    boost::get<librbd::journal::ImageClientMeta>(&client_data.client_meta);
+  if (image_client_meta == nullptr) {
+    lderr(cct) << "failed to extract client meta data" << dendl;
+    destroy_journaler(-EINVAL);
+    return;
+  }
+
+  m_tag_class = image_client_meta->tag_class;
+  C_DecodeTags *tags_ctx = new C_DecodeTags(
+    cct, &m_lock, &m_tag_tid, &m_tag_data, create_async_context_callback(
+      m_image_ctx, create_context_callback<
+        Journal<I>, &Journal<I>::handle_get_tags>(this)));
+  m_journaler->get_tags(m_tag_class, &tags_ctx->tags, tags_ctx);
+}
+
+template <typename I>
+void Journal<I>::handle_get_tags(int r) {
+  CephContext *cct = m_image_ctx.cct;
+  ldout(cct, 20) << this << " " << __func__ << ": r=" << r << dendl;
+
+  Mutex::Locker locker(m_lock);
+  assert(m_state == STATE_INITIALIZING);
+
+  if (r < 0) {
+    destroy_journaler(r);
+    return;
+  }
+
   transition_state(STATE_REPLAYING, 0);
   m_journal_replay = journal::Replay<I>::create(m_image_ctx);
   m_journaler->start_replay(&m_replay_handler);
index a9f376763f0f0049b786e3a62e485f1607cc7112..127569ea740edef9b5a57aed5b8401e6b49f3eff 100644 (file)
@@ -13,6 +13,7 @@
 #include "journal/Future.h"
 #include "journal/ReplayEntry.h"
 #include "journal/ReplayHandler.h"
+#include "librbd/journal/Types.h"
 #include <algorithm>
 #include <iosfwd>
 #include <list>
@@ -32,7 +33,6 @@ class ImageCtx;
 
 namespace journal {
 
-class EventEntry;
 template <typename> class Replay;
 
 template <typename ImageCtxT>
@@ -92,6 +92,9 @@ public:
     STATE_CLOSED
   };
 
+  static const std::string IMAGE_CLIENT_ID;
+  static const std::string LOCAL_MIRROR_UUID;
+
   typedef std::list<AioObjectRequest *> AioObjectRequests;
 
   Journal(ImageCtxT &image_ctx);
@@ -112,6 +115,9 @@ public:
   void open(Context *on_finish);
   void close(Context *on_finish);
 
+  bool is_tag_owner() const;
+  void allocate_tag(const std::string &mirror_uuid, Context *on_finish);
+
   void flush_commit_position(Context *on_finish);
 
   uint64_t append_io_event(AioCompletion *aio_comp,
@@ -241,6 +247,9 @@ private:
   Journaler *m_journaler;
   mutable Mutex m_lock;
   State m_state;
+  uint64_t m_tag_class = 0;
+  uint64_t m_tag_tid = 0;
+  journal::TagData m_tag_data;
 
   int m_error_result;
   Contexts m_wait_for_state_contexts;
@@ -268,6 +277,7 @@ private:
   void complete_event(typename Events::iterator it, int r);
 
   void handle_initialized(int r);
+  void handle_get_tags(int r);
 
   void handle_replay_ready();
   void handle_replay_complete(int r);