]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
journal: added tag support methods
authorJason Dillaman <dillaman@redhat.com>
Thu, 4 Feb 2016 21:27:32 +0000 (16:27 -0500)
committerJason Dillaman <dillaman@redhat.com>
Fri, 5 Feb 2016 20:21:28 +0000 (15:21 -0500)
librbd, for example, will allocate a new tag after acquiring
the exclusive lock.

Signed-off-by: Jason Dillaman <dillaman@redhat.com>
src/journal/JournalMetadata.cc
src/journal/JournalMetadata.h
src/journal/Journaler.cc
src/journal/Journaler.h
src/journal/Utils.h
src/test/journal/test_Journaler.cc

index 629bddc60f031feb87e72387c13de4220c23298a..f8d55742892ef20cdccbac8fb2ffcd74d4c29618 100644 (file)
@@ -49,6 +49,200 @@ inline bool entry_positions_less_equal(const ObjectSetPosition &lhs,
   return false;
 }
 
+struct C_AllocateTag : public Context {
+  CephContext *cct;
+  librados::IoCtx &ioctx;
+  const std::string &oid;
+  AsyncOpTracker &async_op_tracker;
+  uint64_t tag_class;
+  Tag *tag;
+  Context *on_finish;
+
+  bufferlist out_bl;
+
+  C_AllocateTag(CephContext *cct, librados::IoCtx &ioctx,
+                const std::string &oid, AsyncOpTracker &async_op_tracker,
+                uint64_t tag_class, const bufferlist &data, Tag *tag,
+                Context *on_finish)
+    : cct(cct), ioctx(ioctx), oid(oid), async_op_tracker(async_op_tracker),
+      tag_class(tag_class), tag(tag), on_finish(on_finish) {
+    async_op_tracker.start_op();
+    tag->data = data;
+  }
+  virtual ~C_AllocateTag() {
+    async_op_tracker.finish_op();
+  }
+
+  void send() {
+    send_get_next_tag_tid();
+  }
+
+  void send_get_next_tag_tid() {
+    ldout(cct, 20) << "C_AllocateTag: " << __func__ << dendl;
+
+    librados::ObjectReadOperation op;
+    client::get_next_tag_tid_start(&op);
+
+    librados::AioCompletion *comp = librados::Rados::aio_create_completion(
+      this, nullptr, &utils::rados_state_callback<
+        C_AllocateTag, &C_AllocateTag::handle_get_next_tag_tid>);
+
+    out_bl.clear();
+    int r = ioctx.aio_operate(oid, comp, &op, &out_bl);
+    assert(r == 0);
+    comp->release();
+  }
+
+  void handle_get_next_tag_tid(int r) {
+    ldout(cct, 20) << "C_AllocateTag: " << __func__ << ": r=" << r << dendl;
+
+    if (r == 0) {
+      bufferlist::iterator iter = out_bl.begin();
+      r = client::get_next_tag_tid_finish(&iter, &tag->tid);
+    }
+    if (r < 0) {
+      complete(r);
+      return;
+    }
+    send_tag_create();
+  }
+
+  void send_tag_create() {
+    ldout(cct, 20) << "C_AllocateTag: " << __func__ << dendl;
+
+    librados::ObjectWriteOperation op;
+    client::tag_create(&op, tag->tid, tag_class, tag->data);
+
+    librados::AioCompletion *comp = librados::Rados::aio_create_completion(
+      this, nullptr, &utils::rados_state_callback<
+        C_AllocateTag, &C_AllocateTag::handle_tag_create>);
+
+    int r = ioctx.aio_operate(oid, comp, &op);
+    assert(r == 0);
+    comp->release();
+  }
+
+  void handle_tag_create(int r) {
+    ldout(cct, 20) << "C_AllocateTag: " << __func__ << ": r=" << r << dendl;
+
+    if (r == -ESTALE) {
+      send_get_next_tag_tid();
+      return;
+    } else if (r < 0) {
+      complete(r);
+      return;
+    }
+
+    send_get_tag();
+  }
+
+  void send_get_tag() {
+    ldout(cct, 20) << "C_AllocateTag: " << __func__ << dendl;
+
+    librados::ObjectReadOperation op;
+    client::get_tag_start(&op, tag->tid);
+
+    librados::AioCompletion *comp = librados::Rados::aio_create_completion(
+      this, nullptr, &utils::rados_state_callback<
+        C_AllocateTag, &C_AllocateTag::handle_get_tag>);
+
+    out_bl.clear();
+    int r = ioctx.aio_operate(oid, comp, &op, &out_bl);
+    assert(r == 0);
+    comp->release();
+  }
+
+  void handle_get_tag(int r) {
+    ldout(cct, 20) << "C_AllocateTag: " << __func__ << ": r=" << r << dendl;
+
+    if (r == 0) {
+      bufferlist::iterator iter = out_bl.begin();
+
+      cls::journal::Tag journal_tag;
+      r = client::get_tag_finish(&iter, &journal_tag);
+      if (r == 0) {
+        *tag = journal_tag;
+      }
+    }
+    complete(r);
+  }
+
+  virtual void finish(int r) override {
+    on_finish->complete(r);
+  }
+};
+
+struct C_GetTags : public Context {
+  CephContext *cct;
+  librados::IoCtx &ioctx;
+  const std::string &oid;
+  const std::string &client_id;
+  AsyncOpTracker &async_op_tracker;
+  boost::optional<uint64_t> tag_class;
+  JournalMetadata::Tags *tags;
+  Context *on_finish;
+
+  const uint64_t MAX_RETURN = 64;
+  uint64_t start_after_tag_tid = 0;
+  bufferlist out_bl;
+
+  C_GetTags(CephContext *cct, librados::IoCtx &ioctx, const std::string &oid,
+            const std::string &client_id, AsyncOpTracker &async_op_tracker,
+            const boost::optional<uint64_t> &tag_class,
+            JournalMetadata::Tags *tags, Context *on_finish)
+    : cct(cct), ioctx(ioctx), oid(oid), client_id(client_id),
+      async_op_tracker(async_op_tracker), tag_class(tag_class), tags(tags),
+      on_finish(on_finish) {
+    async_op_tracker.start_op();
+  }
+  virtual ~C_GetTags() {
+    async_op_tracker.finish_op();
+  }
+
+  void send() {
+    send_tag_list();
+  }
+
+  void send_tag_list() {
+    librados::ObjectReadOperation op;
+    client::tag_list_start(&op, start_after_tag_tid, MAX_RETURN, client_id,
+                           tag_class);
+
+    librados::AioCompletion *comp = librados::Rados::aio_create_completion(
+      this, nullptr, &utils::rados_state_callback<
+        C_GetTags, &C_GetTags::handle_tag_list>);
+
+    out_bl.clear();
+    int r = ioctx.aio_operate(oid, comp, &op, &out_bl);
+    assert(r == 0);
+    comp->release();
+  }
+
+  void handle_tag_list(int r) {
+    if (r == 0) {
+      std::set<cls::journal::Tag> journal_tags;
+      bufferlist::iterator iter = out_bl.begin();
+      r = client::tag_list_finish(&iter, &journal_tags);
+      if (r == 0) {
+        for (auto &journal_tag : journal_tags) {
+          tags->push_back(journal_tag);
+          start_after_tag_tid = journal_tag.tid;
+        }
+
+        if (journal_tags.size() == MAX_RETURN) {
+          send_tag_list();
+          return;
+        }
+      }
+    }
+    complete(r);
+  }
+
+  virtual void finish(int r) override {
+    on_finish->complete(r);
+  }
+};
+
 } // anonymous namespace
 
 JournalMetadata::JournalMetadata(librados::IoCtx &ioctx,
@@ -161,6 +355,22 @@ int JournalMetadata::unregister_client() {
   return 0;
 }
 
+void JournalMetadata::allocate_tag(uint64_t tag_class, const bufferlist &data,
+                                   Tag *tag, Context *on_finish) {
+  C_AllocateTag *ctx = new C_AllocateTag(m_cct, m_ioctx, m_oid,
+                                         m_async_op_tracker, tag_class,
+                                         data, tag, on_finish);
+  ctx->send();
+}
+
+void JournalMetadata::get_tags(const boost::optional<uint64_t> &tag_class,
+                               Tags *tags, Context *on_finish) {
+  C_GetTags *ctx = new C_GetTags(m_cct, m_ioctx, m_oid, m_client_id,
+                                 m_async_op_tracker, tag_class,
+                                 tags, on_finish);
+  ctx->send();
+}
+
 void JournalMetadata::add_listener(Listener *listener) {
   Mutex::Locker locker(m_lock);
   while (m_update_notifications > 0) {
index 32d58bb421ae08f343b88fea385c31f3706768e8..450169b61d6e16bb10d1b57822bba4321dd745c7 100644 (file)
@@ -14,6 +14,7 @@
 #include "journal/AsyncOpTracker.h"
 #include <boost/intrusive_ptr.hpp>
 #include <boost/noncopyable.hpp>
+#include <boost/optional.hpp>
 #include <list>
 #include <map>
 #include <string>
@@ -33,8 +34,10 @@ public:
   typedef cls::journal::EntryPositions EntryPositions;
   typedef cls::journal::ObjectSetPosition ObjectSetPosition;
   typedef cls::journal::Client Client;
+  typedef cls::journal::Tag Tag;
 
   typedef std::set<Client> RegisteredClients;
+  typedef std::list<Tag> Tags;
 
   struct Listener {
     virtual ~Listener() {};
@@ -54,6 +57,11 @@ public:
   int register_client(const bufferlist &data);
   int unregister_client();
 
+  void allocate_tag(uint64_t tag_class, const bufferlist &data,
+                    Tag *tag, Context *on_finish);
+  void get_tags(const boost::optional<uint64_t> &tag_class, Tags *tags,
+                Context *on_finish);
+
   inline const std::string &get_client_id() const {
     return m_client_id;
   }
index eafa5a8ae8b0436092d45b7c4f16807329791d8e..1eb7e2c2cb6d07ccc2c516f123cb6fdd4cffcc72 100644 (file)
@@ -167,6 +167,21 @@ int Journaler::unregister_client() {
   return m_metadata->unregister_client();
 }
 
+void Journaler::allocate_tag(const bufferlist &data, cls::journal::Tag *tag,
+                             Context *on_finish) {
+  m_metadata->allocate_tag(cls::journal::Tag::TAG_CLASS_NEW, data, tag,
+                           on_finish);
+}
+
+void Journaler::allocate_tag(uint64_t tag_class, const bufferlist &data,
+                             cls::journal::Tag *tag, Context *on_finish) {
+  m_metadata->allocate_tag(tag_class, data, tag, on_finish);
+}
+
+void Journaler::get_tags(uint64_t tag_class, Tags *tags, Context *on_finish) {
+  m_metadata->get_tags(tag_class, tags, on_finish);
+}
+
 void Journaler::start_replay(ReplayHandler *replay_handler) {
   create_player(replay_handler);
   m_player->prefetch();
index 09127b79e54936f1d3c93d779a692ecb517e6933..6702eb4dffb4b2cbdb839975a2165c8f6747c361 100644 (file)
@@ -9,8 +9,10 @@
 #include "include/Context.h"
 #include "include/rados/librados.hpp"
 #include "journal/Future.h"
-#include <string>
+#include "cls/journal/cls_journal_types.h"
+#include <list>
 #include <map>
+#include <string>
 #include "include/assert.h"
 
 class SafeTimer;
@@ -26,6 +28,7 @@ class ReplayHandler;
 
 class Journaler {
 public:
+  typedef std::list<cls::journal::Tag> Tags;
 
   static std::string header_oid(const std::string &journal_id);
   static std::string object_oid_prefix(int pool_id,
@@ -45,6 +48,12 @@ public:
   int register_client(const bufferlist &data);
   int unregister_client();
 
+  void allocate_tag(const bufferlist &data, cls::journal::Tag *tag,
+                    Context *on_finish);
+  void allocate_tag(uint64_t tag_class, const bufferlist &data,
+                    cls::journal::Tag *tag, Context *on_finish);
+  void get_tags(uint64_t tag_class, Tags *tags, Context *on_finish);
+
   void start_replay(ReplayHandler *replay_handler);
   void start_live_replay(ReplayHandler *replay_handler, double interval);
   bool try_pop_front(ReplayEntry *replay_entry, uint64_t *tag_tid = nullptr);
index 1169ac924700022180df1c71c450351541eec56d..e29f359acaedac3490099779deb20996e91cd03a 100644 (file)
 namespace journal {
 namespace utils {
 
+template <typename T, void(T::*MF)(int)>
+void rados_state_callback(rados_completion_t c, void *arg) {
+  T *obj = reinterpret_cast<T*>(arg);
+  int r = rados_aio_get_return_value(c);
+  (obj->*MF)(r);
+}
+
 std::string get_object_name(const std::string &prefix, uint64_t number);
 
 std::string unique_lock_name(const std::string &name, void *address);
index f7231fa2c238ff5f26af3ccc5245f7ca38accf00..1d45bdcc6dfc966d6ab1763b7d2616f365625564 100644 (file)
@@ -84,3 +84,50 @@ TEST_F(TestJournaler, RegisterClientDuplicate) {
   ASSERT_EQ(-EEXIST, register_client(CLIENT_ID, "foo2"));
 }
 
+TEST_F(TestJournaler, AllocateTag) {
+  ASSERT_EQ(0, create_journal(12, 8));
+
+  cls::journal::Tag tag;
+
+  bufferlist data;
+  data.append(std::string(128, '1'));
+
+  // allocate a new tag class
+  C_SaferCond ctx1;
+  m_journaler->allocate_tag(data, &tag, &ctx1);
+  ASSERT_EQ(0, ctx1.wait());
+  ASSERT_EQ(cls::journal::Tag(0, 0, data), tag);
+
+  // re-use an existing tag class
+  C_SaferCond ctx2;
+  m_journaler->allocate_tag(tag.tag_class, bufferlist(), &tag, &ctx2);
+  ASSERT_EQ(0, ctx2.wait());
+  ASSERT_EQ(cls::journal::Tag(1, 0, bufferlist()), tag);
+}
+
+TEST_F(TestJournaler, GetTags) {
+  ASSERT_EQ(0, create_journal(12, 8));
+  ASSERT_EQ(0, register_client(CLIENT_ID, "foo"));
+
+  std::list<cls::journal::Tag> expected_tags;
+  for (size_t i = 0; i < 256; ++i) {
+    C_SaferCond ctx;
+    cls::journal::Tag tag;
+    if (i < 2) {
+      m_journaler->allocate_tag(bufferlist(), &tag, &ctx);
+    } else {
+      m_journaler->allocate_tag(i % 2, bufferlist(), &tag, &ctx);
+    }
+    ASSERT_EQ(0, ctx.wait());
+
+    if (i % 2 == 0) {
+      expected_tags.push_back(tag);
+    }
+  }
+
+  std::list<cls::journal::Tag> tags;
+  C_SaferCond ctx;
+  m_journaler->get_tags(0, &tags, &ctx);
+  ASSERT_EQ(0, ctx.wait());
+  ASSERT_EQ(expected_tags, tags);
+}