}
};
+struct C_AssertActiveTag : public Context {
+ CephContext *cct;
+ librados::IoCtx &ioctx;
+ const std::string &oid;
+ AsyncOpTracker &async_op_tracker;
+ std::string client_id;
+ uint64_t tag_tid;
+ Context *on_finish;
+
+ bufferlist out_bl;
+
+ C_AssertActiveTag(CephContext *cct, librados::IoCtx &ioctx,
+ const std::string &oid, AsyncOpTracker &async_op_tracker,
+ const std::string &client_id, uint64_t tag_tid,
+ Context *on_finish)
+ : cct(cct), ioctx(ioctx), oid(oid), async_op_tracker(async_op_tracker),
+ client_id(client_id), tag_tid(tag_tid), on_finish(on_finish) {
+ async_op_tracker.start_op();
+ }
+ virtual ~C_AssertActiveTag() {
+ async_op_tracker.finish_op();
+ }
+
+ void send() {
+ ldout(cct, 20) << "C_AssertActiveTag: " << __func__ << dendl;
+
+ librados::ObjectReadOperation op;
+ client::tag_list_start(&op, tag_tid, 2, client_id, boost::none);
+
+ librados::AioCompletion *comp = librados::Rados::aio_create_completion(
+ this, nullptr, &utils::rados_state_callback<
+ C_AssertActiveTag, &C_AssertActiveTag::handle_send>);
+
+ int r = ioctx.aio_operate(oid, comp, &op, &out_bl);
+ assert(r == 0);
+ comp->release();
+ }
+
+ void handle_send(int r) {
+ ldout(cct, 20) << "C_AssertActiveTag: " << __func__ << ": r=" << r << dendl;
+
+ std::set<cls::journal::Tag> tags;
+ if (r == 0) {
+ bufferlist::iterator it = out_bl.begin();
+ r = client::tag_list_finish(&it, &tags);
+ }
+
+ // NOTE: since 0 is treated as an uninitialized list filter, we need to
+ // load to entries and look at the last tid
+ if (r == 0 && !tags.empty() && tags.rbegin()->tid > tag_tid) {
+ r = -ESTALE;
+ }
+ complete(r);
+ }
+
+ virtual void finish(int r) {
+ on_finish->complete(r);
+ }
+};
+
} // anonymous namespace
JournalMetadata::JournalMetadata(ContextWQ *work_queue, SafeTimer *timer,
m_active_set = object_set;
}
+void JournalMetadata::assert_active_tag(uint64_t tag_tid, Context *on_finish) {
+ Mutex::Locker locker(m_lock);
+
+ C_AssertActiveTag *ctx = new C_AssertActiveTag(m_cct, m_ioctx, m_oid,
+ m_async_op_tracker,
+ m_client_id, tag_tid,
+ on_finish);
+ ctx->send();
+}
+
void JournalMetadata::flush_commit_position() {
ldout(m_cct, 20) << __func__ << dendl;
ASSERT_EQ(123U, metadata1->get_active_set());
}
+
+TEST_F(TestJournalMetadata, AssertActiveTag) {
+ std::string oid = get_temp_oid();
+
+ ASSERT_EQ(0, create(oid));
+ ASSERT_EQ(0, client_register(oid, "client1", ""));
+
+ journal::JournalMetadataPtr metadata = create_metadata(oid, "client1");
+ ASSERT_EQ(0, init_metadata(metadata));
+ ASSERT_TRUE(wait_for_update(metadata));
+
+ C_SaferCond ctx1;
+ cls::journal::Tag tag1;
+ metadata->allocate_tag(cls::journal::Tag::TAG_CLASS_NEW, {}, &tag1, &ctx1);
+ ASSERT_EQ(0, ctx1.wait());
+
+ C_SaferCond ctx2;
+ metadata->assert_active_tag(tag1.tid, &ctx2);
+ ASSERT_EQ(0, ctx2.wait());
+
+ C_SaferCond ctx3;
+ cls::journal::Tag tag2;
+ metadata->allocate_tag(tag1.tag_class, {}, &tag2, &ctx3);
+ ASSERT_EQ(0, ctx3.wait());
+
+ C_SaferCond ctx4;
+ metadata->assert_active_tag(tag1.tid, &ctx4);
+ ASSERT_EQ(-ESTALE, ctx4.wait());
+}