return false;
}
+struct C_AllocateTag : public Context {
+ CephContext *cct;
+ librados::IoCtx &ioctx;
+ const std::string &oid;
+ AsyncOpTracker &async_op_tracker;
+ uint64_t tag_class;
+ Tag *tag;
+ Context *on_finish;
+
+ bufferlist out_bl;
+
+ C_AllocateTag(CephContext *cct, librados::IoCtx &ioctx,
+ const std::string &oid, AsyncOpTracker &async_op_tracker,
+ uint64_t tag_class, const bufferlist &data, Tag *tag,
+ Context *on_finish)
+ : cct(cct), ioctx(ioctx), oid(oid), async_op_tracker(async_op_tracker),
+ tag_class(tag_class), tag(tag), on_finish(on_finish) {
+ async_op_tracker.start_op();
+ tag->data = data;
+ }
+ virtual ~C_AllocateTag() {
+ async_op_tracker.finish_op();
+ }
+
+ void send() {
+ send_get_next_tag_tid();
+ }
+
+ void send_get_next_tag_tid() {
+ ldout(cct, 20) << "C_AllocateTag: " << __func__ << dendl;
+
+ librados::ObjectReadOperation op;
+ client::get_next_tag_tid_start(&op);
+
+ librados::AioCompletion *comp = librados::Rados::aio_create_completion(
+ this, nullptr, &utils::rados_state_callback<
+ C_AllocateTag, &C_AllocateTag::handle_get_next_tag_tid>);
+
+ out_bl.clear();
+ int r = ioctx.aio_operate(oid, comp, &op, &out_bl);
+ assert(r == 0);
+ comp->release();
+ }
+
+ void handle_get_next_tag_tid(int r) {
+ ldout(cct, 20) << "C_AllocateTag: " << __func__ << ": r=" << r << dendl;
+
+ if (r == 0) {
+ bufferlist::iterator iter = out_bl.begin();
+ r = client::get_next_tag_tid_finish(&iter, &tag->tid);
+ }
+ if (r < 0) {
+ complete(r);
+ return;
+ }
+ send_tag_create();
+ }
+
+ void send_tag_create() {
+ ldout(cct, 20) << "C_AllocateTag: " << __func__ << dendl;
+
+ librados::ObjectWriteOperation op;
+ client::tag_create(&op, tag->tid, tag_class, tag->data);
+
+ librados::AioCompletion *comp = librados::Rados::aio_create_completion(
+ this, nullptr, &utils::rados_state_callback<
+ C_AllocateTag, &C_AllocateTag::handle_tag_create>);
+
+ int r = ioctx.aio_operate(oid, comp, &op);
+ assert(r == 0);
+ comp->release();
+ }
+
+ void handle_tag_create(int r) {
+ ldout(cct, 20) << "C_AllocateTag: " << __func__ << ": r=" << r << dendl;
+
+ if (r == -ESTALE) {
+ send_get_next_tag_tid();
+ return;
+ } else if (r < 0) {
+ complete(r);
+ return;
+ }
+
+ send_get_tag();
+ }
+
+ void send_get_tag() {
+ ldout(cct, 20) << "C_AllocateTag: " << __func__ << dendl;
+
+ librados::ObjectReadOperation op;
+ client::get_tag_start(&op, tag->tid);
+
+ librados::AioCompletion *comp = librados::Rados::aio_create_completion(
+ this, nullptr, &utils::rados_state_callback<
+ C_AllocateTag, &C_AllocateTag::handle_get_tag>);
+
+ out_bl.clear();
+ int r = ioctx.aio_operate(oid, comp, &op, &out_bl);
+ assert(r == 0);
+ comp->release();
+ }
+
+ void handle_get_tag(int r) {
+ ldout(cct, 20) << "C_AllocateTag: " << __func__ << ": r=" << r << dendl;
+
+ if (r == 0) {
+ bufferlist::iterator iter = out_bl.begin();
+
+ cls::journal::Tag journal_tag;
+ r = client::get_tag_finish(&iter, &journal_tag);
+ if (r == 0) {
+ *tag = journal_tag;
+ }
+ }
+ complete(r);
+ }
+
+ virtual void finish(int r) override {
+ on_finish->complete(r);
+ }
+};
+
+struct C_GetTags : public Context {
+ CephContext *cct;
+ librados::IoCtx &ioctx;
+ const std::string &oid;
+ const std::string &client_id;
+ AsyncOpTracker &async_op_tracker;
+ boost::optional<uint64_t> tag_class;
+ JournalMetadata::Tags *tags;
+ Context *on_finish;
+
+ const uint64_t MAX_RETURN = 64;
+ uint64_t start_after_tag_tid = 0;
+ bufferlist out_bl;
+
+ C_GetTags(CephContext *cct, librados::IoCtx &ioctx, const std::string &oid,
+ const std::string &client_id, AsyncOpTracker &async_op_tracker,
+ const boost::optional<uint64_t> &tag_class,
+ JournalMetadata::Tags *tags, Context *on_finish)
+ : cct(cct), ioctx(ioctx), oid(oid), client_id(client_id),
+ async_op_tracker(async_op_tracker), tag_class(tag_class), tags(tags),
+ on_finish(on_finish) {
+ async_op_tracker.start_op();
+ }
+ virtual ~C_GetTags() {
+ async_op_tracker.finish_op();
+ }
+
+ void send() {
+ send_tag_list();
+ }
+
+ void send_tag_list() {
+ librados::ObjectReadOperation op;
+ client::tag_list_start(&op, start_after_tag_tid, MAX_RETURN, client_id,
+ tag_class);
+
+ librados::AioCompletion *comp = librados::Rados::aio_create_completion(
+ this, nullptr, &utils::rados_state_callback<
+ C_GetTags, &C_GetTags::handle_tag_list>);
+
+ out_bl.clear();
+ int r = ioctx.aio_operate(oid, comp, &op, &out_bl);
+ assert(r == 0);
+ comp->release();
+ }
+
+ void handle_tag_list(int r) {
+ if (r == 0) {
+ std::set<cls::journal::Tag> journal_tags;
+ bufferlist::iterator iter = out_bl.begin();
+ r = client::tag_list_finish(&iter, &journal_tags);
+ if (r == 0) {
+ for (auto &journal_tag : journal_tags) {
+ tags->push_back(journal_tag);
+ start_after_tag_tid = journal_tag.tid;
+ }
+
+ if (journal_tags.size() == MAX_RETURN) {
+ send_tag_list();
+ return;
+ }
+ }
+ }
+ complete(r);
+ }
+
+ virtual void finish(int r) override {
+ on_finish->complete(r);
+ }
+};
+
} // anonymous namespace
JournalMetadata::JournalMetadata(librados::IoCtx &ioctx,
return 0;
}
+void JournalMetadata::allocate_tag(uint64_t tag_class, const bufferlist &data,
+ Tag *tag, Context *on_finish) {
+ C_AllocateTag *ctx = new C_AllocateTag(m_cct, m_ioctx, m_oid,
+ m_async_op_tracker, tag_class,
+ data, tag, on_finish);
+ ctx->send();
+}
+
+void JournalMetadata::get_tags(const boost::optional<uint64_t> &tag_class,
+ Tags *tags, Context *on_finish) {
+ C_GetTags *ctx = new C_GetTags(m_cct, m_ioctx, m_oid, m_client_id,
+ m_async_op_tracker, tag_class,
+ tags, on_finish);
+ ctx->send();
+}
+
void JournalMetadata::add_listener(Listener *listener) {
Mutex::Locker locker(m_lock);
while (m_update_notifications > 0) {
ASSERT_EQ(-EEXIST, register_client(CLIENT_ID, "foo2"));
}
+TEST_F(TestJournaler, AllocateTag) {
+ ASSERT_EQ(0, create_journal(12, 8));
+
+ cls::journal::Tag tag;
+
+ bufferlist data;
+ data.append(std::string(128, '1'));
+
+ // allocate a new tag class
+ C_SaferCond ctx1;
+ m_journaler->allocate_tag(data, &tag, &ctx1);
+ ASSERT_EQ(0, ctx1.wait());
+ ASSERT_EQ(cls::journal::Tag(0, 0, data), tag);
+
+ // re-use an existing tag class
+ C_SaferCond ctx2;
+ m_journaler->allocate_tag(tag.tag_class, bufferlist(), &tag, &ctx2);
+ ASSERT_EQ(0, ctx2.wait());
+ ASSERT_EQ(cls::journal::Tag(1, 0, bufferlist()), tag);
+}
+
+TEST_F(TestJournaler, GetTags) {
+ ASSERT_EQ(0, create_journal(12, 8));
+ ASSERT_EQ(0, register_client(CLIENT_ID, "foo"));
+
+ std::list<cls::journal::Tag> expected_tags;
+ for (size_t i = 0; i < 256; ++i) {
+ C_SaferCond ctx;
+ cls::journal::Tag tag;
+ if (i < 2) {
+ m_journaler->allocate_tag(bufferlist(), &tag, &ctx);
+ } else {
+ m_journaler->allocate_tag(i % 2, bufferlist(), &tag, &ctx);
+ }
+ ASSERT_EQ(0, ctx.wait());
+
+ if (i % 2 == 0) {
+ expected_tags.push_back(tag);
+ }
+ }
+
+ std::list<cls::journal::Tag> tags;
+ C_SaferCond ctx;
+ m_journaler->get_tags(0, &tags, &ctx);
+ ASSERT_EQ(0, ctx.wait());
+ ASSERT_EQ(expected_tags, tags);
+}