#include "librbd/ExclusiveLock.h"
#include "librbd/ImageCtx.h"
#include "librbd/journal/Replay.h"
-#include "librbd/journal/Types.h"
#include "librbd/Utils.h"
+#include "cls/journal/cls_journal_types.h"
#include "journal/Journaler.h"
#include "journal/ReplayEntry.h"
#include "common/errno.h"
namespace librbd {
+namespace {
+
+struct C_DecodeTag : public Context {
+ CephContext *cct;
+ Mutex *lock;
+ uint64_t *tag_tid;
+ journal::TagData *tag_data;
+ Context *on_finish;
+
+ cls::journal::Tag tag;
+
+ C_DecodeTag(CephContext *cct, Mutex *lock, uint64_t *tag_tid,
+ journal::TagData *tag_data, Context *on_finish)
+ : cct(cct), lock(lock), tag_tid(tag_tid), tag_data(tag_data),
+ on_finish(on_finish) {
+ }
+
+ virtual void complete(int r) override {
+ on_finish->complete(process(r));
+ Context::complete(0);
+ }
+ virtual void finish(int r) override {
+ }
+
+ int process(int r) {
+ if (r < 0) {
+ lderr(cct) << "failed to allocate tag: " << cpp_strerror(r) << dendl;
+ return r;
+ }
+
+ Mutex::Locker locker(*lock);
+ *tag_tid = tag.tid;
+
+ bufferlist::iterator data_it = tag.data.begin();
+ r = decode(&data_it, tag_data);
+ if (r < 0) {
+ lderr(cct) << "failed to decode allocated tag" << dendl;
+ return r;
+ }
+ return 0;
+ }
+
+ static int decode(bufferlist::iterator *it,
+ journal::TagData *tag_data) {
+ try {
+ ::decode(*tag_data, *it);
+ } catch (const buffer::error &err) {
+ return -EBADMSG;
+ }
+ return 0;
+ }
+
+};
+
+struct C_DecodeTags : public Context {
+ CephContext *cct;
+ Mutex *lock;
+ uint64_t *tag_tid;
+ journal::TagData *tag_data;
+ Context *on_finish;
+
+ ::journal::Journaler::Tags tags;
+
+ C_DecodeTags(CephContext *cct, Mutex *lock, uint64_t *tag_tid,
+ journal::TagData *tag_data, Context *on_finish)
+ : cct(cct), lock(lock), tag_tid(tag_tid), tag_data(tag_data),
+ on_finish(on_finish) {
+ }
+
+ virtual void complete(int r) {
+ on_finish->complete(process(r));
+ Context::complete(0);
+ }
+ virtual void finish(int r) override {
+ }
+
+ int process(int r) {
+ if (r < 0) {
+ lderr(cct) << "failed to retrieve journal tags: " << cpp_strerror(r)
+ << dendl;
+ return r;
+ }
+
+ if (tags.empty()) {
+ lderr(cct) << "no journal tags retrieved" << dendl;
+ return -ENOENT;
+ }
+
+ Mutex::Locker locker(*lock);
+ *tag_tid = tags.back().tid;
+
+ bufferlist::iterator data_it = tags.back().data.begin();
+ r = C_DecodeTag::decode(&data_it, tag_data);
+ if (r < 0) {
+ lderr(cct) << "failed to decode journal tag" << dendl;
+ return r;
+ }
+ return 0;
+ }
+};
+
+} // anonymous namespace
+
using util::create_async_context_callback;
using util::create_context_callback;
+// client id for local image
+template <typename I>
+const std::string Journal<I>::IMAGE_CLIENT_ID("");
+
+// mirror uuid to use for local images
+template <typename I>
+const std::string Journal<I>::LOCAL_MIRROR_UUID("");
+
template <typename I>
std::ostream &operator<<(std::ostream &os,
const typename Journal<I>::State &state) {
pool_id = data_io_ctx.get_id();
}
- Journaler journaler(io_ctx, image_id, "", cct->_conf->rbd_journal_commit_age);
+ Journaler journaler(io_ctx, image_id, IMAGE_CLIENT_ID,
+ cct->_conf->rbd_journal_commit_age);
int r = journaler.create(order, splay_width, pool_id);
if (r < 0) {
CephContext *cct = reinterpret_cast<CephContext *>(io_ctx.cct());
ldout(cct, 5) << __func__ << ": image=" << image_id << dendl;
- Journaler journaler(io_ctx, image_id, "", cct->_conf->rbd_journal_commit_age);
+ Journaler journaler(io_ctx, image_id, IMAGE_CLIENT_ID,
+ cct->_conf->rbd_journal_commit_age);
bool journal_exists;
int r = journaler.exists(&journal_exists);
CephContext *cct = reinterpret_cast<CephContext *>(io_ctx.cct());
ldout(cct, 5) << __func__ << ": image=" << image_id << dendl;
- Journaler journaler(io_ctx, image_id, "", cct->_conf->rbd_journal_commit_age);
+ Journaler journaler(io_ctx, image_id, IMAGE_CLIENT_ID,
+ cct->_conf->rbd_journal_commit_age);
C_SaferCond cond;
journaler.init(&cond);
wait_for_steady_state(on_finish);
}
+template <typename I>
+bool Journal<I>::is_tag_owner() const {
+ return (m_tag_data.mirror_uuid == LOCAL_MIRROR_UUID);
+}
+
+template <typename I>
+void Journal<I>::allocate_tag(const std::string &mirror_uuid,
+ Context *on_finish) {
+ CephContext *cct = m_image_ctx.cct;
+ ldout(cct, 20) << this << " " << __func__ << ": mirror_uuid=" << mirror_uuid
+ << dendl;
+
+ Mutex::Locker locker(m_lock);
+ assert(m_journaler != nullptr && is_tag_owner());
+
+ // NOTE: currently responsibility of caller to provide local mirror
+ // uuid constant or remote peer uuid
+ journal::TagData tag_data;
+ tag_data.mirror_uuid = mirror_uuid;
+
+ // TODO: inject current commit position into tag data (need updated journaler PR)
+ tag_data.predecessor_mirror_uuid = m_tag_data.mirror_uuid;
+
+ bufferlist tag_bl;
+ ::encode(tag_data, tag_bl);
+
+ C_DecodeTag *decode_tag_ctx = new C_DecodeTag(cct, &m_lock, &m_tag_tid,
+ &m_tag_data, on_finish);
+ m_journaler->allocate_tag(m_tag_class, tag_bl, &decode_tag_ctx->tag,
+ decode_tag_ctx);
+}
+
template <typename I>
void Journal<I>::flush_commit_position(Context *on_finish) {
CephContext *cct = m_image_ctx.cct;
tid = ++m_event_tid;
assert(tid != 0);
- // TODO: use allocated tag_id
- future = m_journaler->append(0, bl);
+ future = m_journaler->append(m_tag_tid, bl);
m_events[tid] = Event(future, aio_comp, requests, offset, length);
}
Mutex::Locker locker(m_lock);
assert(m_state == STATE_READY);
- // TODO: use allocated tag_id
- future = m_journaler->append(0, bl);
+ future = m_journaler->append(m_tag_tid, bl);
// delay committing op event to ensure consistent replay
assert(m_op_futures.count(op_tid) == 0);
op_start_future = it->second;
m_op_futures.erase(it);
- // TODO: use allocated tag_id
- op_finish_future = m_journaler->append(0, bl);
+ op_finish_future = m_journaler->append(m_tag_tid, bl);
}
op_finish_future.flush(new C_OpEventSafe(this, op_tid, op_start_future,
assert(m_journaler == NULL);
transition_state(STATE_INITIALIZING, 0);
- m_journaler = new Journaler(m_image_ctx.md_ctx, m_image_ctx.id, "",
- m_image_ctx.journal_commit_age);
+ m_journaler = new Journaler(m_image_ctx.md_ctx, m_image_ctx.id,
+ IMAGE_CLIENT_ID, m_image_ctx.journal_commit_age);
m_journaler->init(create_async_context_callback(
m_image_ctx, create_context_callback<
Journal<I>, &Journal<I>::handle_initialized>(this)));
ldout(cct, 20) << this << " " << __func__ << ": r=" << r << dendl;
Mutex::Locker locker(m_lock);
+ assert(m_state == STATE_INITIALIZING);
if (r < 0) {
lderr(cct) << this << " " << __func__
return;
}
+ // locate the master image client record
+ cls::journal::Client client;
+ r = m_journaler->get_cached_client(Journal<ImageCtx>::IMAGE_CLIENT_ID,
+ &client);
+ if (r < 0) {
+ lderr(cct) << "failed to locate master image client" << dendl;
+ destroy_journaler(r);
+ return;
+ }
+
+ librbd::journal::ClientData client_data;
+ bufferlist::iterator bl = client.data.begin();
+ try {
+ ::decode(client_data, bl);
+ } catch (const buffer::error &err) {
+ lderr(cct) << "failed to decode client meta data: " << err.what()
+ << dendl;
+ destroy_journaler(-EINVAL);
+ return;
+ }
+
+ librbd::journal::ImageClientMeta *image_client_meta =
+ boost::get<librbd::journal::ImageClientMeta>(&client_data.client_meta);
+ if (image_client_meta == nullptr) {
+ lderr(cct) << "failed to extract client meta data" << dendl;
+ destroy_journaler(-EINVAL);
+ return;
+ }
+
+ m_tag_class = image_client_meta->tag_class;
+ C_DecodeTags *tags_ctx = new C_DecodeTags(
+ cct, &m_lock, &m_tag_tid, &m_tag_data, create_async_context_callback(
+ m_image_ctx, create_context_callback<
+ Journal<I>, &Journal<I>::handle_get_tags>(this)));
+ m_journaler->get_tags(m_tag_class, &tags_ctx->tags, tags_ctx);
+}
+
+template <typename I>
+void Journal<I>::handle_get_tags(int r) {
+ CephContext *cct = m_image_ctx.cct;
+ ldout(cct, 20) << this << " " << __func__ << ": r=" << r << dendl;
+
+ Mutex::Locker locker(m_lock);
+ assert(m_state == STATE_INITIALIZING);
+
+ if (r < 0) {
+ destroy_journaler(r);
+ return;
+ }
+
transition_state(STATE_REPLAYING, 0);
m_journal_replay = journal::Replay<I>::create(m_image_ctx);
m_journaler->start_replay(&m_replay_handler);