From: Jason Dillaman Date: Fri, 26 Feb 2016 15:56:53 +0000 (-0500) Subject: librbd: add tag handling to journal state machine X-Git-Tag: v10.1.0~187^2~8 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=cbcfedf7d66ee358dd1ad29ed49afb3605db9bc4;p=ceph.git librbd: add tag handling to journal state machine The journal will not retrieve the tag class for the image within the journal in addition to the most recently allocated tag. Also added helper methods to allocate new tags. Signed-off-by: Jason Dillaman --- diff --git a/src/librbd/Journal.cc b/src/librbd/Journal.cc index af82c6cf5fb30..f9aff5bf23c1c 100644 --- a/src/librbd/Journal.cc +++ b/src/librbd/Journal.cc @@ -8,8 +8,8 @@ #include "librbd/ExclusiveLock.h" #include "librbd/ImageCtx.h" #include "librbd/journal/Replay.h" -#include "librbd/journal/Types.h" #include "librbd/Utils.h" +#include "cls/journal/cls_journal_types.h" #include "journal/Journaler.h" #include "journal/ReplayEntry.h" #include "common/errno.h" @@ -20,9 +20,120 @@ namespace librbd { +namespace { + +struct C_DecodeTag : public Context { + CephContext *cct; + Mutex *lock; + uint64_t *tag_tid; + journal::TagData *tag_data; + Context *on_finish; + + cls::journal::Tag tag; + + C_DecodeTag(CephContext *cct, Mutex *lock, uint64_t *tag_tid, + journal::TagData *tag_data, Context *on_finish) + : cct(cct), lock(lock), tag_tid(tag_tid), tag_data(tag_data), + on_finish(on_finish) { + } + + virtual void complete(int r) override { + on_finish->complete(process(r)); + Context::complete(0); + } + virtual void finish(int r) override { + } + + int process(int r) { + if (r < 0) { + lderr(cct) << "failed to allocate tag: " << cpp_strerror(r) << dendl; + return r; + } + + Mutex::Locker locker(*lock); + *tag_tid = tag.tid; + + bufferlist::iterator data_it = tag.data.begin(); + r = decode(&data_it, tag_data); + if (r < 0) { + lderr(cct) << "failed to decode allocated tag" << dendl; + return r; + } + return 0; + } + + static int decode(bufferlist::iterator *it, + journal::TagData *tag_data) { + try { + ::decode(*tag_data, *it); + } catch (const buffer::error &err) { + return -EBADMSG; + } + return 0; + } + +}; + +struct C_DecodeTags : public Context { + CephContext *cct; + Mutex *lock; + uint64_t *tag_tid; + journal::TagData *tag_data; + Context *on_finish; + + ::journal::Journaler::Tags tags; + + C_DecodeTags(CephContext *cct, Mutex *lock, uint64_t *tag_tid, + journal::TagData *tag_data, Context *on_finish) + : cct(cct), lock(lock), tag_tid(tag_tid), tag_data(tag_data), + on_finish(on_finish) { + } + + virtual void complete(int r) { + on_finish->complete(process(r)); + Context::complete(0); + } + virtual void finish(int r) override { + } + + int process(int r) { + if (r < 0) { + lderr(cct) << "failed to retrieve journal tags: " << cpp_strerror(r) + << dendl; + return r; + } + + if (tags.empty()) { + lderr(cct) << "no journal tags retrieved" << dendl; + return -ENOENT; + } + + Mutex::Locker locker(*lock); + *tag_tid = tags.back().tid; + + bufferlist::iterator data_it = tags.back().data.begin(); + r = C_DecodeTag::decode(&data_it, tag_data); + if (r < 0) { + lderr(cct) << "failed to decode journal tag" << dendl; + return r; + } + return 0; + } +}; + +} // anonymous namespace + using util::create_async_context_callback; using util::create_context_callback; +// client id for local image +template +const std::string Journal::IMAGE_CLIENT_ID(""); + +// mirror uuid to use for local images +template +const std::string Journal::LOCAL_MIRROR_UUID(""); + template std::ostream &operator<<(std::ostream &os, const typename Journal::State &state) { @@ -111,7 +222,8 @@ int Journal::create(librados::IoCtx &io_ctx, const std::string &image_id, pool_id = data_io_ctx.get_id(); } - Journaler journaler(io_ctx, image_id, "", cct->_conf->rbd_journal_commit_age); + Journaler journaler(io_ctx, image_id, IMAGE_CLIENT_ID, + cct->_conf->rbd_journal_commit_age); int r = journaler.create(order, splay_width, pool_id); if (r < 0) { @@ -150,7 +262,8 @@ int Journal::remove(librados::IoCtx &io_ctx, const std::string &image_id) { CephContext *cct = reinterpret_cast(io_ctx.cct()); ldout(cct, 5) << __func__ << ": image=" << image_id << dendl; - Journaler journaler(io_ctx, image_id, "", cct->_conf->rbd_journal_commit_age); + Journaler journaler(io_ctx, image_id, IMAGE_CLIENT_ID, + cct->_conf->rbd_journal_commit_age); bool journal_exists; int r = journaler.exists(&journal_exists); @@ -185,7 +298,8 @@ int Journal::reset(librados::IoCtx &io_ctx, const std::string &image_id) { CephContext *cct = reinterpret_cast(io_ctx.cct()); ldout(cct, 5) << __func__ << ": image=" << image_id << dendl; - Journaler journaler(io_ctx, image_id, "", cct->_conf->rbd_journal_commit_age); + Journaler journaler(io_ctx, image_id, IMAGE_CLIENT_ID, + cct->_conf->rbd_journal_commit_age); C_SaferCond cond; journaler.init(&cond); @@ -281,6 +395,38 @@ void Journal::close(Context *on_finish) { wait_for_steady_state(on_finish); } +template +bool Journal::is_tag_owner() const { + return (m_tag_data.mirror_uuid == LOCAL_MIRROR_UUID); +} + +template +void Journal::allocate_tag(const std::string &mirror_uuid, + Context *on_finish) { + CephContext *cct = m_image_ctx.cct; + ldout(cct, 20) << this << " " << __func__ << ": mirror_uuid=" << mirror_uuid + << dendl; + + Mutex::Locker locker(m_lock); + assert(m_journaler != nullptr && is_tag_owner()); + + // NOTE: currently responsibility of caller to provide local mirror + // uuid constant or remote peer uuid + journal::TagData tag_data; + tag_data.mirror_uuid = mirror_uuid; + + // TODO: inject current commit position into tag data (need updated journaler PR) + tag_data.predecessor_mirror_uuid = m_tag_data.mirror_uuid; + + bufferlist tag_bl; + ::encode(tag_data, tag_bl); + + C_DecodeTag *decode_tag_ctx = new C_DecodeTag(cct, &m_lock, &m_tag_tid, + &m_tag_data, on_finish); + m_journaler->allocate_tag(m_tag_class, tag_bl, &decode_tag_ctx->tag, + decode_tag_ctx); +} + template void Journal::flush_commit_position(Context *on_finish) { CephContext *cct = m_image_ctx.cct; @@ -312,8 +458,7 @@ uint64_t Journal::append_io_event(AioCompletion *aio_comp, tid = ++m_event_tid; assert(tid != 0); - // TODO: use allocated tag_id - future = m_journaler->append(0, bl); + future = m_journaler->append(m_tag_tid, bl); m_events[tid] = Event(future, aio_comp, requests, offset, length); } @@ -398,8 +543,7 @@ void Journal::append_op_event(uint64_t op_tid, Mutex::Locker locker(m_lock); assert(m_state == STATE_READY); - // TODO: use allocated tag_id - future = m_journaler->append(0, bl); + future = m_journaler->append(m_tag_tid, bl); // delay committing op event to ensure consistent replay assert(m_op_futures.count(op_tid) == 0); @@ -438,8 +582,7 @@ void Journal::commit_op_event(uint64_t op_tid, int r) { op_start_future = it->second; m_op_futures.erase(it); - // TODO: use allocated tag_id - op_finish_future = m_journaler->append(0, bl); + op_finish_future = m_journaler->append(m_tag_tid, bl); } op_finish_future.flush(new C_OpEventSafe(this, op_tid, op_start_future, @@ -553,8 +696,8 @@ void Journal::create_journaler() { assert(m_journaler == NULL); transition_state(STATE_INITIALIZING, 0); - m_journaler = new Journaler(m_image_ctx.md_ctx, m_image_ctx.id, "", - m_image_ctx.journal_commit_age); + m_journaler = new Journaler(m_image_ctx.md_ctx, m_image_ctx.id, + IMAGE_CLIENT_ID, m_image_ctx.journal_commit_age); m_journaler->init(create_async_context_callback( m_image_ctx, create_context_callback< Journal, &Journal::handle_initialized>(this))); @@ -625,6 +768,7 @@ void Journal::handle_initialized(int r) { ldout(cct, 20) << this << " " << __func__ << ": r=" << r << dendl; Mutex::Locker locker(m_lock); + assert(m_state == STATE_INITIALIZING); if (r < 0) { lderr(cct) << this << " " << __func__ @@ -634,6 +778,56 @@ void Journal::handle_initialized(int r) { return; } + // locate the master image client record + cls::journal::Client client; + r = m_journaler->get_cached_client(Journal::IMAGE_CLIENT_ID, + &client); + if (r < 0) { + lderr(cct) << "failed to locate master image client" << dendl; + destroy_journaler(r); + return; + } + + librbd::journal::ClientData client_data; + bufferlist::iterator bl = client.data.begin(); + try { + ::decode(client_data, bl); + } catch (const buffer::error &err) { + lderr(cct) << "failed to decode client meta data: " << err.what() + << dendl; + destroy_journaler(-EINVAL); + return; + } + + librbd::journal::ImageClientMeta *image_client_meta = + boost::get(&client_data.client_meta); + if (image_client_meta == nullptr) { + lderr(cct) << "failed to extract client meta data" << dendl; + destroy_journaler(-EINVAL); + return; + } + + m_tag_class = image_client_meta->tag_class; + C_DecodeTags *tags_ctx = new C_DecodeTags( + cct, &m_lock, &m_tag_tid, &m_tag_data, create_async_context_callback( + m_image_ctx, create_context_callback< + Journal, &Journal::handle_get_tags>(this))); + m_journaler->get_tags(m_tag_class, &tags_ctx->tags, tags_ctx); +} + +template +void Journal::handle_get_tags(int r) { + CephContext *cct = m_image_ctx.cct; + ldout(cct, 20) << this << " " << __func__ << ": r=" << r << dendl; + + Mutex::Locker locker(m_lock); + assert(m_state == STATE_INITIALIZING); + + if (r < 0) { + destroy_journaler(r); + return; + } + transition_state(STATE_REPLAYING, 0); m_journal_replay = journal::Replay::create(m_image_ctx); m_journaler->start_replay(&m_replay_handler); diff --git a/src/librbd/Journal.h b/src/librbd/Journal.h index a9f376763f0f0..127569ea740ed 100644 --- a/src/librbd/Journal.h +++ b/src/librbd/Journal.h @@ -13,6 +13,7 @@ #include "journal/Future.h" #include "journal/ReplayEntry.h" #include "journal/ReplayHandler.h" +#include "librbd/journal/Types.h" #include #include #include @@ -32,7 +33,6 @@ class ImageCtx; namespace journal { -class EventEntry; template class Replay; template @@ -92,6 +92,9 @@ public: STATE_CLOSED }; + static const std::string IMAGE_CLIENT_ID; + static const std::string LOCAL_MIRROR_UUID; + typedef std::list AioObjectRequests; Journal(ImageCtxT &image_ctx); @@ -112,6 +115,9 @@ public: void open(Context *on_finish); void close(Context *on_finish); + bool is_tag_owner() const; + void allocate_tag(const std::string &mirror_uuid, Context *on_finish); + void flush_commit_position(Context *on_finish); uint64_t append_io_event(AioCompletion *aio_comp, @@ -241,6 +247,9 @@ private: Journaler *m_journaler; mutable Mutex m_lock; State m_state; + uint64_t m_tag_class = 0; + uint64_t m_tag_tid = 0; + journal::TagData m_tag_data; int m_error_result; Contexts m_wait_for_state_contexts; @@ -268,6 +277,7 @@ private: void complete_event(typename Events::iterator it, int r); void handle_initialized(int r); + void handle_get_tags(int r); void handle_replay_ready(); void handle_replay_complete(int r);