return r;
}
- // TODO: inject current commit position into tag data
+ cls::journal::Client client;
+ r = journaler.get_cached_client(IMAGE_CLIENT_ID, &client);
+ if (r < 0) {
+ lderr(cct) << "failed to retrieve client" << cpp_strerror(r) << dendl;
+ return r;
+ }
+
+ if (!client.commit_position.object_positions.empty()) {
+ auto position = client.commit_position.object_positions.front();
+ tag_data.predecessor_commit_valid = true;
+ tag_data.predecessor_tag_tid = position.tag_tid;
+ tag_data.predecessor_entry_tid = position.entry_tid;
+ }
+ tag_data.predecessor_mirror_uuid = tag_data.mirror_uuid;
tag_data.mirror_uuid = mirror_uuid;
- tag_data.predecessor_mirror_uuid = mirror_uuid;
bufferlist tag_bl;
::encode(tag_data, tag_bl);
return (m_tag_data.mirror_uuid == LOCAL_MIRROR_UUID);
}
+template <typename I>
+void Journal<I>::allocate_local_tag(Context *on_finish) {
+ CephContext *cct = m_image_ctx.cct;
+ ldout(cct, 20) << this << " " << __func__ << dendl;
+
+ bool predecessor_commit_valid = false;
+ uint64_t predecessor_tag_tid = 0;
+ uint64_t predecessor_entry_tid = 0;
+ {
+ Mutex::Locker locker(m_lock);
+ assert(m_journaler != nullptr && is_tag_owner());
+
+ cls::journal::Client client;
+ int r = m_journaler->get_cached_client(IMAGE_CLIENT_ID, &client);
+ if (r < 0) {
+ lderr(cct) << "failed to retrieve client: " << cpp_strerror(r) << dendl;
+ m_image_ctx.op_work_queue->queue(on_finish, r);
+ return;
+ }
+
+ // since we are primary, populate the predecessor with our known commit
+ // position
+ assert(m_tag_data.mirror_uuid == LOCAL_MIRROR_UUID);
+ if (!client.commit_position.object_positions.empty()) {
+ auto position = client.commit_position.object_positions.front();
+ predecessor_commit_valid = true;
+ predecessor_tag_tid = position.tag_tid;
+ predecessor_entry_tid = position.entry_tid;
+ }
+ }
+
+ allocate_tag(LOCAL_MIRROR_UUID, LOCAL_MIRROR_UUID, predecessor_commit_valid,
+ predecessor_tag_tid, predecessor_entry_tid, on_finish);
+}
+
template <typename I>
void Journal<I>::allocate_tag(const std::string &mirror_uuid,
+ const std::string &predecessor_mirror_uuid,
+ bool predecessor_commit_valid,
+ uint64_t predecessor_tag_tid,
+ uint64_t predecessor_entry_tid,
Context *on_finish) {
CephContext *cct = m_image_ctx.cct;
ldout(cct, 20) << this << " " << __func__ << ": mirror_uuid=" << mirror_uuid
<< dendl;
Mutex::Locker locker(m_lock);
- assert(m_journaler != nullptr && is_tag_owner());
+ assert(m_journaler != nullptr);
- // NOTE: currently responsibility of caller to provide local mirror
- // uuid constant or remote peer uuid
journal::TagData tag_data;
tag_data.mirror_uuid = mirror_uuid;
-
- // TODO: inject current commit position into tag data (need updated journaler PR)
- tag_data.predecessor_mirror_uuid = m_tag_data.mirror_uuid;
+ tag_data.predecessor_mirror_uuid = predecessor_mirror_uuid;
+ tag_data.predecessor_commit_valid = predecessor_commit_valid;
+ tag_data.predecessor_tag_tid = predecessor_tag_tid;
+ tag_data.predecessor_entry_tid = predecessor_entry_tid;
bufferlist tag_bl;
::encode(tag_data, tag_bl);