From: Mykola Golub Date: Thu, 21 Apr 2016 06:52:47 +0000 (+0300) Subject: rbd-mirror: in replay status store number of entries behind master X-Git-Tag: v11.0.0~795^2~1 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=e4229d1d4b163f1003e68fef1db63f796393ba23;p=ceph.git rbd-mirror: in replay status store number of entries behind master Signed-off-by: Mykola Golub --- diff --git a/src/test/rbd_mirror/test_mock_ImageReplayer.cc b/src/test/rbd_mirror/test_mock_ImageReplayer.cc index cf19c6747d47..3233b48df723 100644 --- a/src/test/rbd_mirror/test_mock_ImageReplayer.cc +++ b/src/test/rbd_mirror/test_mock_ImageReplayer.cc @@ -125,9 +125,28 @@ struct OpenLocalImageRequest { MOCK_METHOD0(send, void()); }; +template<> +struct ReplayStatusFormatter { + static ReplayStatusFormatter* s_instance; + + static ReplayStatusFormatter* create(::journal::MockJournalerProxy *journaler, + const std::string &mirror_uuid) { + assert(s_instance != nullptr); + return s_instance; + } + + ReplayStatusFormatter() { + assert(s_instance == nullptr); + s_instance = this; + } + + MOCK_METHOD2(get_or_send_update, bool(std::string *description, Context *on_finish)); +}; + BootstrapRequest* BootstrapRequest::s_instance = nullptr; CloseImageRequest* CloseImageRequest::s_instance = nullptr; OpenLocalImageRequest* OpenLocalImageRequest::s_instance = nullptr; +ReplayStatusFormatter* ReplayStatusFormatter::s_instance = nullptr; } // namespace image_replayer } // namespace mirror diff --git a/src/tools/Makefile-client.am b/src/tools/Makefile-client.am index 68614e223295..a1b2e95675d7 100644 --- a/src/tools/Makefile-client.am +++ b/src/tools/Makefile-client.am @@ -100,6 +100,7 @@ librbd_mirror_internal_la_SOURCES = \ tools/rbd_mirror/image_replayer/BootstrapRequest.cc \ tools/rbd_mirror/image_replayer/CloseImageRequest.cc \ tools/rbd_mirror/image_replayer/OpenLocalImageRequest.cc \ + tools/rbd_mirror/image_replayer/ReplayStatusFormatter.cc \ tools/rbd_mirror/image_sync/ImageCopyRequest.cc \ tools/rbd_mirror/image_sync/ObjectCopyRequest.cc \ tools/rbd_mirror/image_sync/SnapshotCopyRequest.cc \ @@ -119,6 +120,7 @@ noinst_HEADERS += \ tools/rbd_mirror/image_replayer/BootstrapRequest.h \ tools/rbd_mirror/image_replayer/CloseImageRequest.h \ tools/rbd_mirror/image_replayer/OpenLocalImageRequest.h \ + tools/rbd_mirror/image_replayer/ReplayStatusFormatter.h \ tools/rbd_mirror/image_sync/ImageCopyRequest.h \ tools/rbd_mirror/image_sync/ObjectCopyRequest.h \ tools/rbd_mirror/image_sync/SnapshotCopyRequest.h \ diff --git a/src/tools/rbd_mirror/ImageReplayer.cc b/src/tools/rbd_mirror/ImageReplayer.cc index 698eefb7fb65..65dde98c0d76 100644 --- a/src/tools/rbd_mirror/ImageReplayer.cc +++ b/src/tools/rbd_mirror/ImageReplayer.cc @@ -23,6 +23,7 @@ #include "tools/rbd_mirror/image_replayer/BootstrapRequest.h" #include "tools/rbd_mirror/image_replayer/CloseImageRequest.h" #include "tools/rbd_mirror/image_replayer/OpenLocalImageRequest.h" +#include "tools/rbd_mirror/image_replayer/ReplayStatusFormatter.h" #define dout_subsys ceph_subsys_rbd_mirror #undef dout_prefix @@ -50,8 +51,7 @@ template struct ReplayHandler : public ::journal::ReplayHandler { ImageReplayer *replayer; ReplayHandler(ImageReplayer *replayer) : replayer(replayer) {} - - virtual void get() {} +virtual void get() {} virtual void put() {} virtual void handle_entries_available() { @@ -202,6 +202,7 @@ ImageReplayer::ImageReplayer(Threads *threads, RadosRef local, RadosRef remot template ImageReplayer::~ImageReplayer() { + assert(m_replay_status_formatter == nullptr); assert(m_local_image_ctx == nullptr); assert(m_local_replay == nullptr); assert(m_remote_journaler == nullptr); @@ -268,7 +269,6 @@ void ImageReplayer::start(Context *on_finish, &m_threads->timer_lock, m_remote_ioctx, m_remote_image_id, m_local_mirror_uuid, commit_interval); - bootstrap(); } @@ -379,6 +379,8 @@ void ImageReplayer::start_replay() { std::swap(m_on_start_finish, on_finish); } + m_replay_status_formatter = + ReplayStatusFormatter::create(m_remote_journaler, m_local_mirror_uuid); update_mirror_image_status(); reschedule_update_status_task(30); @@ -584,6 +586,9 @@ void ImageReplayer::on_stop_local_image_close_finish(int r) m_local_ioctx.close(); + delete m_replay_status_formatter; + m_replay_status_formatter = nullptr; + m_remote_journaler->stop_replay(); m_remote_journaler->shut_down(); delete m_remote_journaler; @@ -924,16 +929,24 @@ void ImageReplayer::shut_down_journal_replay(bool cancel_ops) } template -void ImageReplayer::update_mirror_image_status(bool final) +void ImageReplayer::update_mirror_image_status(bool final, + State expected_state) { - dout(20) << dendl; + dout(20) << "final=" << final << ", expected_state=" << expected_state + << dendl; cls::rbd::MirrorImageStatus status; { Mutex::Locker locker(m_lock); + assert(!final || !is_running_()); + if (!final) { + if (expected_state != STATE_UNKNOWN && expected_state != m_state) { + dout(20) << "state changed" << dendl; + return; + } if (m_update_status_comp) { dout(20) << "already sending update" << dendl; m_update_status_pending = true; @@ -956,7 +969,7 @@ void ImageReplayer::update_mirror_image_status(bool final) if (comp) { comp->release(); } - if (pending && r == 0) { + if (pending && r == 0 && is_running_()) { update_mirror_image_status(); } }); @@ -987,7 +1000,6 @@ void ImageReplayer::update_mirror_image_status(bool final) break; case STATE_REPLAYING: status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_REPLAYING; - status.description = get_replay_status_description(); break; case STATE_STOPPING: if (m_local_image_ctx) { @@ -1010,6 +1022,28 @@ void ImageReplayer::update_mirror_image_status(bool final) } } + if (status.state == cls::rbd::MIRROR_IMAGE_STATUS_STATE_REPLAYING) { + Context *on_req_finish = new FunctionContext( + [this](int r) { + if (r == 0) { + librados::AioCompletion *comp = nullptr; + { + Mutex::Locker locker(m_lock); + std::swap(m_update_status_comp, comp); + } + if (comp) { + comp->release(); + } + update_mirror_image_status(false, STATE_REPLAYING); + } + }); + std::string desc; + if (!m_replay_status_formatter->get_or_send_update(&desc, on_req_finish)) { + return; + } + status.description = "replaying, " + desc; + } + dout(20) << "status=" << status << dendl; librados::ObjectWriteOperation op; @@ -1081,46 +1115,6 @@ void ImageReplayer::start_update_status_task() m_threads->work_queue->queue(ctx, 0); } -template -std::string ImageReplayer::get_replay_status_description() { - assert(m_lock.is_locked()); - assert(m_state == STATE_REPLAYING); - - std::stringstream ss; - ss << "replaying"; - - cls::journal::Client master; - int r = m_remote_journaler->get_cached_client( - librbd::Journal<>::IMAGE_CLIENT_ID, &master); - if (r == 0) { - ss << ", master_position="; - cls::journal::ObjectPositions &object_positions = - master.commit_position.object_positions; - - if (object_positions.begin() != object_positions.end()) { - ss << *(object_positions.begin()); - } else { - ss << "[]"; - } - } - - cls::journal::Client mirror; - r = m_remote_journaler->get_cached_client(m_local_mirror_uuid, &mirror); - if (r == 0) { - ss << ", mirror_position="; - cls::journal::ObjectPositions &object_positions = - mirror.commit_position.object_positions; - - if (object_positions.begin() != object_positions.end()) { - ss << *(object_positions.begin()); - } else { - ss << "[]"; - } - } - - return ss.str(); -} - template std::string ImageReplayer::to_string(const State state) { switch (state) { diff --git a/src/tools/rbd_mirror/ImageReplayer.h b/src/tools/rbd_mirror/ImageReplayer.h index 01cbf85bdbad..9cff9bba4463 100644 --- a/src/tools/rbd_mirror/ImageReplayer.h +++ b/src/tools/rbd_mirror/ImageReplayer.h @@ -40,6 +40,8 @@ namespace mirror { struct Threads; +namespace image_replayer { template class ReplayStatusFormatter; } + /** * Replays changes from a remote cluster for a single image. */ @@ -49,6 +51,7 @@ public: typedef typename librbd::journal::TypeTraits::ReplayEntry ReplayEntry; enum State { + STATE_UNKNOWN, STATE_UNINITIALIZED, STATE_STARTING, STATE_REPLAYING, @@ -208,6 +211,8 @@ private: int m_last_r = 0; std::string m_state_desc; BootstrapProgressContext m_progress_cxt; + image_replayer::ReplayStatusFormatter *m_replay_status_formatter = + nullptr; librados::IoCtx m_local_ioctx, m_remote_ioctx; ImageCtxT *m_local_image_ctx = nullptr; librbd::journal::Replay *m_local_replay = nullptr; @@ -254,12 +259,11 @@ private: void shut_down_journal_replay(bool cancel_ops); - void update_mirror_image_status(bool final = false); + void update_mirror_image_status(bool final = false, + State expected_state = STATE_UNKNOWN); void reschedule_update_status_task(int new_interval = 0); void start_update_status_task(); - std::string get_replay_status_description(); - void bootstrap(); void handle_bootstrap(int r); diff --git a/src/tools/rbd_mirror/image_replayer/ReplayStatusFormatter.cc b/src/tools/rbd_mirror/image_replayer/ReplayStatusFormatter.cc new file mode 100644 index 000000000000..303fb587f319 --- /dev/null +++ b/src/tools/rbd_mirror/image_replayer/ReplayStatusFormatter.cc @@ -0,0 +1,240 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "ReplayStatusFormatter.h" +#include "common/debug.h" +#include "common/dout.h" +#include "common/errno.h" +#include "journal/Journaler.h" +#include "librbd/ImageCtx.h" +#include "librbd/Journal.h" +#include "librbd/Utils.h" + +#define dout_subsys ceph_subsys_rbd_mirror +#undef dout_prefix +#define dout_prefix *_dout << "rbd::mirror::image_replayer::ReplayStatusFormatter: " \ + << this << " " << __func__ << ": " + +namespace rbd { +namespace mirror { +namespace image_replayer { + +using librbd::util::unique_lock_name; + +template +ReplayStatusFormatter::ReplayStatusFormatter(Journaler *journaler, + const std::string &mirror_uuid) + : m_journaler(journaler), + m_mirror_uuid(mirror_uuid), + m_lock(unique_lock_name("ReplayStatusFormatter::m_lock", this)) { +} + +template +bool ReplayStatusFormatter::get_or_send_update(std::string *description, + Context *on_finish) { + dout(20) << dendl; + + bool in_progress = false; + { + Mutex::Locker locker(m_lock); + if (m_on_finish) { + in_progress = true; + } else { + m_on_finish = on_finish; + } + } + + if (in_progress) { + dout(10) << "previous request is still in progress, ignoring" << dendl; + on_finish->complete(-EAGAIN); + return false; + } + + m_master_position = cls::journal::ObjectPosition(); + m_mirror_position = cls::journal::ObjectPosition(); + + cls::journal::Client master_client, mirror_client; + int r; + + r = m_journaler->get_cached_client(librbd::Journal<>::IMAGE_CLIENT_ID, + &master_client); + if (r < 0) { + derr << "error retrieving registered master client: " + << cpp_strerror(r) << dendl; + } else { + r = m_journaler->get_cached_client(m_mirror_uuid, &mirror_client); + if (r < 0) { + derr << "error retrieving registered mirror client: " + << cpp_strerror(r) << dendl; + } + } + + if (!master_client.commit_position.object_positions.empty()) { + m_master_position = + *(master_client.commit_position.object_positions.begin()); + } + + if (!mirror_client.commit_position.object_positions.empty()) { + m_mirror_position = + *(mirror_client.commit_position.object_positions.begin()); + } + + if (!calculate_behind_master_or_send_update()) { + dout(20) << "need to update tag cache" << dendl; + return false; + } + + format(description); + + { + Mutex::Locker locker(m_lock); + assert(m_on_finish == on_finish); + m_on_finish = nullptr; + } + + on_finish->complete(-EEXIST); + return true; +} + +template +bool ReplayStatusFormatter::calculate_behind_master_or_send_update() { + dout(20) << "m_master_position=" << m_master_position + << ", m_mirror_position=" << m_mirror_position << dendl; + + m_entries_behind_master = 0; + + if (m_master_position == cls::journal::ObjectPosition()) { + return true; + } + + cls::journal::ObjectPosition master = m_master_position; + uint64_t mirror_tag_tid = m_mirror_position.tag_tid; + + while (master.tag_tid != mirror_tag_tid) { + auto tag_it = m_tag_cache.find(master.tag_tid); + if (tag_it == m_tag_cache.end()) { + send_update_tag_cache(master.tag_tid, mirror_tag_tid); + return false; + } + librbd::journal::TagData &tag_data = tag_it->second; + m_entries_behind_master += master.entry_tid; + master = cls::journal::ObjectPosition(0, tag_data.predecessor_tag_tid, + tag_data.predecessor_entry_tid); + } + m_entries_behind_master += master.entry_tid - m_mirror_position.entry_tid; + + dout(20) << "clearing tags not needed any more (below mirror position)" + << dendl; + + uint64_t tag_tid = mirror_tag_tid; + size_t old_size = m_tag_cache.size(); + while (tag_tid != 0) { + auto tag_it = m_tag_cache.find(tag_tid); + if (tag_it == m_tag_cache.end()) { + break; + } + librbd::journal::TagData &tag_data = tag_it->second; + + dout(20) << "erasing tag " << tag_data << "for tag_tid " << tag_tid + << dendl; + + tag_tid = tag_data.predecessor_tag_tid; + m_tag_cache.erase(tag_it); + } + + dout(20) << old_size - m_tag_cache.size() << " entries cleared" << dendl; + + return true; +} + +template +void ReplayStatusFormatter::send_update_tag_cache(uint64_t master_tag_tid, + uint64_t mirror_tag_tid) { + + dout(20) << "master_tag_tid=" << master_tag_tid << ", mirror_tag_tid=" + << mirror_tag_tid << dendl; + + if (master_tag_tid == mirror_tag_tid) { + Context *on_finish = nullptr; + { + Mutex::Locker locker(m_lock); + std::swap(m_on_finish, on_finish); + } + + assert(on_finish); + on_finish->complete(0); + return; + } + + FunctionContext *ctx = new FunctionContext( + [this, master_tag_tid, mirror_tag_tid](int r) { + handle_update_tag_cache(master_tag_tid, mirror_tag_tid, r); + }); + m_journaler->get_tag(master_tag_tid, &m_tag, ctx); +} + +template +void ReplayStatusFormatter::handle_update_tag_cache(uint64_t master_tag_tid, + uint64_t mirror_tag_tid, + int r) { + librbd::journal::TagData tag_data; + + if (r < 0) { + derr << "error retrieving tag " << master_tag_tid << ": " << cpp_strerror(r) + << dendl; + } else { + dout(20) << "retrieved tag " << master_tag_tid << ": " << m_tag << dendl; + + bufferlist::iterator it = m_tag.data.begin(); + try { + ::decode(tag_data, it); + } catch (const buffer::error &err) { + derr << "error decoding tag " << master_tag_tid << ": " << err.what() + << dendl; + } + } + + if (tag_data.predecessor_tag_tid == 0) { + // We failed. Don't consider this fatal, just terminate retrieving. + dout(20) << "making fake tag" << dendl; + tag_data.predecessor_tag_tid = mirror_tag_tid; + } + + dout(20) << "decoded tag " << master_tag_tid << ": " << tag_data << dendl; + + m_tag_cache.insert(std::make_pair(master_tag_tid, tag_data)); + send_update_tag_cache(tag_data.predecessor_tag_tid, mirror_tag_tid); +} + +template +void ReplayStatusFormatter::format(std::string *description) { + + dout(20) << "m_master_position=" << m_master_position + << ", m_mirror_position=" << m_mirror_position + << ", m_entries_behind_master=" << m_entries_behind_master << dendl; + + std::stringstream ss; + ss << "master_position="; + if (m_master_position == cls::journal::ObjectPosition()) { + ss << "[]"; + } else { + ss << m_master_position; + } + ss << ", mirror_position="; + if (m_mirror_position == cls::journal::ObjectPosition()) { + ss << "[]"; + } else { + ss << m_mirror_position; + } + ss << ", entries_behind_master=" + << (m_entries_behind_master > 0 ? m_entries_behind_master : 0); + + *description = ss.str(); +} + +} // namespace image_replayer +} // namespace mirror +} // namespace rbd + +template class +rbd::mirror::image_replayer::ReplayStatusFormatter; diff --git a/src/tools/rbd_mirror/image_replayer/ReplayStatusFormatter.h b/src/tools/rbd_mirror/image_replayer/ReplayStatusFormatter.h new file mode 100644 index 000000000000..00d7a05a52ec --- /dev/null +++ b/src/tools/rbd_mirror/image_replayer/ReplayStatusFormatter.h @@ -0,0 +1,56 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef RBD_MIRROR_IMAGE_REPLAYER_REPLAY_STATUS_FORMATTER_H +#define RBD_MIRROR_IMAGE_REPLAYER_REPLAY_STATUS_FORMATTER_H + +#include "include/Context.h" +#include "common/Mutex.h" +#include "cls/journal/cls_journal_types.h" +#include "librbd/journal/Types.h" +#include "librbd/journal/TypeTraits.h" + +namespace journal { class Journaler; } +namespace librbd { class ImageCtx; } + +namespace rbd { +namespace mirror { +namespace image_replayer { + +template +class ReplayStatusFormatter { +public: + typedef typename librbd::journal::TypeTraits::Journaler Journaler; + + static ReplayStatusFormatter* create(Journaler *journaler, + const std::string &mirror_uuid) { + return new ReplayStatusFormatter(journaler, mirror_uuid); + } + + ReplayStatusFormatter(Journaler *journaler, const std::string &mirror_uuid); + + bool get_or_send_update(std::string *description, Context *on_finish); + +private: + Journaler *m_journaler; + std::string m_mirror_uuid; + Mutex m_lock; + Context *m_on_finish = nullptr; + cls::journal::ObjectPosition m_master_position; + cls::journal::ObjectPosition m_mirror_position; + int m_entries_behind_master = 0; + cls::journal::Tag m_tag; + std::map m_tag_cache; + + bool calculate_behind_master_or_send_update(); + void send_update_tag_cache(uint64_t master_tag_tid, uint64_t mirror_tag_tid); + void handle_update_tag_cache(uint64_t master_tag_tid, uint64_t mirror_tag_tid, + int r); + void format(std::string *description); +}; + +} // namespace image_replayer +} // namespace mirror +} // namespace rbd + +#endif // RBD_MIRROR_IMAGE_REPLAYER_REPLAY_STATUS_FORMATTER_H