From 0a8a6126ea35344e85af7eb64ffc490938edba51 Mon Sep 17 00:00:00 2001 From: Jason Dillaman Date: Mon, 16 May 2016 18:08:35 -0400 Subject: [PATCH] journal: helper method to detect newer tags Signed-off-by: Jason Dillaman --- src/journal/JournalMetadata.cc | 70 ++++++++++++++++++++++++ src/journal/JournalMetadata.h | 2 + src/test/journal/test_JournalMetadata.cc | 29 ++++++++++ 3 files changed, 101 insertions(+) diff --git a/src/journal/JournalMetadata.cc b/src/journal/JournalMetadata.cc index a05aa5a907ce0..c24a96aa0ef2b 100644 --- a/src/journal/JournalMetadata.cc +++ b/src/journal/JournalMetadata.cc @@ -336,6 +336,66 @@ struct C_FlushCommitPosition : public Context { } }; +struct C_AssertActiveTag : public Context { + CephContext *cct; + librados::IoCtx &ioctx; + const std::string &oid; + AsyncOpTracker &async_op_tracker; + std::string client_id; + uint64_t tag_tid; + Context *on_finish; + + bufferlist out_bl; + + C_AssertActiveTag(CephContext *cct, librados::IoCtx &ioctx, + const std::string &oid, AsyncOpTracker &async_op_tracker, + const std::string &client_id, uint64_t tag_tid, + Context *on_finish) + : cct(cct), ioctx(ioctx), oid(oid), async_op_tracker(async_op_tracker), + client_id(client_id), tag_tid(tag_tid), on_finish(on_finish) { + async_op_tracker.start_op(); + } + virtual ~C_AssertActiveTag() { + async_op_tracker.finish_op(); + } + + void send() { + ldout(cct, 20) << "C_AssertActiveTag: " << __func__ << dendl; + + librados::ObjectReadOperation op; + client::tag_list_start(&op, tag_tid, 2, client_id, boost::none); + + librados::AioCompletion *comp = librados::Rados::aio_create_completion( + this, nullptr, &utils::rados_state_callback< + C_AssertActiveTag, &C_AssertActiveTag::handle_send>); + + int r = ioctx.aio_operate(oid, comp, &op, &out_bl); + assert(r == 0); + comp->release(); + } + + void handle_send(int r) { + ldout(cct, 20) << "C_AssertActiveTag: " << __func__ << ": r=" << r << dendl; + + std::set tags; + if (r == 0) { + bufferlist::iterator it = out_bl.begin(); + r = client::tag_list_finish(&it, &tags); + } + + // NOTE: since 0 is treated as an uninitialized list filter, we need to + // load to entries and look at the last tid + if (r == 0 && !tags.empty() && tags.rbegin()->tid > tag_tid) { + r = -ESTALE; + } + complete(r); + } + + virtual void finish(int r) { + on_finish->complete(r); + } +}; + } // anonymous namespace JournalMetadata::JournalMetadata(ContextWQ *work_queue, SafeTimer *timer, @@ -564,6 +624,16 @@ void JournalMetadata::set_active_set(uint64_t object_set, Context *on_finish) { m_active_set = object_set; } +void JournalMetadata::assert_active_tag(uint64_t tag_tid, Context *on_finish) { + Mutex::Locker locker(m_lock); + + C_AssertActiveTag *ctx = new C_AssertActiveTag(m_cct, m_ioctx, m_oid, + m_async_op_tracker, + m_client_id, tag_tid, + on_finish); + ctx->send(); +} + void JournalMetadata::flush_commit_position() { ldout(m_cct, 20) << __func__ << dendl; diff --git a/src/journal/JournalMetadata.h b/src/journal/JournalMetadata.h index 9d19af69f1032..db30cf15951a3 100644 --- a/src/journal/JournalMetadata.h +++ b/src/journal/JournalMetadata.h @@ -117,6 +117,8 @@ public: return m_active_set; } + void assert_active_tag(uint64_t tag_tid, Context *on_finish); + void flush_commit_position(); void flush_commit_position(Context *on_safe); void get_commit_position(ObjectSetPosition *commit_position) const { diff --git a/src/test/journal/test_JournalMetadata.cc b/src/test/journal/test_JournalMetadata.cc index b0bb41c844c99..b8f559365f811 100644 --- a/src/test/journal/test_JournalMetadata.cc +++ b/src/test/journal/test_JournalMetadata.cc @@ -115,3 +115,32 @@ TEST_F(TestJournalMetadata, UpdateActiveObject) { ASSERT_EQ(123U, metadata1->get_active_set()); } + +TEST_F(TestJournalMetadata, AssertActiveTag) { + std::string oid = get_temp_oid(); + + ASSERT_EQ(0, create(oid)); + ASSERT_EQ(0, client_register(oid, "client1", "")); + + journal::JournalMetadataPtr metadata = create_metadata(oid, "client1"); + ASSERT_EQ(0, init_metadata(metadata)); + ASSERT_TRUE(wait_for_update(metadata)); + + C_SaferCond ctx1; + cls::journal::Tag tag1; + metadata->allocate_tag(cls::journal::Tag::TAG_CLASS_NEW, {}, &tag1, &ctx1); + ASSERT_EQ(0, ctx1.wait()); + + C_SaferCond ctx2; + metadata->assert_active_tag(tag1.tid, &ctx2); + ASSERT_EQ(0, ctx2.wait()); + + C_SaferCond ctx3; + cls::journal::Tag tag2; + metadata->allocate_tag(tag1.tag_class, {}, &tag2, &ctx3); + ASSERT_EQ(0, ctx3.wait()); + + C_SaferCond ctx4; + metadata->assert_active_tag(tag1.tid, &ctx4); + ASSERT_EQ(-ESTALE, ctx4.wait()); +} -- 2.39.5