From: Jason Dillaman Date: Thu, 4 Feb 2016 21:27:32 +0000 (-0500) Subject: journal: added tag support methods X-Git-Tag: v10.0.4~28^2~2 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=608947cb3f4d31cef42d23d492257073a0748a7f;p=ceph.git journal: added tag support methods librbd, for example, will allocate a new tag after acquiring the exclusive lock. Signed-off-by: Jason Dillaman --- diff --git a/src/journal/JournalMetadata.cc b/src/journal/JournalMetadata.cc index 629bddc60f03..f8d55742892e 100644 --- a/src/journal/JournalMetadata.cc +++ b/src/journal/JournalMetadata.cc @@ -49,6 +49,200 @@ inline bool entry_positions_less_equal(const ObjectSetPosition &lhs, return false; } +struct C_AllocateTag : public Context { + CephContext *cct; + librados::IoCtx &ioctx; + const std::string &oid; + AsyncOpTracker &async_op_tracker; + uint64_t tag_class; + Tag *tag; + Context *on_finish; + + bufferlist out_bl; + + C_AllocateTag(CephContext *cct, librados::IoCtx &ioctx, + const std::string &oid, AsyncOpTracker &async_op_tracker, + uint64_t tag_class, const bufferlist &data, Tag *tag, + Context *on_finish) + : cct(cct), ioctx(ioctx), oid(oid), async_op_tracker(async_op_tracker), + tag_class(tag_class), tag(tag), on_finish(on_finish) { + async_op_tracker.start_op(); + tag->data = data; + } + virtual ~C_AllocateTag() { + async_op_tracker.finish_op(); + } + + void send() { + send_get_next_tag_tid(); + } + + void send_get_next_tag_tid() { + ldout(cct, 20) << "C_AllocateTag: " << __func__ << dendl; + + librados::ObjectReadOperation op; + client::get_next_tag_tid_start(&op); + + librados::AioCompletion *comp = librados::Rados::aio_create_completion( + this, nullptr, &utils::rados_state_callback< + C_AllocateTag, &C_AllocateTag::handle_get_next_tag_tid>); + + out_bl.clear(); + int r = ioctx.aio_operate(oid, comp, &op, &out_bl); + assert(r == 0); + comp->release(); + } + + void handle_get_next_tag_tid(int r) { + ldout(cct, 20) << "C_AllocateTag: " << __func__ << ": r=" << r << dendl; + + if (r == 0) { + bufferlist::iterator iter = out_bl.begin(); + r = client::get_next_tag_tid_finish(&iter, &tag->tid); + } + if (r < 0) { + complete(r); + return; + } + send_tag_create(); + } + + void send_tag_create() { + ldout(cct, 20) << "C_AllocateTag: " << __func__ << dendl; + + librados::ObjectWriteOperation op; + client::tag_create(&op, tag->tid, tag_class, tag->data); + + librados::AioCompletion *comp = librados::Rados::aio_create_completion( + this, nullptr, &utils::rados_state_callback< + C_AllocateTag, &C_AllocateTag::handle_tag_create>); + + int r = ioctx.aio_operate(oid, comp, &op); + assert(r == 0); + comp->release(); + } + + void handle_tag_create(int r) { + ldout(cct, 20) << "C_AllocateTag: " << __func__ << ": r=" << r << dendl; + + if (r == -ESTALE) { + send_get_next_tag_tid(); + return; + } else if (r < 0) { + complete(r); + return; + } + + send_get_tag(); + } + + void send_get_tag() { + ldout(cct, 20) << "C_AllocateTag: " << __func__ << dendl; + + librados::ObjectReadOperation op; + client::get_tag_start(&op, tag->tid); + + librados::AioCompletion *comp = librados::Rados::aio_create_completion( + this, nullptr, &utils::rados_state_callback< + C_AllocateTag, &C_AllocateTag::handle_get_tag>); + + out_bl.clear(); + int r = ioctx.aio_operate(oid, comp, &op, &out_bl); + assert(r == 0); + comp->release(); + } + + void handle_get_tag(int r) { + ldout(cct, 20) << "C_AllocateTag: " << __func__ << ": r=" << r << dendl; + + if (r == 0) { + bufferlist::iterator iter = out_bl.begin(); + + cls::journal::Tag journal_tag; + r = client::get_tag_finish(&iter, &journal_tag); + if (r == 0) { + *tag = journal_tag; + } + } + complete(r); + } + + virtual void finish(int r) override { + on_finish->complete(r); + } +}; + +struct C_GetTags : public Context { + CephContext *cct; + librados::IoCtx &ioctx; + const std::string &oid; + const std::string &client_id; + AsyncOpTracker &async_op_tracker; + boost::optional tag_class; + JournalMetadata::Tags *tags; + Context *on_finish; + + const uint64_t MAX_RETURN = 64; + uint64_t start_after_tag_tid = 0; + bufferlist out_bl; + + C_GetTags(CephContext *cct, librados::IoCtx &ioctx, const std::string &oid, + const std::string &client_id, AsyncOpTracker &async_op_tracker, + const boost::optional &tag_class, + JournalMetadata::Tags *tags, Context *on_finish) + : cct(cct), ioctx(ioctx), oid(oid), client_id(client_id), + async_op_tracker(async_op_tracker), tag_class(tag_class), tags(tags), + on_finish(on_finish) { + async_op_tracker.start_op(); + } + virtual ~C_GetTags() { + async_op_tracker.finish_op(); + } + + void send() { + send_tag_list(); + } + + void send_tag_list() { + librados::ObjectReadOperation op; + client::tag_list_start(&op, start_after_tag_tid, MAX_RETURN, client_id, + tag_class); + + librados::AioCompletion *comp = librados::Rados::aio_create_completion( + this, nullptr, &utils::rados_state_callback< + C_GetTags, &C_GetTags::handle_tag_list>); + + out_bl.clear(); + int r = ioctx.aio_operate(oid, comp, &op, &out_bl); + assert(r == 0); + comp->release(); + } + + void handle_tag_list(int r) { + if (r == 0) { + std::set journal_tags; + bufferlist::iterator iter = out_bl.begin(); + r = client::tag_list_finish(&iter, &journal_tags); + if (r == 0) { + for (auto &journal_tag : journal_tags) { + tags->push_back(journal_tag); + start_after_tag_tid = journal_tag.tid; + } + + if (journal_tags.size() == MAX_RETURN) { + send_tag_list(); + return; + } + } + } + complete(r); + } + + virtual void finish(int r) override { + on_finish->complete(r); + } +}; + } // anonymous namespace JournalMetadata::JournalMetadata(librados::IoCtx &ioctx, @@ -161,6 +355,22 @@ int JournalMetadata::unregister_client() { return 0; } +void JournalMetadata::allocate_tag(uint64_t tag_class, const bufferlist &data, + Tag *tag, Context *on_finish) { + C_AllocateTag *ctx = new C_AllocateTag(m_cct, m_ioctx, m_oid, + m_async_op_tracker, tag_class, + data, tag, on_finish); + ctx->send(); +} + +void JournalMetadata::get_tags(const boost::optional &tag_class, + Tags *tags, Context *on_finish) { + C_GetTags *ctx = new C_GetTags(m_cct, m_ioctx, m_oid, m_client_id, + m_async_op_tracker, tag_class, + tags, on_finish); + ctx->send(); +} + void JournalMetadata::add_listener(Listener *listener) { Mutex::Locker locker(m_lock); while (m_update_notifications > 0) { diff --git a/src/journal/JournalMetadata.h b/src/journal/JournalMetadata.h index 32d58bb421ae..450169b61d6e 100644 --- a/src/journal/JournalMetadata.h +++ b/src/journal/JournalMetadata.h @@ -14,6 +14,7 @@ #include "journal/AsyncOpTracker.h" #include #include +#include #include #include #include @@ -33,8 +34,10 @@ public: typedef cls::journal::EntryPositions EntryPositions; typedef cls::journal::ObjectSetPosition ObjectSetPosition; typedef cls::journal::Client Client; + typedef cls::journal::Tag Tag; typedef std::set RegisteredClients; + typedef std::list Tags; struct Listener { virtual ~Listener() {}; @@ -54,6 +57,11 @@ public: int register_client(const bufferlist &data); int unregister_client(); + void allocate_tag(uint64_t tag_class, const bufferlist &data, + Tag *tag, Context *on_finish); + void get_tags(const boost::optional &tag_class, Tags *tags, + Context *on_finish); + inline const std::string &get_client_id() const { return m_client_id; } diff --git a/src/journal/Journaler.cc b/src/journal/Journaler.cc index eafa5a8ae8b0..1eb7e2c2cb6d 100644 --- a/src/journal/Journaler.cc +++ b/src/journal/Journaler.cc @@ -167,6 +167,21 @@ int Journaler::unregister_client() { return m_metadata->unregister_client(); } +void Journaler::allocate_tag(const bufferlist &data, cls::journal::Tag *tag, + Context *on_finish) { + m_metadata->allocate_tag(cls::journal::Tag::TAG_CLASS_NEW, data, tag, + on_finish); +} + +void Journaler::allocate_tag(uint64_t tag_class, const bufferlist &data, + cls::journal::Tag *tag, Context *on_finish) { + m_metadata->allocate_tag(tag_class, data, tag, on_finish); +} + +void Journaler::get_tags(uint64_t tag_class, Tags *tags, Context *on_finish) { + m_metadata->get_tags(tag_class, tags, on_finish); +} + void Journaler::start_replay(ReplayHandler *replay_handler) { create_player(replay_handler); m_player->prefetch(); diff --git a/src/journal/Journaler.h b/src/journal/Journaler.h index 09127b79e549..6702eb4dffb4 100644 --- a/src/journal/Journaler.h +++ b/src/journal/Journaler.h @@ -9,8 +9,10 @@ #include "include/Context.h" #include "include/rados/librados.hpp" #include "journal/Future.h" -#include +#include "cls/journal/cls_journal_types.h" +#include #include +#include #include "include/assert.h" class SafeTimer; @@ -26,6 +28,7 @@ class ReplayHandler; class Journaler { public: + typedef std::list Tags; static std::string header_oid(const std::string &journal_id); static std::string object_oid_prefix(int pool_id, @@ -45,6 +48,12 @@ public: int register_client(const bufferlist &data); int unregister_client(); + void allocate_tag(const bufferlist &data, cls::journal::Tag *tag, + Context *on_finish); + void allocate_tag(uint64_t tag_class, const bufferlist &data, + cls::journal::Tag *tag, Context *on_finish); + void get_tags(uint64_t tag_class, Tags *tags, Context *on_finish); + void start_replay(ReplayHandler *replay_handler); void start_live_replay(ReplayHandler *replay_handler, double interval); bool try_pop_front(ReplayEntry *replay_entry, uint64_t *tag_tid = nullptr); diff --git a/src/journal/Utils.h b/src/journal/Utils.h index 1169ac924700..e29f359acaed 100644 --- a/src/journal/Utils.h +++ b/src/journal/Utils.h @@ -11,6 +11,13 @@ namespace journal { namespace utils { +template +void rados_state_callback(rados_completion_t c, void *arg) { + T *obj = reinterpret_cast(arg); + int r = rados_aio_get_return_value(c); + (obj->*MF)(r); +} + std::string get_object_name(const std::string &prefix, uint64_t number); std::string unique_lock_name(const std::string &name, void *address); diff --git a/src/test/journal/test_Journaler.cc b/src/test/journal/test_Journaler.cc index f7231fa2c238..1d45bdcc6dfc 100644 --- a/src/test/journal/test_Journaler.cc +++ b/src/test/journal/test_Journaler.cc @@ -84,3 +84,50 @@ TEST_F(TestJournaler, RegisterClientDuplicate) { ASSERT_EQ(-EEXIST, register_client(CLIENT_ID, "foo2")); } +TEST_F(TestJournaler, AllocateTag) { + ASSERT_EQ(0, create_journal(12, 8)); + + cls::journal::Tag tag; + + bufferlist data; + data.append(std::string(128, '1')); + + // allocate a new tag class + C_SaferCond ctx1; + m_journaler->allocate_tag(data, &tag, &ctx1); + ASSERT_EQ(0, ctx1.wait()); + ASSERT_EQ(cls::journal::Tag(0, 0, data), tag); + + // re-use an existing tag class + C_SaferCond ctx2; + m_journaler->allocate_tag(tag.tag_class, bufferlist(), &tag, &ctx2); + ASSERT_EQ(0, ctx2.wait()); + ASSERT_EQ(cls::journal::Tag(1, 0, bufferlist()), tag); +} + +TEST_F(TestJournaler, GetTags) { + ASSERT_EQ(0, create_journal(12, 8)); + ASSERT_EQ(0, register_client(CLIENT_ID, "foo")); + + std::list expected_tags; + for (size_t i = 0; i < 256; ++i) { + C_SaferCond ctx; + cls::journal::Tag tag; + if (i < 2) { + m_journaler->allocate_tag(bufferlist(), &tag, &ctx); + } else { + m_journaler->allocate_tag(i % 2, bufferlist(), &tag, &ctx); + } + ASSERT_EQ(0, ctx.wait()); + + if (i % 2 == 0) { + expected_tags.push_back(tag); + } + } + + std::list tags; + C_SaferCond ctx; + m_journaler->get_tags(0, &tags, &ctx); + ASSERT_EQ(0, ctx.wait()); + ASSERT_EQ(expected_tags, tags); +}