template <typename I, typename J>
int open_journaler(I *image_ctx, J *journaler, bool *initialized,
+ cls::journal::Client *client,
journal::ImageClientMeta *client_meta,
journal::TagData *tag_data) {
C_SaferCond init_ctx;
return r;
}
- cls::journal::Client client;
- r = journaler->get_cached_client(Journal<ImageCtx>::IMAGE_CLIENT_ID, &client);
+ r = journaler->get_cached_client(Journal<ImageCtx>::IMAGE_CLIENT_ID, client);
if (r < 0) {
return r;
}
librbd::journal::ClientData client_data;
- bufferlist::iterator bl_it = client.data.begin();
+ bufferlist::iterator bl_it = client->data.begin();
try {
::decode(client_data, bl_it);
} catch (const buffer::error &err) {
return 0;
}
+template <typename J>
+int allocate_journaler_tag(CephContext *cct, J *journaler,
+ const cls::journal::Client &client,
+ uint64_t tag_class,
+ const journal::TagData &prev_tag_data,
+ const std::string &mirror_uuid,
+ cls::journal::Tag *new_tag) {
+ journal::TagData tag_data;
+ if (!client.commit_position.object_positions.empty()) {
+ auto position = client.commit_position.object_positions.front();
+ tag_data.predecessor_commit_valid = true;
+ tag_data.predecessor_tag_tid = position.tag_tid;
+ tag_data.predecessor_entry_tid = position.entry_tid;
+ }
+ tag_data.predecessor_mirror_uuid = prev_tag_data.mirror_uuid;
+ tag_data.mirror_uuid = mirror_uuid;
+
+ bufferlist tag_bl;
+ ::encode(tag_data, tag_bl);
+
+ C_SaferCond allocate_tag_ctx;
+ journaler->allocate_tag(tag_class, tag_bl, new_tag, &allocate_tag_ctx);
+
+ int r = allocate_tag_ctx.wait();
+ if (r < 0) {
+ lderr(cct) << "failed to allocate tag: " << cpp_strerror(r) << dendl;
+ return r;
+ }
+ return 0;
+}
+
} // anonymous namespace
using util::create_async_context_callback;
return r;
}
- // create tag class for this image's journal events
- journal::TagData tag_data;
- tag_data.mirror_uuid = (!non_primary ? LOCAL_MIRROR_UUID :
- ORPHAN_MIRROR_UUID);
-
- bufferlist tag_data_bl;
- ::encode(tag_data, tag_data_bl);
-
- C_SaferCond tag_ctx;
+ cls::journal::Client client;
cls::journal::Tag tag;
- journaler.allocate_tag(cls::journal::Tag::TAG_CLASS_NEW, tag_data_bl,
- &tag, &tag_ctx);
- r = tag_ctx.wait();
- if (r < 0) {
- lderr(cct) << "failed to allocate journal tag: " << cpp_strerror(r)
- << dendl;
- }
+ journal::TagData tag_data;
+ std::string mirror_uuid = (!non_primary ? LOCAL_MIRROR_UUID :
+ ORPHAN_MIRROR_UUID);
+ r = allocate_journaler_tag(cct, &journaler, client,
+ cls::journal::Tag::TAG_CLASS_NEW,
+ tag_data, mirror_uuid, &tag);
bufferlist client_data;
::encode(journal::ClientData{journal::ImageClientMeta{tag.tag_class}},
image_ctx->cct->_conf->rbd_journal_commit_age);
bool initialized;
+ cls::journal::Client client;
journal::ImageClientMeta client_meta;
journal::TagData tag_data;
- int r = open_journaler(image_ctx, &journaler, &initialized, &client_meta,
- &tag_data);
+ int r = open_journaler(image_ctx, &journaler, &initialized, &client,
+ &client_meta, &tag_data);
if (r >= 0) {
*mirror_uuid = tag_data.mirror_uuid;
}
}
template <typename I>
-int Journal<I>::allocate_tag(I *image_ctx, const std::string &mirror_uuid) {
+int Journal<I>::request_resync(I *image_ctx) {
CephContext *cct = image_ctx->cct;
- ldout(cct, 20) << __func__ << ": mirror_uuid=" << mirror_uuid << dendl;
+ ldout(cct, 20) << __func__ << dendl;
Journaler journaler(image_ctx->md_ctx, image_ctx->id, IMAGE_CLIENT_ID,
image_ctx->cct->_conf->rbd_journal_commit_age);
bool initialized;
+ cls::journal::Client client;
journal::ImageClientMeta client_meta;
journal::TagData tag_data;
- int r = open_journaler(image_ctx, &journaler, &initialized, &client_meta,
- &tag_data);
+ int r = open_journaler(image_ctx, &journaler, &initialized, &client,
+ &client_meta, &tag_data);
BOOST_SCOPE_EXIT_ALL(&journaler, &initialized) {
if (initialized) {
journaler.shut_down();
return r;
}
- cls::journal::Client client;
- r = journaler.get_cached_client(IMAGE_CLIENT_ID, &client);
- if (r < 0) {
- lderr(cct) << "failed to retrieve client" << cpp_strerror(r) << dendl;
- return r;
- }
-
- if (!client.commit_position.object_positions.empty()) {
- auto position = client.commit_position.object_positions.front();
- tag_data.predecessor_commit_valid = true;
- tag_data.predecessor_tag_tid = position.tag_tid;
- tag_data.predecessor_entry_tid = position.entry_tid;
- }
- tag_data.predecessor_mirror_uuid = tag_data.mirror_uuid;
- tag_data.mirror_uuid = mirror_uuid;
+ client_meta.resync_requested = true;
- bufferlist tag_bl;
- ::encode(tag_data, tag_bl);
+ journal::ClientData client_data(client_meta);
+ bufferlist client_data_bl;
+ ::encode(client_data, client_data_bl);
- C_SaferCond allocate_tag_ctx;
- cls::journal::Tag tag;
- journaler.allocate_tag(client_meta.tag_class, tag_bl, &tag,
- &allocate_tag_ctx);
+ C_SaferCond update_client_ctx;
+ journaler.update_client(client_data_bl, &update_client_ctx);
- r = allocate_tag_ctx.wait();
+ r = update_client_ctx.wait();
if (r < 0) {
- lderr(cct) << "failed to allocate tag: " << cpp_strerror(r) << dendl;
+ lderr(cct) << "failed to update client: " << cpp_strerror(r) << dendl;
return r;
}
-
return 0;
}
template <typename I>
-int Journal<I>::request_resync(I *image_ctx) {
+int Journal<I>::promote(I *image_ctx) {
CephContext *cct = image_ctx->cct;
ldout(cct, 20) << __func__ << dendl;
image_ctx->cct->_conf->rbd_journal_commit_age);
bool initialized;
+ cls::journal::Client client;
journal::ImageClientMeta client_meta;
journal::TagData tag_data;
- int r = open_journaler(image_ctx, &journaler, &initialized, &client_meta,
- &tag_data);
+ int r = open_journaler(image_ctx, &journaler, &initialized, &client,
+ &client_meta, &tag_data);
BOOST_SCOPE_EXIT_ALL(&journaler, &initialized) {
if (initialized) {
journaler.shut_down();
return r;
}
- client_meta.resync_requested = true;
-
- journal::ClientData client_data(client_meta);
- bufferlist client_data_bl;
- ::encode(client_data, client_data_bl);
-
- C_SaferCond update_client_ctx;
- journaler.update_client(client_data_bl, &update_client_ctx);
-
- r = update_client_ctx.wait();
+ cls::journal::Tag new_tag;
+ r = allocate_journaler_tag(cct, &journaler, client, client_meta.tag_class,
+ tag_data, LOCAL_MIRROR_UUID, &new_tag);
if (r < 0) {
- lderr(cct) << "failed to update client: " << cpp_strerror(r) << dendl;
return r;
}
+
return 0;
}
return (m_tag_data.mirror_uuid == LOCAL_MIRROR_UUID);
}
+template <typename I>
+int Journal<I>::demote() {
+ CephContext *cct = m_image_ctx.cct;
+ ldout(cct, 20) << __func__ << dendl;
+
+ Mutex::Locker locker(m_lock);
+ assert(m_journaler != nullptr && is_tag_owner());
+
+ cls::journal::Client client;
+ int r = m_journaler->get_cached_client(IMAGE_CLIENT_ID, &client);
+ if (r < 0) {
+ lderr(cct) << "failed to retrieve client: " << cpp_strerror(r) << dendl;
+ return r;
+ }
+
+ cls::journal::Tag new_tag;
+ r = allocate_journaler_tag(cct, m_journaler, client, m_tag_class,
+ m_tag_data, ORPHAN_MIRROR_UUID, &new_tag);
+ if (r < 0) {
+ return r;
+ }
+
+ bufferlist::iterator tag_data_bl_it = new_tag.data.begin();
+ r = C_DecodeTag::decode(&tag_data_bl_it, &m_tag_data);
+ if (r < 0) {
+ lderr(cct) << "failed to decode newly allocated tag" << dendl;
+ return r;
+ }
+
+ journal::EventEntry event_entry{journal::DemoteEvent{}};
+ bufferlist event_entry_bl;
+ ::encode(event_entry, event_entry_bl);
+
+ m_tag_tid = new_tag.tid;
+ Future future = m_journaler->append(m_tag_tid, event_entry_bl);
+ C_SaferCond ctx;
+ future.flush(&ctx);
+
+ r = ctx.wait();
+ if (r < 0) {
+ lderr(cct) << "failed to append demotion journal event: " << cpp_strerror(r)
+ << dendl;
+ return r;
+ }
+
+ m_journaler->committed(future);
+ C_SaferCond flush_ctx;
+ m_journaler->flush_commit_position(&flush_ctx);
+
+ r = flush_ctx.wait();
+ if (r < 0) {
+ lderr(cct) << "failed to flush demotion commit position: "
+ << cpp_strerror(r) << dendl;
+ return r;
+ }
+
+ return 0;
+}
+
template <typename I>
void Journal<I>::allocate_local_tag(Context *on_finish) {
CephContext *cct = m_image_ctx.cct;