MOCK_METHOD0(send, void());
};
+template<>
+struct ReplayStatusFormatter<librbd::MockImageReplayerImageCtx> {
+ static ReplayStatusFormatter* s_instance;
+
+ static ReplayStatusFormatter* create(::journal::MockJournalerProxy *journaler,
+ const std::string &mirror_uuid) {
+ assert(s_instance != nullptr);
+ return s_instance;
+ }
+
+ ReplayStatusFormatter() {
+ assert(s_instance == nullptr);
+ s_instance = this;
+ }
+
+ MOCK_METHOD2(get_or_send_update, bool(std::string *description, Context *on_finish));
+};
+
BootstrapRequest<librbd::MockImageReplayerImageCtx>* BootstrapRequest<librbd::MockImageReplayerImageCtx>::s_instance = nullptr;
CloseImageRequest<librbd::MockImageReplayerImageCtx>* CloseImageRequest<librbd::MockImageReplayerImageCtx>::s_instance = nullptr;
OpenLocalImageRequest<librbd::MockImageReplayerImageCtx>* OpenLocalImageRequest<librbd::MockImageReplayerImageCtx>::s_instance = nullptr;
+ReplayStatusFormatter<librbd::MockImageReplayerImageCtx>* ReplayStatusFormatter<librbd::MockImageReplayerImageCtx>::s_instance = nullptr;
} // namespace image_replayer
} // namespace mirror
tools/rbd_mirror/image_replayer/BootstrapRequest.cc \
tools/rbd_mirror/image_replayer/CloseImageRequest.cc \
tools/rbd_mirror/image_replayer/OpenLocalImageRequest.cc \
+ tools/rbd_mirror/image_replayer/ReplayStatusFormatter.cc \
tools/rbd_mirror/image_sync/ImageCopyRequest.cc \
tools/rbd_mirror/image_sync/ObjectCopyRequest.cc \
tools/rbd_mirror/image_sync/SnapshotCopyRequest.cc \
tools/rbd_mirror/image_replayer/BootstrapRequest.h \
tools/rbd_mirror/image_replayer/CloseImageRequest.h \
tools/rbd_mirror/image_replayer/OpenLocalImageRequest.h \
+ tools/rbd_mirror/image_replayer/ReplayStatusFormatter.h \
tools/rbd_mirror/image_sync/ImageCopyRequest.h \
tools/rbd_mirror/image_sync/ObjectCopyRequest.h \
tools/rbd_mirror/image_sync/SnapshotCopyRequest.h \
#include "tools/rbd_mirror/image_replayer/BootstrapRequest.h"
#include "tools/rbd_mirror/image_replayer/CloseImageRequest.h"
#include "tools/rbd_mirror/image_replayer/OpenLocalImageRequest.h"
+#include "tools/rbd_mirror/image_replayer/ReplayStatusFormatter.h"
#define dout_subsys ceph_subsys_rbd_mirror
#undef dout_prefix
struct ReplayHandler : public ::journal::ReplayHandler {
ImageReplayer<I> *replayer;
ReplayHandler(ImageReplayer<I> *replayer) : replayer(replayer) {}
-
- virtual void get() {}
+virtual void get() {}
virtual void put() {}
virtual void handle_entries_available() {
template <typename I>
ImageReplayer<I>::~ImageReplayer()
{
+ assert(m_replay_status_formatter == nullptr);
assert(m_local_image_ctx == nullptr);
assert(m_local_replay == nullptr);
assert(m_remote_journaler == nullptr);
&m_threads->timer_lock, m_remote_ioctx,
m_remote_image_id, m_local_mirror_uuid,
commit_interval);
-
bootstrap();
}
std::swap(m_on_start_finish, on_finish);
}
+ m_replay_status_formatter =
+ ReplayStatusFormatter<I>::create(m_remote_journaler, m_local_mirror_uuid);
update_mirror_image_status();
reschedule_update_status_task(30);
m_local_ioctx.close();
+ delete m_replay_status_formatter;
+ m_replay_status_formatter = nullptr;
+
m_remote_journaler->stop_replay();
m_remote_journaler->shut_down();
delete m_remote_journaler;
}
template <typename I>
-void ImageReplayer<I>::update_mirror_image_status(bool final)
+void ImageReplayer<I>::update_mirror_image_status(bool final,
+ State expected_state)
{
- dout(20) << dendl;
+ dout(20) << "final=" << final << ", expected_state=" << expected_state
+ << dendl;
cls::rbd::MirrorImageStatus status;
{
Mutex::Locker locker(m_lock);
+ assert(!final || !is_running_());
+
if (!final) {
+ if (expected_state != STATE_UNKNOWN && expected_state != m_state) {
+ dout(20) << "state changed" << dendl;
+ return;
+ }
if (m_update_status_comp) {
dout(20) << "already sending update" << dendl;
m_update_status_pending = true;
if (comp) {
comp->release();
}
- if (pending && r == 0) {
+ if (pending && r == 0 && is_running_()) {
update_mirror_image_status();
}
});
break;
case STATE_REPLAYING:
status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_REPLAYING;
- status.description = get_replay_status_description();
break;
case STATE_STOPPING:
if (m_local_image_ctx) {
}
}
+ if (status.state == cls::rbd::MIRROR_IMAGE_STATUS_STATE_REPLAYING) {
+ Context *on_req_finish = new FunctionContext(
+ [this](int r) {
+ if (r == 0) {
+ librados::AioCompletion *comp = nullptr;
+ {
+ Mutex::Locker locker(m_lock);
+ std::swap(m_update_status_comp, comp);
+ }
+ if (comp) {
+ comp->release();
+ }
+ update_mirror_image_status(false, STATE_REPLAYING);
+ }
+ });
+ std::string desc;
+ if (!m_replay_status_formatter->get_or_send_update(&desc, on_req_finish)) {
+ return;
+ }
+ status.description = "replaying, " + desc;
+ }
+
dout(20) << "status=" << status << dendl;
librados::ObjectWriteOperation op;
m_threads->work_queue->queue(ctx, 0);
}
-template <typename I>
-std::string ImageReplayer<I>::get_replay_status_description() {
- assert(m_lock.is_locked());
- assert(m_state == STATE_REPLAYING);
-
- std::stringstream ss;
- ss << "replaying";
-
- cls::journal::Client master;
- int r = m_remote_journaler->get_cached_client(
- librbd::Journal<>::IMAGE_CLIENT_ID, &master);
- if (r == 0) {
- ss << ", master_position=";
- cls::journal::ObjectPositions &object_positions =
- master.commit_position.object_positions;
-
- if (object_positions.begin() != object_positions.end()) {
- ss << *(object_positions.begin());
- } else {
- ss << "[]";
- }
- }
-
- cls::journal::Client mirror;
- r = m_remote_journaler->get_cached_client(m_local_mirror_uuid, &mirror);
- if (r == 0) {
- ss << ", mirror_position=";
- cls::journal::ObjectPositions &object_positions =
- mirror.commit_position.object_positions;
-
- if (object_positions.begin() != object_positions.end()) {
- ss << *(object_positions.begin());
- } else {
- ss << "[]";
- }
- }
-
- return ss.str();
-}
-
template <typename I>
std::string ImageReplayer<I>::to_string(const State state) {
switch (state) {
struct Threads;
+namespace image_replayer { template <typename> class ReplayStatusFormatter; }
+
/**
* Replays changes from a remote cluster for a single image.
*/
typedef typename librbd::journal::TypeTraits<ImageCtxT>::ReplayEntry ReplayEntry;
enum State {
+ STATE_UNKNOWN,
STATE_UNINITIALIZED,
STATE_STARTING,
STATE_REPLAYING,
int m_last_r = 0;
std::string m_state_desc;
BootstrapProgressContext m_progress_cxt;
+ image_replayer::ReplayStatusFormatter<ImageCtxT> *m_replay_status_formatter =
+ nullptr;
librados::IoCtx m_local_ioctx, m_remote_ioctx;
ImageCtxT *m_local_image_ctx = nullptr;
librbd::journal::Replay<ImageCtxT> *m_local_replay = nullptr;
void shut_down_journal_replay(bool cancel_ops);
- void update_mirror_image_status(bool final = false);
+ void update_mirror_image_status(bool final = false,
+ State expected_state = STATE_UNKNOWN);
void reschedule_update_status_task(int new_interval = 0);
void start_update_status_task();
- std::string get_replay_status_description();
-
void bootstrap();
void handle_bootstrap(int r);
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "ReplayStatusFormatter.h"
+#include "common/debug.h"
+#include "common/dout.h"
+#include "common/errno.h"
+#include "journal/Journaler.h"
+#include "librbd/ImageCtx.h"
+#include "librbd/Journal.h"
+#include "librbd/Utils.h"
+
+#define dout_subsys ceph_subsys_rbd_mirror
+#undef dout_prefix
+#define dout_prefix *_dout << "rbd::mirror::image_replayer::ReplayStatusFormatter: " \
+ << this << " " << __func__ << ": "
+
+namespace rbd {
+namespace mirror {
+namespace image_replayer {
+
+using librbd::util::unique_lock_name;
+
+template <typename I>
+ReplayStatusFormatter<I>::ReplayStatusFormatter(Journaler *journaler,
+ const std::string &mirror_uuid)
+ : m_journaler(journaler),
+ m_mirror_uuid(mirror_uuid),
+ m_lock(unique_lock_name("ReplayStatusFormatter::m_lock", this)) {
+}
+
+template <typename I>
+bool ReplayStatusFormatter<I>::get_or_send_update(std::string *description,
+ Context *on_finish) {
+ dout(20) << dendl;
+
+ bool in_progress = false;
+ {
+ Mutex::Locker locker(m_lock);
+ if (m_on_finish) {
+ in_progress = true;
+ } else {
+ m_on_finish = on_finish;
+ }
+ }
+
+ if (in_progress) {
+ dout(10) << "previous request is still in progress, ignoring" << dendl;
+ on_finish->complete(-EAGAIN);
+ return false;
+ }
+
+ m_master_position = cls::journal::ObjectPosition();
+ m_mirror_position = cls::journal::ObjectPosition();
+
+ cls::journal::Client master_client, mirror_client;
+ int r;
+
+ r = m_journaler->get_cached_client(librbd::Journal<>::IMAGE_CLIENT_ID,
+ &master_client);
+ if (r < 0) {
+ derr << "error retrieving registered master client: "
+ << cpp_strerror(r) << dendl;
+ } else {
+ r = m_journaler->get_cached_client(m_mirror_uuid, &mirror_client);
+ if (r < 0) {
+ derr << "error retrieving registered mirror client: "
+ << cpp_strerror(r) << dendl;
+ }
+ }
+
+ if (!master_client.commit_position.object_positions.empty()) {
+ m_master_position =
+ *(master_client.commit_position.object_positions.begin());
+ }
+
+ if (!mirror_client.commit_position.object_positions.empty()) {
+ m_mirror_position =
+ *(mirror_client.commit_position.object_positions.begin());
+ }
+
+ if (!calculate_behind_master_or_send_update()) {
+ dout(20) << "need to update tag cache" << dendl;
+ return false;
+ }
+
+ format(description);
+
+ {
+ Mutex::Locker locker(m_lock);
+ assert(m_on_finish == on_finish);
+ m_on_finish = nullptr;
+ }
+
+ on_finish->complete(-EEXIST);
+ return true;
+}
+
+template <typename I>
+bool ReplayStatusFormatter<I>::calculate_behind_master_or_send_update() {
+ dout(20) << "m_master_position=" << m_master_position
+ << ", m_mirror_position=" << m_mirror_position << dendl;
+
+ m_entries_behind_master = 0;
+
+ if (m_master_position == cls::journal::ObjectPosition()) {
+ return true;
+ }
+
+ cls::journal::ObjectPosition master = m_master_position;
+ uint64_t mirror_tag_tid = m_mirror_position.tag_tid;
+
+ while (master.tag_tid != mirror_tag_tid) {
+ auto tag_it = m_tag_cache.find(master.tag_tid);
+ if (tag_it == m_tag_cache.end()) {
+ send_update_tag_cache(master.tag_tid, mirror_tag_tid);
+ return false;
+ }
+ librbd::journal::TagData &tag_data = tag_it->second;
+ m_entries_behind_master += master.entry_tid;
+ master = cls::journal::ObjectPosition(0, tag_data.predecessor_tag_tid,
+ tag_data.predecessor_entry_tid);
+ }
+ m_entries_behind_master += master.entry_tid - m_mirror_position.entry_tid;
+
+ dout(20) << "clearing tags not needed any more (below mirror position)"
+ << dendl;
+
+ uint64_t tag_tid = mirror_tag_tid;
+ size_t old_size = m_tag_cache.size();
+ while (tag_tid != 0) {
+ auto tag_it = m_tag_cache.find(tag_tid);
+ if (tag_it == m_tag_cache.end()) {
+ break;
+ }
+ librbd::journal::TagData &tag_data = tag_it->second;
+
+ dout(20) << "erasing tag " << tag_data << "for tag_tid " << tag_tid
+ << dendl;
+
+ tag_tid = tag_data.predecessor_tag_tid;
+ m_tag_cache.erase(tag_it);
+ }
+
+ dout(20) << old_size - m_tag_cache.size() << " entries cleared" << dendl;
+
+ return true;
+}
+
+template <typename I>
+void ReplayStatusFormatter<I>::send_update_tag_cache(uint64_t master_tag_tid,
+ uint64_t mirror_tag_tid) {
+
+ dout(20) << "master_tag_tid=" << master_tag_tid << ", mirror_tag_tid="
+ << mirror_tag_tid << dendl;
+
+ if (master_tag_tid == mirror_tag_tid) {
+ Context *on_finish = nullptr;
+ {
+ Mutex::Locker locker(m_lock);
+ std::swap(m_on_finish, on_finish);
+ }
+
+ assert(on_finish);
+ on_finish->complete(0);
+ return;
+ }
+
+ FunctionContext *ctx = new FunctionContext(
+ [this, master_tag_tid, mirror_tag_tid](int r) {
+ handle_update_tag_cache(master_tag_tid, mirror_tag_tid, r);
+ });
+ m_journaler->get_tag(master_tag_tid, &m_tag, ctx);
+}
+
+template <typename I>
+void ReplayStatusFormatter<I>::handle_update_tag_cache(uint64_t master_tag_tid,
+ uint64_t mirror_tag_tid,
+ int r) {
+ librbd::journal::TagData tag_data;
+
+ if (r < 0) {
+ derr << "error retrieving tag " << master_tag_tid << ": " << cpp_strerror(r)
+ << dendl;
+ } else {
+ dout(20) << "retrieved tag " << master_tag_tid << ": " << m_tag << dendl;
+
+ bufferlist::iterator it = m_tag.data.begin();
+ try {
+ ::decode(tag_data, it);
+ } catch (const buffer::error &err) {
+ derr << "error decoding tag " << master_tag_tid << ": " << err.what()
+ << dendl;
+ }
+ }
+
+ if (tag_data.predecessor_tag_tid == 0) {
+ // We failed. Don't consider this fatal, just terminate retrieving.
+ dout(20) << "making fake tag" << dendl;
+ tag_data.predecessor_tag_tid = mirror_tag_tid;
+ }
+
+ dout(20) << "decoded tag " << master_tag_tid << ": " << tag_data << dendl;
+
+ m_tag_cache.insert(std::make_pair(master_tag_tid, tag_data));
+ send_update_tag_cache(tag_data.predecessor_tag_tid, mirror_tag_tid);
+}
+
+template <typename I>
+void ReplayStatusFormatter<I>::format(std::string *description) {
+
+ dout(20) << "m_master_position=" << m_master_position
+ << ", m_mirror_position=" << m_mirror_position
+ << ", m_entries_behind_master=" << m_entries_behind_master << dendl;
+
+ std::stringstream ss;
+ ss << "master_position=";
+ if (m_master_position == cls::journal::ObjectPosition()) {
+ ss << "[]";
+ } else {
+ ss << m_master_position;
+ }
+ ss << ", mirror_position=";
+ if (m_mirror_position == cls::journal::ObjectPosition()) {
+ ss << "[]";
+ } else {
+ ss << m_mirror_position;
+ }
+ ss << ", entries_behind_master="
+ << (m_entries_behind_master > 0 ? m_entries_behind_master : 0);
+
+ *description = ss.str();
+}
+
+} // namespace image_replayer
+} // namespace mirror
+} // namespace rbd
+
+template class
+rbd::mirror::image_replayer::ReplayStatusFormatter<librbd::ImageCtx>;
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef RBD_MIRROR_IMAGE_REPLAYER_REPLAY_STATUS_FORMATTER_H
+#define RBD_MIRROR_IMAGE_REPLAYER_REPLAY_STATUS_FORMATTER_H
+
+#include "include/Context.h"
+#include "common/Mutex.h"
+#include "cls/journal/cls_journal_types.h"
+#include "librbd/journal/Types.h"
+#include "librbd/journal/TypeTraits.h"
+
+namespace journal { class Journaler; }
+namespace librbd { class ImageCtx; }
+
+namespace rbd {
+namespace mirror {
+namespace image_replayer {
+
+template <typename ImageCtxT = librbd::ImageCtx>
+class ReplayStatusFormatter {
+public:
+ typedef typename librbd::journal::TypeTraits<ImageCtxT>::Journaler Journaler;
+
+ static ReplayStatusFormatter* create(Journaler *journaler,
+ const std::string &mirror_uuid) {
+ return new ReplayStatusFormatter(journaler, mirror_uuid);
+ }
+
+ ReplayStatusFormatter(Journaler *journaler, const std::string &mirror_uuid);
+
+ bool get_or_send_update(std::string *description, Context *on_finish);
+
+private:
+ Journaler *m_journaler;
+ std::string m_mirror_uuid;
+ Mutex m_lock;
+ Context *m_on_finish = nullptr;
+ cls::journal::ObjectPosition m_master_position;
+ cls::journal::ObjectPosition m_mirror_position;
+ int m_entries_behind_master = 0;
+ cls::journal::Tag m_tag;
+ std::map<uint64_t, librbd::journal::TagData> m_tag_cache;
+
+ bool calculate_behind_master_or_send_update();
+ void send_update_tag_cache(uint64_t master_tag_tid, uint64_t mirror_tag_tid);
+ void handle_update_tag_cache(uint64_t master_tag_tid, uint64_t mirror_tag_tid,
+ int r);
+ void format(std::string *description);
+};
+
+} // namespace image_replayer
+} // namespace mirror
+} // namespace rbd
+
+#endif // RBD_MIRROR_IMAGE_REPLAYER_REPLAY_STATUS_FORMATTER_H