MOCK_METHOD2(mock_release_image, void(const std::string &, Context*));
MOCK_METHOD3(mock_remove_image, void(const std::string &,
const std::string &, Context*));
+ MOCK_METHOD2(mock_acquire_group, void(const std::string &, Context*));
+ MOCK_METHOD2(mock_release_group, void(const std::string &, Context*));
+ MOCK_METHOD3(mock_remove_group, void(const std::string &,
+ const std::string &, Context*));
  void remove_image(const std::string &mirror_uuid,
                    const std::string &global_image_id,
                    const std::string &instance_id, Context* on_finish) {
mock_remove_image(mirror_uuid, global_image_id, on_finish);
}
+
+ void acquire_group(const std::string &global_group_id,
+ const std::string &instance_id, Context* on_finish) {
+ mock_acquire_group(global_group_id, on_finish);
+ }
+
+ void release_group(const std::string &global_group_id,
+ const std::string &instance_id, Context* on_finish) {
+ mock_release_group(global_group_id, on_finish);
+ }
+
+ void remove_group(const std::string &mirror_uuid,
+ const std::string &global_group_id,
+ const std::string &instance_id, Context* on_finish) {
+ mock_remove_group(mirror_uuid, global_group_id, on_finish);
+ }
};
TestMockImageMap() = default;
#include "test/rbd_mirror/test_mock_fixture.h"
#include "test/rbd_mirror/mock/MockContextWQ.h"
#include "test/rbd_mirror/mock/MockSafeTimer.h"
+#include "tools/rbd_mirror/GroupReplayer.h"
#include "tools/rbd_mirror/ImageReplayer.h"
#include "tools/rbd_mirror/InstanceWatcher.h"
#include "tools/rbd_mirror/InstanceReplayer.h"
struct InstanceWatcher<librbd::MockTestImageCtx> {
};
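+// Template specialization that stands in for the real GroupReplayer in this
+// test: create() hands back a per-test singleton so expectations can be set
+// on the mock before the InstanceReplayer under test requests a new replayer.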
+template<>
+struct GroupReplayer<librbd::MockTestImageCtx> {
+ static GroupReplayer* s_instance;
+ std::string global_group_id;
+
+ static GroupReplayer *create(
+ librados::IoCtx &local_io_ctx, const std::string &local_mirror_uuid,
+ const std::string &global_group_id,
+ Threads<librbd::MockTestImageCtx> *threads,
+ InstanceWatcher<librbd::MockTestImageCtx> *instance_watcher,
+ MirrorStatusUpdater<librbd::MockTestImageCtx>* local_status_updater,
+ journal::CacheManagerHandler *cache_manager_handler,
+ PoolMetaCache* pool_meta_cache) {
+ ceph_assert(s_instance != nullptr);
+ s_instance->global_group_id = global_group_id;
+ return s_instance;
+ }
+
+ GroupReplayer() {
+ ceph_assert(s_instance == nullptr);
+ s_instance = this;
+ }
+
+ virtual ~GroupReplayer() {
+ ceph_assert(s_instance == this);
+ s_instance = nullptr;
+ }
+
+ MOCK_METHOD0(destroy, void());
+ MOCK_METHOD2(start, void(Context *, bool));
+ MOCK_METHOD2(stop, void(Context *, bool));
+ MOCK_METHOD1(restart, void(Context*));
+ MOCK_METHOD0(flush, void());
+ MOCK_METHOD1(print_status, void(Formatter *));
+ MOCK_METHOD1(add_peer, void(const Peer<librbd::MockTestImageCtx>& peer));
+ MOCK_METHOD0(get_global_group_id, const std::string &());
+ MOCK_METHOD0(is_running, bool());
+ MOCK_METHOD0(is_stopped, bool());
+ MOCK_METHOD0(is_blocklisted, bool());
+
+ MOCK_CONST_METHOD0(is_finished, bool());
+ MOCK_METHOD1(set_finished, void(bool));
+
+ MOCK_CONST_METHOD0(get_health_state, image_replayer::HealthState());
+};
+
+GroupReplayer<librbd::MockTestImageCtx>* GroupReplayer<librbd::MockTestImageCtx>::s_instance = nullptr;
+
template<>
struct ImageReplayer<librbd::MockTestImageCtx> {
static ImageReplayer* s_instance;
class TestMockInstanceReplayer : public TestMockFixture {
public:
typedef Threads<librbd::MockTestImageCtx> MockThreads;
+ typedef GroupReplayer<librbd::MockTestImageCtx> MockGroupReplayer;
typedef ImageReplayer<librbd::MockTestImageCtx> MockImageReplayer;
typedef InstanceReplayer<librbd::MockTestImageCtx> MockInstanceReplayer;
typedef InstanceWatcher<librbd::MockTestImageCtx> MockInstanceWatcher;
expect_work_queue(mock_threads);
Context *timer_ctx = nullptr;
expect_add_event_after(mock_threads, &timer_ctx);
+ Context *group_timer_ctx = nullptr;
+ expect_add_event_after(mock_threads, &group_timer_ctx);
instance_replayer.init();
instance_replayer.add_peer({"peer_uuid", m_remote_io_ctx, {}, nullptr});
instance_replayer.shut_down();
ASSERT_TRUE(timer_ctx != nullptr);
delete timer_ctx;
+ ASSERT_TRUE(group_timer_ctx != nullptr);
+ delete group_timer_ctx;
}
TEST_F(TestMockInstanceReplayer, RemoveFinishedImage) {
expect_work_queue(mock_threads);
Context *timer_ctx1 = nullptr;
expect_add_event_after(mock_threads, &timer_ctx1);
+ Context *group_timer_ctx = nullptr;
+ expect_add_event_after(mock_threads, &group_timer_ctx);
instance_replayer.init();
instance_replayer.add_peer({"peer_uuid", m_remote_io_ctx, {}, nullptr});
instance_replayer.shut_down();
ASSERT_TRUE(timer_ctx2 != nullptr);
delete timer_ctx2;
+ ASSERT_TRUE(group_timer_ctx != nullptr);
+ delete group_timer_ctx;
}
TEST_F(TestMockInstanceReplayer, Reacquire) {
expect_work_queue(mock_threads);
Context *timer_ctx = nullptr;
expect_add_event_after(mock_threads, &timer_ctx);
+ Context *group_timer_ctx = nullptr;
+ expect_add_event_after(mock_threads, &group_timer_ctx);
instance_replayer.init();
instance_replayer.add_peer({"peer_uuid", m_remote_io_ctx, {}, nullptr});
instance_replayer.shut_down();
ASSERT_TRUE(timer_ctx != nullptr);
delete timer_ctx;
+ ASSERT_TRUE(group_timer_ctx != nullptr);
+ delete group_timer_ctx;
}
} // namespace mirror
MOCK_METHOD2(release_image, void(const std::string &, Context *));
MOCK_METHOD3(remove_peer_image, void(const std::string&, const std::string&,
Context *));
+ MOCK_METHOD3(acquire_group, void(InstanceWatcher<librbd::MockTestImageCtx> *,
+ const std::string &, Context *));
+ MOCK_METHOD2(release_group, void(const std::string &, Context *));
+ MOCK_METHOD3(remove_peer_group, void(const std::string&, const std::string&,
+ Context *));
};
template <>
const std::string&,
const std::string&,
Context*));
+ MOCK_METHOD3(notify_group_acquire, void(const std::string&,
+ const std::string&,
+ Context*));
+ MOCK_METHOD3(notify_group_release, void(const std::string&,
+ const std::string&,
+ Context*));
+ MOCK_METHOD4(notify_peer_group_removed, void(const std::string&,
+ const std::string&,
+ const std::string&,
+ Context*));
MOCK_METHOD1(handle_update_leader, void(const std::string&));
set(rbd_mirror_internal
ClusterWatcher.cc
+ GroupReplayer.cc
ImageDeleter.cc
ImageMap.cc
ImageReplayer.cc
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "include/stringify.h"
+#include "common/Formatter.h"
+#include "common/admin_socket.h"
+#include "common/debug.h"
+#include "common/errno.h"
+#include "librbd/ImageCtx.h"
+#include "tools/rbd_mirror/image_replayer/Utils.h"
+#include "GroupReplayer.h"
+
+#define dout_context g_ceph_context
+#define dout_subsys ceph_subsys_rbd_mirror
+#undef dout_prefix
+#define dout_prefix *_dout << "rbd::mirror::" << *this << " " \
+ << __func__ << ": "
+
+extern PerfCounters *g_perf_counters;
+
+namespace rbd {
+namespace mirror {
+
+namespace {
+
+template <typename I>
+class GroupReplayerAdminSocketCommand {
+public:
+ GroupReplayerAdminSocketCommand(const std::string &desc,
+ GroupReplayer<I> *replayer)
+ : desc(desc), replayer(replayer) {
+ }
+ virtual ~GroupReplayerAdminSocketCommand() {}
+ virtual int call(Formatter *f) = 0;
+
+ std::string desc;
+ GroupReplayer<I> *replayer;
+ bool registered = false;
+};
+
+template <typename I>
+class StatusCommand : public GroupReplayerAdminSocketCommand<I> {
+public:
+ explicit StatusCommand(const std::string &desc, GroupReplayer<I> *replayer)
+ : GroupReplayerAdminSocketCommand<I>(desc, replayer) {
+ }
+
+ int call(Formatter *f) override {
+ this->replayer->print_status(f);
+ return 0;
+ }
+};
+
+template <typename I>
+class StartCommand : public GroupReplayerAdminSocketCommand<I> {
+public:
+ explicit StartCommand(const std::string &desc, GroupReplayer<I> *replayer)
+ : GroupReplayerAdminSocketCommand<I>(desc, replayer) {
+ }
+
+ int call(Formatter *f) override {
+ this->replayer->start(nullptr, true);
+ return 0;
+ }
+};
+
+template <typename I>
+class StopCommand : public GroupReplayerAdminSocketCommand<I> {
+public:
+ explicit StopCommand(const std::string &desc, GroupReplayer<I> *replayer)
+ : GroupReplayerAdminSocketCommand<I>(desc, replayer) {
+ }
+
+ int call(Formatter *f) override {
+ this->replayer->stop(nullptr, true);
+ return 0;
+ }
+};
+
+template <typename I>
+class RestartCommand : public GroupReplayerAdminSocketCommand<I> {
+public:
+ explicit RestartCommand(const std::string &desc, GroupReplayer<I> *replayer)
+ : GroupReplayerAdminSocketCommand<I>(desc, replayer) {
+ }
+
+ int call(Formatter *f) override {
+ this->replayer->restart();
+ return 0;
+ }
+};
+
+template <typename I>
+class FlushCommand : public GroupReplayerAdminSocketCommand<I> {
+public:
+ explicit FlushCommand(const std::string &desc, GroupReplayer<I> *replayer)
+ : GroupReplayerAdminSocketCommand<I>(desc, replayer) {
+ }
+
+ int call(Formatter *f) override {
+ this->replayer->flush();
+ return 0;
+ }
+};
+
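+// Exposes per-group admin socket commands, mirroring the per-image hook in
+// ImageReplayer. Once registered, the commands are reachable through the
+// daemon's admin socket, e.g. (socket path and group spec are illustrative):
+//
+//   ceph daemon /var/run/ceph/ceph-client.rbd-mirror.a.asok \
+//       rbd mirror group status <pool>/<group>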
+template <typename I>
+class GroupReplayerAdminSocketHook : public AdminSocketHook {
+public:
+ GroupReplayerAdminSocketHook(CephContext *cct, const std::string &name,
+ GroupReplayer<I> *replayer)
+ : admin_socket(cct->get_admin_socket()),
+ commands{{"rbd mirror group flush " + name,
+ new FlushCommand<I>("flush rbd mirror group " + name, replayer)},
+ {"rbd mirror group restart " + name,
+ new RestartCommand<I>("restart rbd mirror group " + name, replayer)},
+ {"rbd mirror group start " + name,
+ new StartCommand<I>("start rbd mirror group " + name, replayer)},
+ {"rbd mirror group status " + name,
+ new StatusCommand<I>("get status for rbd mirror group " + name, replayer)},
+ {"rbd mirror group stop " + name,
+ new StopCommand<I>("stop rbd mirror group " + name, replayer)}} {
+ }
+
+ int register_commands() {
+ for (auto &it : commands) {
+ int r = admin_socket->register_command(it.first, this,
+ it.second->desc);
+ if (r < 0) {
+ return r;
+ }
+ it.second->registered = true;
+ }
+ return 0;
+ }
+
+ ~GroupReplayerAdminSocketHook() override {
+ admin_socket->unregister_commands(this);
+ for (auto &it : commands) {
+ delete it.second;
+ }
+ commands.clear();
+ }
+
+ int call(std::string_view command, const cmdmap_t& cmdmap,
+ const bufferlist&,
+ Formatter *f,
+ std::ostream& errss,
+ bufferlist& out) override {
+ auto i = commands.find(command);
+ ceph_assert(i != commands.end());
+ return i->second->call(f);
+ }
+
+private:
+ typedef std::map<std::string, GroupReplayerAdminSocketCommand<I>*,
+ std::less<>> Commands;
+
+ AdminSocket *admin_socket;
+ Commands commands;
+};
+
+} // anonymous namespace
+
+template <typename I>
+std::ostream &operator<<(std::ostream &os,
+ const typename GroupReplayer<I>::State &state) {
+ switch (state) {
+ case GroupReplayer<I>::STATE_STARTING:
+ os << "Starting";
+ break;
+ case GroupReplayer<I>::STATE_REPLAYING:
+ os << "Replaying";
+ break;
+ case GroupReplayer<I>::STATE_STOPPING:
+ os << "Stopping";
+ break;
+ case GroupReplayer<I>::STATE_STOPPED:
+ os << "Stopped";
+ break;
+ default:
+ os << "Unknown (" << static_cast<uint32_t>(state) << ")";
+ break;
+ }
+ return os;
+}
+
+template <typename I>
+std::ostream &operator<<(std::ostream &os, const GroupReplayer<I> &replayer) {
+ std::string nspace = replayer.get_namespace();
+ if (!nspace.empty()) {
+ nspace += "/";
+ }
+
+ os << "GroupReplayer: " << &replayer << " [" << replayer.get_local_pool_id() << "/"
+ << nspace << replayer.get_global_group_id() << "]";
+ return os;
+}
+
+template <typename I>
+GroupReplayer<I>::GroupReplayer(
+ librados::IoCtx &local_io_ctx, const std::string &local_mirror_uuid,
+ const std::string &global_group_id, Threads<I> *threads,
+ InstanceWatcher<I> *instance_watcher,
+ MirrorStatusUpdater<I>* local_status_updater,
+ journal::CacheManagerHandler *cache_manager_handler,
+ PoolMetaCache* pool_meta_cache) :
+ m_local_io_ctx(local_io_ctx), m_local_mirror_uuid(local_mirror_uuid),
+ m_global_group_id(global_group_id), m_threads(threads),
+ m_instance_watcher(instance_watcher),
+ m_local_status_updater(local_status_updater),
+ m_cache_manager_handler(cache_manager_handler),
+ m_pool_meta_cache(pool_meta_cache),
+ m_local_group_name(global_group_id),
+ m_lock(ceph::make_mutex("rbd::mirror::GroupReplayer " +
+ stringify(local_io_ctx.get_id()) + " " + global_group_id)) {
+  // Register asok commands using a temporary "local_pool_name/global_group_id"
+  // name. Once the local group name becomes known on start, the asok commands
+  // will be re-registered using the "local_pool_name/local_group_name" name.
+
+ m_group_spec = image_replayer::util::compute_image_spec(
+ local_io_ctx, global_group_id);
+ register_admin_socket_hook();
+}
+
+template <typename I>
+GroupReplayer<I>::~GroupReplayer() {
+ unregister_admin_socket_hook();
+ ceph_assert(m_on_start_finish == nullptr);
+ ceph_assert(m_on_stop_finish == nullptr);
+}
+
+template <typename I>
+image_replayer::HealthState GroupReplayer<I>::get_health_state() const {
+ // TODO: Implement something like m_mirror_image_status_state for group
+ return image_replayer::HEALTH_STATE_OK;
+}
+
+template <typename I>
+void GroupReplayer<I>::add_peer(const Peer<I>& peer) {
+ dout(10) << "peer=" << peer << dendl;
+
+ std::lock_guard locker{m_lock};
+ auto it = m_peers.find(peer);
+ if (it == m_peers.end()) {
+ m_peers.insert(peer);
+ }
+}
+
+template <typename I>
+void GroupReplayer<I>::set_state_description(int r, const std::string &desc) {
+ dout(10) << "r=" << r << ", desc=" << desc << dendl;
+
+ std::lock_guard l{m_lock};
+ m_last_r = r;
+ m_state_desc = desc;
+}
+
+template <typename I>
+void GroupReplayer<I>::start(Context *on_finish, bool manual, bool restart) {
+ dout(10) << "on_finish=" << on_finish << ", manual=" << manual
+ << ", restart=" << restart << dendl;
+
+ int r = 0;
+ {
+ std::lock_guard locker{m_lock};
+ if (!is_stopped_()) {
+ derr << "already running" << dendl;
+ r = -EINVAL;
+ } else if (m_manual_stop && !manual) {
+ dout(5) << "stopped manually, ignoring start without manual flag"
+ << dendl;
+ r = -EPERM;
+ } else if (restart && !m_restart_requested) {
+ dout(10) << "canceled restart" << dendl;
+ r = -ECANCELED;
+ } else {
+ m_state = STATE_STARTING;
+ m_last_r = 0;
+ m_state_desc.clear();
+ m_manual_stop = false;
+ }
+ }
+
+ if (r < 0) {
+ if (on_finish) {
+ on_finish->complete(r);
+ }
+ return;
+ }
+
+  // TODO
+  if (on_finish != nullptr) {
+    on_finish->complete(0);
+  }
+}
+
+template <typename I>
+void GroupReplayer<I>::stop(Context *on_finish, bool manual, bool restart) {
+ dout(10) << "on_finish=" << on_finish << ", manual=" << manual
+ << ", restart=" << restart << dendl;
+
+ bool running = true;
+ {
+ std::lock_guard locker{m_lock};
+
+ if (restart) {
+ m_restart_requested = true;
+ }
+
+ if (!is_running_()) {
+ running = false;
+ if (!restart && m_restart_requested) {
+ dout(10) << "canceling restart" << dendl;
+ m_restart_requested = false;
+ }
+ } else {
+ if (!is_stopped_()) {
+ if (m_state == STATE_STARTING) {
+ dout(10) << "canceling start" << dendl;
+ } else {
+ dout(10) << "interrupting replay" << dendl;
+ }
+ }
+ }
+ }
+
+ if (!running) {
+ dout(20) << "not running" << dendl;
+ if (on_finish) {
+ on_finish->complete(-EINVAL);
+ }
+ return;
+ }
+
+  // TODO
+  if (on_finish != nullptr) {
+    on_finish->complete(0);
+  }
+}
+
+template <typename I>
+void GroupReplayer<I>::restart(Context *on_finish) {
+ {
+ std::lock_guard locker{m_lock};
+ m_restart_requested = true;
+ }
+
+ auto ctx = new LambdaContext(
+ [this, on_finish](int r) {
+ if (r < 0) {
+ // Try start anyway.
+ }
+ start(on_finish, true, true);
+ });
+ stop(ctx, false, true);
+}
+
+template <typename I>
+void GroupReplayer<I>::flush() {
+ std::unique_lock locker{m_lock};
+ if (m_state != STATE_REPLAYING) {
+ return;
+ }
+
+ dout(10) << dendl;
+ // TODO
+}
+
+template <typename I>
+void GroupReplayer<I>::print_status(Formatter *f) {
+ dout(10) << dendl;
+
+ std::lock_guard l{m_lock};
+
+ f->open_object_section("group_replayer");
+ f->dump_string("name", m_group_spec);
+ f->dump_stream("state") << m_state;
+ f->close_section();
+}
+
+template <typename I>
+void GroupReplayer<I>::register_admin_socket_hook() {
+ GroupReplayerAdminSocketHook<I> *asok_hook;
+ {
+ std::lock_guard locker{m_lock};
+ if (m_asok_hook != nullptr) {
+ return;
+ }
+
+ dout(15) << "registered asok hook: " << m_group_spec << dendl;
+ asok_hook = new GroupReplayerAdminSocketHook<I>(
+ g_ceph_context, m_group_spec, this);
+ int r = asok_hook->register_commands();
+ if (r == 0) {
+ m_asok_hook = asok_hook;
+ return;
+ }
+ derr << "error registering admin socket commands" << dendl;
+ }
+ delete asok_hook;
+}
+
+template <typename I>
+void GroupReplayer<I>::unregister_admin_socket_hook() {
+ dout(15) << dendl;
+
+ AdminSocketHook *asok_hook = nullptr;
+ {
+ std::lock_guard locker{m_lock};
+ std::swap(asok_hook, m_asok_hook);
+ }
+ delete asok_hook;
+}
+
+template <typename I>
+void GroupReplayer<I>::reregister_admin_socket_hook() {
+ std::unique_lock locker{m_lock};
+
+ auto group_spec = image_replayer::util::compute_image_spec(
+ m_local_io_ctx, m_local_group_name);
+ if (m_asok_hook != nullptr && m_group_spec == group_spec) {
+ return;
+ }
+
+ dout(15) << "old_group_spec=" << m_group_spec << ", "
+ << "new_group_spec=" << group_spec << dendl;
+ m_group_spec = group_spec;
+
+ if (m_state == STATE_STOPPING || m_state == STATE_STOPPED) {
+ // no need to re-register if stopping
+ return;
+ }
+ locker.unlock();
+
+ unregister_admin_socket_hook();
+ register_admin_socket_hook();
+}
+
+} // namespace mirror
+} // namespace rbd
+
+template class rbd::mirror::GroupReplayer<librbd::ImageCtx>;
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_RBD_MIRROR_GROUP_REPLAYER_H
+#define CEPH_RBD_MIRROR_GROUP_REPLAYER_H
+
+#include "common/ceph_mutex.h"
+#include "include/rados/librados.hpp"
+#include "tools/rbd_mirror/Types.h"
+#include "tools/rbd_mirror/image_replayer/Types.h"
+#include <boost/optional.hpp>
+#include <set>
+#include <string>
+
+class AdminSocketHook;
+struct Context;
+
+namespace journal { struct CacheManagerHandler; }
+namespace librbd { class ImageCtx; }
+
+namespace rbd {
+namespace mirror {
+
+template <typename> struct InstanceWatcher;
+template <typename> struct MirrorStatusUpdater;
+struct PoolMetaCache;
+template <typename> struct Threads;
+
+/**
+ * Replays changes from a remote cluster for a single group.
+ */
+template <typename ImageCtxT = librbd::ImageCtx>
+class GroupReplayer {
+public:
+ static GroupReplayer *create(
+ librados::IoCtx &local_io_ctx, const std::string &local_mirror_uuid,
+ const std::string &global_group_id, Threads<ImageCtxT> *threads,
+ InstanceWatcher<ImageCtxT> *instance_watcher,
+ MirrorStatusUpdater<ImageCtxT>* local_status_updater,
+ journal::CacheManagerHandler *cache_manager_handler,
+ PoolMetaCache* pool_meta_cache) {
+ return new GroupReplayer(local_io_ctx, local_mirror_uuid, global_group_id,
+ threads, instance_watcher, local_status_updater,
+ cache_manager_handler, pool_meta_cache);
+ }
+ void destroy() {
+ delete this;
+ }
+
+ GroupReplayer(librados::IoCtx &local_io_ctx,
+ const std::string &local_mirror_uuid,
+ const std::string &global_group_id,
+ Threads<ImageCtxT> *threads,
+ InstanceWatcher<ImageCtxT> *instance_watcher,
+ MirrorStatusUpdater<ImageCtxT>* local_status_updater,
+ journal::CacheManagerHandler *cache_manager_handler,
+ PoolMetaCache* pool_meta_cache);
+ virtual ~GroupReplayer();
+ GroupReplayer(const GroupReplayer&) = delete;
+ GroupReplayer& operator=(const GroupReplayer&) = delete;
+
+ bool is_stopped() { std::lock_guard l{m_lock}; return is_stopped_(); }
+ bool is_running() { std::lock_guard l{m_lock}; return is_running_(); }
+ bool is_replaying() { std::lock_guard l{m_lock}; return is_replaying_(); }
+
+ std::string get_name() { std::lock_guard l{m_lock}; return m_group_spec; };
+ void set_state_description(int r, const std::string &desc);
+
+ // TODO temporary until policy handles release of group replayers
+ inline bool is_finished() const {
+ std::lock_guard locker{m_lock};
+ return m_finished;
+ }
+ inline void set_finished(bool finished) {
+ std::lock_guard locker{m_lock};
+ m_finished = finished;
+ }
+
+ inline bool is_blocklisted() const {
+ std::lock_guard locker{m_lock};
+ return (m_last_r == -EBLOCKLISTED);
+ }
+
+ image_replayer::HealthState get_health_state() const;
+
+ void add_peer(const Peer<ImageCtxT>& peer);
+
+ inline int64_t get_local_pool_id() const {
+ return m_local_io_ctx.get_id();
+ }
+ inline std::string get_namespace() const {
+ return m_local_io_ctx.get_namespace();
+ }
+ inline const std::string& get_global_group_id() const {
+ return m_global_group_id;
+ }
+
+ void start(Context *on_finish = nullptr, bool manual = false,
+ bool restart = false);
+ void stop(Context *on_finish = nullptr, bool manual = false,
+ bool restart = false);
+ void restart(Context *on_finish = nullptr);
+ void flush();
+
+ void print_status(Formatter *f);
+
+ template <typename>
+ friend std::ostream &operator<<(std::ostream &os,
+ const GroupReplayer &replayer);
+
+  /**
+   * @verbatim
+   *                                   (error)
+   * <uninitialized> <------------------------------------ FAIL
+   *    |                                                    ^
+   *    v                                                    |
+   * <starting> ---------------------------------------------|
+   *    |                                        (error)     |
+   *    v                                                    |
+   * <replaying> --------------------------------------------/
+   *    |                                        (error)
+   *    v
+   * <stopping>
+   *    |
+   *    v
+   * <stopped>
+   *
+   * @endverbatim
+   */
+
+private:
+ typedef std::set<Peer<ImageCtxT>> Peers;
+
+ enum State {
+ STATE_UNKNOWN,
+ STATE_STARTING,
+ STATE_REPLAYING,
+ STATE_STOPPING,
+ STATE_STOPPED,
+ };
+
+ librados::IoCtx &m_local_io_ctx;
+ std::string m_local_mirror_uuid;
+ std::string m_global_group_id;
+ Threads<ImageCtxT> *m_threads;
+ InstanceWatcher<ImageCtxT> *m_instance_watcher;
+ MirrorStatusUpdater<ImageCtxT>* m_local_status_updater;
+ journal::CacheManagerHandler *m_cache_manager_handler;
+ PoolMetaCache* m_pool_meta_cache;
+
+ std::string m_local_group_name;
+ std::string m_group_spec;
+ Peers m_peers;
+
+ mutable ceph::mutex m_lock;
+ State m_state = STATE_STOPPED;
+ std::string m_state_desc;
+ int m_last_r = 0;
+
+ Context *m_on_start_finish = nullptr;
+ Context *m_on_stop_finish = nullptr;
+ bool m_stop_requested = false;
+ bool m_restart_requested = false;
+ bool m_manual_stop = false;
+ bool m_finished = false;
+
+ AdminSocketHook *m_asok_hook = nullptr;
+
+ bool is_stopped_() const {
+ return m_state == STATE_STOPPED;
+ }
+ bool is_running_() const {
+ return !is_stopped_() && m_state != STATE_STOPPING && !m_stop_requested;
+ }
+ bool is_replaying_() const {
+ return (m_state == STATE_REPLAYING);
+ }
+
+ void register_admin_socket_hook();
+ void unregister_admin_socket_hook();
+ void reregister_admin_socket_hook();
+};
+
+} // namespace mirror
+} // namespace rbd
+
+extern template class rbd::mirror::GroupReplayer<librbd::ImageCtx>;
+
+#endif // CEPH_RBD_MIRROR_GROUP_REPLAYER_H
#include "common/errno.h"
#include "librbd/Utils.h"
#include "librbd/asio/ContextWQ.h"
+#include "GroupReplayer.h"
#include "ImageReplayer.h"
#include "InstanceReplayer.h"
#include "ServiceDaemon.h"
namespace {
-const std::string SERVICE_DAEMON_ASSIGNED_COUNT_KEY("image_assigned_count");
-const std::string SERVICE_DAEMON_WARNING_COUNT_KEY("image_warning_count");
-const std::string SERVICE_DAEMON_ERROR_COUNT_KEY("image_error_count");
+const std::string SERVICE_DAEMON_IMAGE_ASSIGNED_COUNT_KEY("image_assigned_count");
+const std::string SERVICE_DAEMON_IMAGE_WARNING_COUNT_KEY("image_warning_count");
+const std::string SERVICE_DAEMON_IMAGE_ERROR_COUNT_KEY("image_error_count");
+
+const std::string SERVICE_DAEMON_GROUP_ASSIGNED_COUNT_KEY("group_assigned_count");
+const std::string SERVICE_DAEMON_GROUP_WARNING_COUNT_KEY("group_warning_count");
+const std::string SERVICE_DAEMON_GROUP_ERROR_COUNT_KEY("group_error_count");
} // anonymous namespace
{
std::lock_guard timer_locker{m_threads->timer_lock};
schedule_image_state_check_task();
+ schedule_group_state_check_task();
}
on_finish->complete(0);
});
std::lock_guard locker{m_lock};
C_Gather *gather_ctx = new C_Gather(g_ceph_context, on_finish);
+ for (auto it = m_group_replayers.begin(); it != m_group_replayers.end();
+ it = m_group_replayers.erase(it)) {
+ auto group_replayer = it->second;
+ auto ctx = gather_ctx->new_sub();
+ ctx = new LambdaContext(
+ [group_replayer, ctx] (int r) {
+ group_replayer->destroy();
+ ctx->complete(0);
+ });
+ stop_group_replayer(group_replayer, ctx);
+ }
for (auto it = m_image_replayers.begin(); it != m_image_replayers.end();
it = m_image_replayers.erase(it)) {
auto image_replayer = it->second;
m_threads->work_queue->queue(on_finish, 0);
}
+template <typename I>
+void InstanceReplayer<I>::acquire_group(InstanceWatcher<I> *instance_watcher,
+ const std::string &global_group_id,
+ Context *on_finish) {
+ dout(10) << "global_group_id=" << global_group_id << dendl;
+
+ std::lock_guard locker{m_lock};
+
+ ceph_assert(m_on_shut_down == nullptr);
+
+ auto it = m_group_replayers.find(global_group_id);
+ if (it == m_group_replayers.end()) {
+ auto group_replayer = GroupReplayer<I>::create(
+ m_local_io_ctx, m_local_mirror_uuid, global_group_id,
+ m_threads, instance_watcher, m_local_status_updater,
+ m_cache_manager_handler, m_pool_meta_cache);
+
+ dout(10) << global_group_id << ": creating replayer " << group_replayer
+ << dendl;
+
+ it = m_group_replayers.insert(std::make_pair(global_group_id,
+ group_replayer)).first;
+
+ // TODO only a single peer is currently supported
+ ceph_assert(m_peers.size() == 1);
+ auto peer = *m_peers.begin();
+ group_replayer->add_peer(peer);
+ start_group_replayer(group_replayer);
+ } else {
+ // A duplicate acquire notification implies (1) connection hiccup or
+ // (2) new leader election. For the second case, restart the replayer to
+ // detect if the group has been deleted while the leader was offline
+ auto& group_replayer = it->second;
+ group_replayer->set_finished(false);
+ group_replayer->restart(new C_TrackedOp(m_async_op_tracker, nullptr));
+ }
+
+ m_threads->work_queue->queue(on_finish, 0);
+}
+
+template <typename I>
+void InstanceReplayer<I>::release_group(const std::string &global_group_id,
+ Context *on_finish) {
+ dout(10) << "global_group_id=" << global_group_id << dendl;
+
+ std::lock_guard locker{m_lock};
+ ceph_assert(m_on_shut_down == nullptr);
+
+ auto it = m_group_replayers.find(global_group_id);
+ if (it == m_group_replayers.end()) {
+ dout(5) << global_group_id << ": not found" << dendl;
+ m_threads->work_queue->queue(on_finish, 0);
+ return;
+ }
+
+ auto group_replayer = it->second;
+ m_group_replayers.erase(it);
+
+ on_finish = new LambdaContext(
+ [group_replayer, on_finish] (int r) {
+ group_replayer->destroy();
+ on_finish->complete(0);
+ });
+ stop_group_replayer(group_replayer, on_finish);
+}
+
+template <typename I>
+void InstanceReplayer<I>::remove_peer_group(const std::string &global_group_id,
+ const std::string &peer_mirror_uuid,
+ Context *on_finish) {
+ dout(10) << "global_group_id=" << global_group_id << ", "
+ << "peer_mirror_uuid=" << peer_mirror_uuid << dendl;
+
+ std::lock_guard locker{m_lock};
+ ceph_assert(m_on_shut_down == nullptr);
+
+ auto it = m_group_replayers.find(global_group_id);
+ if (it != m_group_replayers.end()) {
+ // TODO only a single peer is currently supported, therefore
+ // we can just interrupt the current group replayer and
+ // it will eventually detect that the peer group is missing and
+ // determine if a delete propagation is required.
+ auto group_replayer = it->second;
+ group_replayer->restart(new C_TrackedOp(m_async_op_tracker, nullptr));
+ }
+ m_threads->work_queue->queue(on_finish, 0);
+}
+
template <typename I>
void InstanceReplayer<I>::print_status(Formatter *f) {
dout(10) << dendl;
std::lock_guard locker{m_lock};
+ f->open_array_section("group_replayers");
+ for (auto &[_, group_replayer] : m_group_replayers) {
+ group_replayer->print_status(f);
+ }
+ f->close_section();
f->open_array_section("image_replayers");
- for (auto &kv : m_image_replayers) {
- auto &image_replayer = kv.second;
+ for (auto &[_, image_replayer] : m_image_replayers) {
image_replayer->print_status(f);
}
f->close_section();
auto cct = static_cast<CephContext *>(m_local_io_ctx.cct());
auto gather_ctx = new C_Gather(
cct, new C_TrackedOp(m_async_op_tracker, nullptr));
- for (auto &kv : m_image_replayers) {
- auto &image_replayer = kv.second;
+ for (auto &[_, group_replayer] : m_group_replayers) {
+ group_replayer->start(gather_ctx->new_sub(), true);
+ }
+ for (auto &[_, image_replayer] : m_image_replayers) {
image_replayer->start(gather_ctx->new_sub(), true);
}
m_manual_stop = true;
- for (auto &kv : m_image_replayers) {
- auto &image_replayer = kv.second;
+ for (auto &[_, group_replayer] : m_group_replayers) {
+ group_replayer->stop(gather_ctx->new_sub(), true);
+ }
+ for (auto &[_, image_replayer] : m_image_replayers) {
image_replayer->stop(gather_ctx->new_sub(), true);
}
}
m_manual_stop = false;
- for (auto &kv : m_image_replayers) {
- auto &image_replayer = kv.second;
+ for (auto &[_, group_replayer] : m_group_replayers) {
+ group_replayer->restart(new C_TrackedOp(m_async_op_tracker, nullptr));
+ }
+ for (auto &[_, image_replayer] : m_image_replayers) {
image_replayer->restart(new C_TrackedOp(m_async_op_tracker, nullptr));
}
}
std::lock_guard locker{m_lock};
- for (auto &kv : m_image_replayers) {
- auto &image_replayer = kv.second;
+ for (auto &[_, group_replayer] : m_group_replayers) {
+ group_replayer->flush();
+ }
+ for (auto &[_, image_replayer] : m_image_replayers) {
image_replayer->flush();
}
}
m_service_daemon->add_or_update_namespace_attribute(
m_local_io_ctx.get_id(), m_local_io_ctx.get_namespace(),
- SERVICE_DAEMON_ASSIGNED_COUNT_KEY, image_count);
+ SERVICE_DAEMON_IMAGE_ASSIGNED_COUNT_KEY, image_count);
m_service_daemon->add_or_update_namespace_attribute(
m_local_io_ctx.get_id(), m_local_io_ctx.get_namespace(),
- SERVICE_DAEMON_WARNING_COUNT_KEY, warning_count);
+ SERVICE_DAEMON_IMAGE_WARNING_COUNT_KEY, warning_count);
m_service_daemon->add_or_update_namespace_attribute(
m_local_io_ctx.get_id(), m_local_io_ctx.get_namespace(),
- SERVICE_DAEMON_ERROR_COUNT_KEY, error_count);
+ SERVICE_DAEMON_IMAGE_ERROR_COUNT_KEY, error_count);
}
template <typename I>
}
}
-template <typename I>
-void InstanceReplayer<I>::wait_for_ops() {
- dout(10) << dendl;
-
- Context *ctx = create_context_callback<
- InstanceReplayer, &InstanceReplayer<I>::handle_wait_for_ops>(this);
-
- m_async_op_tracker.wait_for_ops(ctx);
-}
-
-template <typename I>
-void InstanceReplayer<I>::handle_wait_for_ops(int r) {
- dout(10) << "r=" << r << dendl;
-
- ceph_assert(r == 0);
-
- std::lock_guard locker{m_lock};
- stop_image_replayers();
-}
-
template <typename I>
void InstanceReplayer<I>::stop_image_replayers() {
dout(10) << dendl;
m_threads->timer->add_event_after(after, m_image_state_check_task);
}
+template <typename I>
+void InstanceReplayer<I>::start_group_replayer(
+ GroupReplayer<I> *group_replayer) {
+ ceph_assert(ceph_mutex_is_locked(m_lock));
+
+ std::string global_group_id = group_replayer->get_global_group_id();
+ if (!group_replayer->is_stopped()) {
+ return;
+ } else if (group_replayer->is_blocklisted()) {
+ derr << "global_group_id=" << global_group_id << ": blocklisted detected "
+ << "during group replay" << dendl;
+ m_blocklisted = true;
+ return;
+ } else if (group_replayer->is_finished()) {
+ // TODO temporary until policy integrated
+ dout(5) << "removing group replayer for global_group_id="
+ << global_group_id << dendl;
+ m_group_replayers.erase(group_replayer->get_global_group_id());
+ group_replayer->destroy();
+ return;
+ } else if (m_manual_stop) {
+ return;
+ }
+
+ dout(10) << "global_group_id=" << global_group_id << dendl;
+ group_replayer->start(new C_TrackedOp(m_async_op_tracker, nullptr), false);
+}
+
+template <typename I>
+void InstanceReplayer<I>::queue_start_group_replayers() {
+ dout(10) << dendl;
+
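+  // bounce the periodic check to the work queue so it runs outside the
+  // timer thread context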
+ Context *ctx = create_context_callback<
+ InstanceReplayer, &InstanceReplayer<I>::start_group_replayers>(this);
+ m_async_op_tracker.start_op();
+ m_threads->work_queue->queue(ctx, 0);
+}
+
+template <typename I>
+void InstanceReplayer<I>::start_group_replayers(int r) {
+ dout(10) << dendl;
+
+ std::lock_guard locker{m_lock};
+  if (m_on_shut_down != nullptr) {
+    // shutting down: still release the op taken in queue_start_group_replayers()
+    m_async_op_tracker.finish_op();
+    return;
+  }
+
+ uint64_t group_count = 0;
+ uint64_t warning_count = 0;
+ uint64_t error_count = 0;
+ for (auto it = m_group_replayers.begin();
+ it != m_group_replayers.end();) {
+ auto current_it(it);
+ ++it;
+
+ ++group_count;
+ auto health_state = current_it->second->get_health_state();
+ if (health_state == image_replayer::HEALTH_STATE_WARNING) {
+ ++warning_count;
+ } else if (health_state == image_replayer::HEALTH_STATE_ERROR) {
+ ++error_count;
+ }
+
+ start_group_replayer(current_it->second);
+ }
+
+ m_service_daemon->add_or_update_namespace_attribute(
+ m_local_io_ctx.get_id(), m_local_io_ctx.get_namespace(),
+ SERVICE_DAEMON_GROUP_ASSIGNED_COUNT_KEY, group_count);
+ m_service_daemon->add_or_update_namespace_attribute(
+ m_local_io_ctx.get_id(), m_local_io_ctx.get_namespace(),
+ SERVICE_DAEMON_GROUP_WARNING_COUNT_KEY, warning_count);
+ m_service_daemon->add_or_update_namespace_attribute(
+ m_local_io_ctx.get_id(), m_local_io_ctx.get_namespace(),
+ SERVICE_DAEMON_GROUP_ERROR_COUNT_KEY, error_count);
+
+ m_async_op_tracker.finish_op();
+}
+
+template <typename I>
+void InstanceReplayer<I>::stop_group_replayer(GroupReplayer<I> *group_replayer,
+ Context *on_finish) {
+ dout(10) << group_replayer << " global_group_id="
+ << group_replayer->get_global_group_id() << ", on_finish="
+ << on_finish << dendl;
+
+ if (group_replayer->is_stopped()) {
+ m_threads->work_queue->queue(on_finish, 0);
+ return;
+ }
+
+ m_async_op_tracker.start_op();
+ Context *ctx = create_async_context_callback(
+ m_threads->work_queue, new LambdaContext(
+ [this, group_replayer, on_finish] (int r) {
+ stop_group_replayer(group_replayer, on_finish);
+ m_async_op_tracker.finish_op();
+ }));
+
+ if (group_replayer->is_running()) {
+ group_replayer->stop(ctx, false);
+ } else {
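+    // neither stopped nor running: the replayer is still starting up, so
+    // retry the stop after a short delay until it can be stopped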
+ int after = 1;
+ dout(10) << "scheduling group replayer " << group_replayer << " stop after "
+ << after << " sec (task " << ctx << ")" << dendl;
+ ctx = new LambdaContext(
+ [this, after, ctx] (int r) {
+ std::lock_guard timer_locker{m_threads->timer_lock};
+ m_threads->timer->add_event_after(after, ctx);
+ });
+ m_threads->work_queue->queue(ctx, 0);
+ }
+}
+
+template <typename I>
+void InstanceReplayer<I>::stop_group_replayers() {
+ dout(10) << dendl;
+
+ ceph_assert(ceph_mutex_is_locked(m_lock));
+
+ Context *ctx = create_async_context_callback(
+ m_threads->work_queue, create_context_callback<InstanceReplayer<I>,
+ &InstanceReplayer<I>::handle_stop_group_replayers>(this));
+
+ C_Gather *gather_ctx = new C_Gather(g_ceph_context, ctx);
+ for (auto &[_, group_replayer] : m_group_replayers) {
+ stop_group_replayer(group_replayer, gather_ctx->new_sub());
+ }
+ gather_ctx->activate();
+}
+
+template <typename I>
+void InstanceReplayer<I>::handle_stop_group_replayers(int r) {
+ dout(10) << "r=" << r << dendl;
+
+ ceph_assert(r == 0);
+
+ Context *on_finish = nullptr;
+ {
+ std::lock_guard locker{m_lock};
+
+ for (auto &[_, group_replayer] : m_group_replayers) {
+ ceph_assert(group_replayer->is_stopped());
+ group_replayer->destroy();
+ }
+ m_group_replayers.clear();
+
+ ceph_assert(m_on_shut_down != nullptr);
+ std::swap(on_finish, m_on_shut_down);
+ }
+ on_finish->complete(r);
+}
+
+template <typename I>
+void InstanceReplayer<I>::cancel_group_state_check_task() {
+ std::lock_guard timer_locker{m_threads->timer_lock};
+
+ if (m_group_state_check_task == nullptr) {
+ return;
+ }
+
+ dout(10) << m_group_state_check_task << dendl;
+ bool canceled = m_threads->timer->cancel_event(m_group_state_check_task);
+ ceph_assert(canceled);
+ m_group_state_check_task = nullptr;
+}
+
+template <typename I>
+void InstanceReplayer<I>::schedule_group_state_check_task() {
+ ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock));
+ ceph_assert(m_group_state_check_task == nullptr);
+
+ m_group_state_check_task = new LambdaContext(
+ [this](int r) {
+ ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock));
+ m_group_state_check_task = nullptr;
+ schedule_group_state_check_task();
+ queue_start_group_replayers();
+ });
+
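+  // reuse the image state check interval for the periodic group check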
+ auto cct = static_cast<CephContext *>(m_local_io_ctx.cct());
+ int after = cct->_conf.get_val<uint64_t>(
+ "rbd_mirror_image_state_check_interval");
+
+ dout(10) << "scheduling group state check after " << after << " sec (task "
+ << m_group_state_check_task << ")" << dendl;
+ m_threads->timer->add_event_after(after, m_group_state_check_task);
+}
+
+template <typename I>
+void InstanceReplayer<I>::wait_for_ops() {
+ dout(10) << dendl;
+
+ Context *ctx = create_context_callback<
+ InstanceReplayer, &InstanceReplayer<I>::handle_wait_for_ops>(this);
+
+ m_async_op_tracker.wait_for_ops(ctx);
+}
+
+template <typename I>
+void InstanceReplayer<I>::handle_wait_for_ops(int r) {
+ dout(10) << "r=" << r << dendl;
+
+ ceph_assert(r == 0);
+
+ std::lock_guard locker{m_lock};
+ stop_image_replayers();
+}
+
} // namespace mirror
} // namespace rbd
namespace rbd {
namespace mirror {
+template <typename> class GroupReplayer;
template <typename> class ImageReplayer;
template <typename> class InstanceWatcher;
template <typename> class MirrorStatusUpdater;
const std::string &peer_mirror_uuid,
Context *on_finish);
+ void acquire_group(InstanceWatcher<ImageCtxT> *instance_watcher,
+ const std::string &global_group_id, Context *on_finish);
+ void release_group(const std::string &global_group_id, Context *on_finish);
+ void remove_peer_group(const std::string &global_group_id,
+ const std::string &peer_mirror_uuid,
+ Context *on_finish);
+
void release_all(Context *on_finish);
void print_status(Formatter *f);
*
* <uninitialized> <-------------------\
* | (init) | (repeat for each
- * v STOP_IMAGE_REPLAYER ---\ image replayer)
- * SCHEDULE_IMAGE_STATE_CHECK_TASK ^ ^ |
+ * v STOP_GROUP_REPLAYER ---\ group replayer)
+ * SCHEDULE_GROUP_STATE_CHECK_TASK ^ ^ |
+ * | | | |
+ * v | \---------/
+ * SCHEDULE_IMAGE_STATE_CHECK_TASK | (repeat for each
+ * | STOP_IMAGE_REPLAYER ---\ image replayer)
+ * | ^ ^ |
* | | | |
* v (shut_down) | \---------/
* <initialized> -----------------> WAIT_FOR_OPS
mutable ceph::mutex m_lock;
AsyncOpTracker m_async_op_tracker;
std::map<std::string, ImageReplayer<ImageCtxT> *> m_image_replayers;
+ std::map<std::string, GroupReplayer<ImageCtxT> *> m_group_replayers;
Peers m_peers;
Context *m_image_state_check_task = nullptr;
+ Context *m_group_state_check_task = nullptr;
Context *m_on_shut_down = nullptr;
bool m_manual_stop = false;
bool m_blocklisted = false;
void schedule_image_state_check_task();
void cancel_image_state_check_task();
+
+ void start_group_replayer(GroupReplayer<ImageCtxT> *group_replayer);
+ void queue_start_group_replayers();
+ void start_group_replayers(int r);
+
+ void stop_group_replayer(GroupReplayer<ImageCtxT> *group_replayer,
+ Context *on_finish);
+
+ void stop_group_replayers();
+ void handle_stop_group_replayers(int r);
+
+ void schedule_group_state_check_task();
+ void cancel_group_state_check_task();
};
} // namespace mirror
req->send();
}
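+// Leader-side helpers: wrap the group payloads in a NotifyMessage and send
+// them to the given rbd-mirror instance, mirroring the existing image
+// acquire/release/removed notifications.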
+template <typename I>
+void InstanceWatcher<I>::notify_group_acquire(
+ const std::string &instance_id, const std::string &global_group_id,
+ Context *on_notify_ack) {
+ dout(10) << "instance_id=" << instance_id << ", global_group_id="
+ << global_group_id << dendl;
+
+ std::lock_guard locker{m_lock};
+
+ ceph_assert(m_on_finish == nullptr);
+
+ uint64_t request_id = ++m_request_seq;
+ bufferlist bl;
+ encode(NotifyMessage{GroupAcquirePayload{request_id, global_group_id}}, bl);
+ auto req = new C_NotifyInstanceRequest(this, instance_id, request_id,
+ std::move(bl), on_notify_ack);
+ req->send();
+}
+
+template <typename I>
+void InstanceWatcher<I>::notify_group_release(
+ const std::string &instance_id, const std::string &global_group_id,
+ Context *on_notify_ack) {
+ dout(10) << "instance_id=" << instance_id << ", global_group_id="
+ << global_group_id << dendl;
+
+ std::lock_guard locker{m_lock};
+
+ ceph_assert(m_on_finish == nullptr);
+
+ uint64_t request_id = ++m_request_seq;
+ bufferlist bl;
+ encode(NotifyMessage{GroupReleasePayload{request_id, global_group_id}}, bl);
+ auto req = new C_NotifyInstanceRequest(this, instance_id, request_id,
+ std::move(bl), on_notify_ack);
+ req->send();
+}
+
+template <typename I>
+void InstanceWatcher<I>::notify_peer_group_removed(
+ const std::string &instance_id, const std::string &global_group_id,
+ const std::string &peer_mirror_uuid, Context *on_notify_ack) {
+ dout(10) << "instance_id=" << instance_id << ", "
+ << "global_group_id=" << global_group_id << ", "
+ << "peer_mirror_uuid=" << peer_mirror_uuid << dendl;
+
+ std::lock_guard locker{m_lock};
+ ceph_assert(m_on_finish == nullptr);
+
+ uint64_t request_id = ++m_request_seq;
+ bufferlist bl;
+ encode(NotifyMessage{PeerGroupRemovedPayload{request_id, global_group_id,
+ peer_mirror_uuid}}, bl);
+ auto req = new C_NotifyInstanceRequest(this, instance_id, request_id,
+ std::move(bl), on_notify_ack);
+ req->send();
+}
+
template <typename I>
void InstanceWatcher<I>::notify_sync_request(const std::string &sync_id,
Context *on_sync_start) {
m_work_queue->queue(ctx, 0);
}
+template <typename I>
+void InstanceWatcher<I>::handle_group_acquire(
+ const std::string &global_group_id, Context *on_finish) {
+ dout(10) << "global_group_id=" << global_group_id << dendl;
+
+ auto ctx = new LambdaContext(
+ [this, global_group_id, on_finish] (int r) {
+ m_instance_replayer->acquire_group(this, global_group_id, on_finish);
+ m_notify_op_tracker.finish_op();
+ });
+
+ m_notify_op_tracker.start_op();
+ m_work_queue->queue(ctx, 0);
+}
+
+template <typename I>
+void InstanceWatcher<I>::handle_group_release(
+ const std::string &global_group_id, Context *on_finish) {
+ dout(10) << "global_group_id=" << global_group_id << dendl;
+
+ auto ctx = new LambdaContext(
+ [this, global_group_id, on_finish] (int r) {
+ m_instance_replayer->release_group(global_group_id, on_finish);
+ m_notify_op_tracker.finish_op();
+ });
+
+ m_notify_op_tracker.start_op();
+ m_work_queue->queue(ctx, 0);
+}
+
+template <typename I>
+void InstanceWatcher<I>::handle_peer_group_removed(
+ const std::string &global_group_id, const std::string &peer_mirror_uuid,
+ Context *on_finish) {
+ dout(10) << "global_group_id=" << global_group_id << ", "
+ << "peer_mirror_uuid=" << peer_mirror_uuid << dendl;
+
+ auto ctx = new LambdaContext(
+ [this, peer_mirror_uuid, global_group_id, on_finish] (int r) {
+ m_instance_replayer->remove_peer_group(global_group_id,
+ peer_mirror_uuid, on_finish);
+ m_notify_op_tracker.finish_op();
+ });
+
+ m_notify_op_tracker.start_op();
+ m_work_queue->queue(ctx, 0);
+}
+
template <typename I>
void InstanceWatcher<I>::handle_sync_request(const std::string &instance_id,
const std::string &sync_id,
}
}
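+// Group notification handlers: prepare_request() tracks in-flight requests
+// by (instance_id, request_id), so a duplicate notification is not dispatched
+// to the instance replayer a second time.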
+template <typename I>
+void InstanceWatcher<I>::handle_payload(const std::string &instance_id,
+ const GroupAcquirePayload &payload,
+ C_NotifyAck *on_notify_ack) {
+ dout(10) << "group_acquire: instance_id=" << instance_id << ", "
+ << "request_id=" << payload.request_id << dendl;
+
+ auto on_finish = prepare_request(instance_id, payload.request_id,
+ on_notify_ack);
+ if (on_finish != nullptr) {
+ handle_group_acquire(payload.global_group_id, on_finish);
+ }
+}
+
+template <typename I>
+void InstanceWatcher<I>::handle_payload(const std::string &instance_id,
+ const GroupReleasePayload &payload,
+ C_NotifyAck *on_notify_ack) {
+ dout(10) << "group_release: instance_id=" << instance_id << ", "
+ << "request_id=" << payload.request_id << dendl;
+
+ auto on_finish = prepare_request(instance_id, payload.request_id,
+ on_notify_ack);
+ if (on_finish != nullptr) {
+ handle_group_release(payload.global_group_id, on_finish);
+ }
+}
+
+template <typename I>
+void InstanceWatcher<I>::handle_payload(const std::string &instance_id,
+ const PeerGroupRemovedPayload &payload,
+ C_NotifyAck *on_notify_ack) {
+ dout(10) << "remove_peer_group: instance_id=" << instance_id << ", "
+ << "request_id=" << payload.request_id << dendl;
+
+ auto on_finish = prepare_request(instance_id, payload.request_id,
+ on_notify_ack);
+ if (on_finish != nullptr) {
+ handle_peer_group_removed(payload.global_group_id, payload.peer_mirror_uuid,
+ on_finish);
+ }
+}
+
template <typename I>
void InstanceWatcher<I>::handle_payload(const std::string &instance_id,
const SyncRequestPayload &payload,
const std::string &peer_mirror_uuid,
Context *on_notify_ack);
+ void notify_group_acquire(const std::string &instance_id,
+ const std::string &global_group_id,
+ Context *on_notify_ack);
+ void notify_group_release(const std::string &instance_id,
+ const std::string &global_group_id,
+ Context *on_notify_ack);
+ void notify_peer_group_removed(const std::string &instance_id,
+ const std::string &global_group_id,
+ const std::string &peer_mirror_uuid,
+ Context *on_notify_ack);
+
void notify_sync_request(const std::string &sync_id, Context *on_sync_start);
bool cancel_sync_request(const std::string &sync_id);
void notify_sync_complete(const std::string &sync_id);
const std::string &peer_mirror_uuid,
Context *on_finish);
+ void handle_group_acquire(const std::string &global_group_id,
+ Context *on_finish);
+ void handle_group_release(const std::string &global_group_id,
+ Context *on_finish);
+ void handle_peer_group_removed(const std::string &global_group_id,
+ const std::string &peer_mirror_uuid,
+ Context *on_finish);
+
void handle_sync_request(const std::string &instance_id,
const std::string &sync_id, Context *on_finish);
void handle_sync_start(const std::string &instance_id,
void handle_payload(const std::string &instance_id,
const instance_watcher::PeerImageRemovedPayload &payload,
C_NotifyAck *on_notify_ack);
+ void handle_payload(const std::string &instance_id,
+ const instance_watcher::GroupAcquirePayload &payload,
+ C_NotifyAck *on_notify_ack);
+ void handle_payload(const std::string &instance_id,
+ const instance_watcher::GroupReleasePayload &payload,
+ C_NotifyAck *on_notify_ack);
+ void handle_payload(const std::string &instance_id,
+ const instance_watcher::PeerGroupRemovedPayload &payload,
+ C_NotifyAck *on_notify_ack);
void handle_payload(const std::string &instance_id,
const instance_watcher::SyncRequestPayload &payload,
C_NotifyAck *on_notify_ack);
template <typename I>
void NamespaceReplayer<I>::handle_acquire_image(const std::string &global_image_id,
- const std::string &instance_id,
- Context* on_finish) {
+ const std::string &instance_id,
+ Context* on_finish) {
dout(5) << "global_image_id=" << global_image_id << ", "
<< "instance_id=" << instance_id << dendl;
template <typename I>
void NamespaceReplayer<I>::handle_release_image(const std::string &global_image_id,
- const std::string &instance_id,
- Context* on_finish) {
+ const std::string &instance_id,
+ Context* on_finish) {
dout(5) << "global_image_id=" << global_image_id << ", "
<< "instance_id=" << instance_id << dendl;
template <typename I>
void NamespaceReplayer<I>::handle_remove_image(const std::string &mirror_uuid,
- const std::string &global_image_id,
- const std::string &instance_id,
- Context* on_finish) {
+ const std::string &global_image_id,
+ const std::string &instance_id,
+ Context* on_finish) {
ceph_assert(!mirror_uuid.empty());
dout(5) << "mirror_uuid=" << mirror_uuid << ", "
<< "global_image_id=" << global_image_id << ", "
mirror_uuid, on_finish);
}
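+// Group callbacks from the image map listener: forward the ownership change
+// to the instance watcher, which notifies the assigned rbd-mirror instance.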
+template <typename I>
+void NamespaceReplayer<I>::handle_acquire_group(const std::string &global_group_id,
+ const std::string &instance_id,
+ Context* on_finish) {
+ dout(5) << "global_group_id=" << global_group_id << ", "
+ << "instance_id=" << instance_id << dendl;
+
+ m_instance_watcher->notify_group_acquire(instance_id, global_group_id,
+ on_finish);
+}
+
+template <typename I>
+void NamespaceReplayer<I>::handle_release_group(const std::string &global_group_id,
+ const std::string &instance_id,
+ Context* on_finish) {
+ dout(5) << "global_group_id=" << global_group_id << ", "
+ << "instance_id=" << instance_id << dendl;
+
+ m_instance_watcher->notify_group_release(instance_id, global_group_id,
+ on_finish);
+}
+
+template <typename I>
+void NamespaceReplayer<I>::handle_remove_group(const std::string &mirror_uuid,
+ const std::string &global_group_id,
+ const std::string &instance_id,
+ Context* on_finish) {
+ ceph_assert(!mirror_uuid.empty());
+ dout(5) << "mirror_uuid=" << mirror_uuid << ", "
+ << "global_group_id=" << global_group_id << ", "
+ << "instance_id=" << instance_id << dendl;
+
+ m_instance_watcher->notify_peer_group_removed(instance_id, global_group_id,
+ mirror_uuid, on_finish);
+}
+
template <typename I>
std::string NamespaceReplayer<I>::get_local_namespace() {
return m_local_namespace_name;
const std::string &instance_id,
Context* on_finish) override {
namespace_replayer->handle_remove_image(mirror_uuid, global_image_id,
- instance_id, on_finish);
+ instance_id, on_finish);
+ }
+
+ void acquire_group(const std::string &global_group_id,
+ const std::string &instance_id,
+ Context* on_finish) override {
+ namespace_replayer->handle_acquire_group(global_group_id, instance_id,
+ on_finish);
+ }
+
+ void release_group(const std::string &global_group_id,
+ const std::string &instance_id,
+ Context* on_finish) override {
+ namespace_replayer->handle_release_group(global_group_id, instance_id,
+ on_finish);
+ }
+
+ void remove_group(const std::string &mirror_uuid,
+ const std::string &global_group_id,
+ const std::string &instance_id,
+ Context* on_finish) override {
+ namespace_replayer->handle_remove_group(mirror_uuid, global_group_id,
+ instance_id, on_finish);
}
};
const std::string &global_image_id,
const std::string &instance_id,
Context* on_finish);
+ void handle_acquire_group(const std::string &global_group_id,
+ const std::string &instance_id,
+ Context* on_finish);
+ void handle_release_group(const std::string &global_group_id,
+ const std::string &instance_id,
+ Context* on_finish);
+ void handle_remove_group(const std::string &mirror_uuid,
+ const std::string &global_group_id,
+ const std::string &instance_id,
+ Context* on_finish);
std::string m_local_namespace_name;
std::string m_remote_namespace_name;
<< "image_count=" << image_count << ", "
<< "enabled=" << enabled << dendl;
- // TODO
+ std::lock_guard locker{m_lock};
+ MirrorEntity entity(MIRROR_ENTITY_TYPE_GROUP, global_group_id, image_count);
+ m_pending_added_entities.erase(entity);
+ m_pending_removed_entities.erase(entity);
+
+ if (enabled) {
+ m_pending_added_entities.insert({entity, id});
+ schedule_listener();
+ } else {
+ m_pending_removed_entities.insert({entity, id});
+ schedule_listener();
+ }
}
template <typename I>
const std::string &global_image_id,
const std::string &instance_id,
Context* on_finish) = 0;
+ virtual void acquire_group(const std::string &global_group_id,
+ const std::string &instance_id,
+ Context* on_finish) = 0;
+ virtual void release_group(const std::string &global_group_id,
+ const std::string &instance_id,
+ Context* on_finish) = 0;
+ virtual void remove_group(const std::string &mirror_uuid,
+ const std::string &global_group_id,
+ const std::string &instance_id,
+ Context* on_finish) = 0;
};
struct LookupInfo {
f->dump_string("sync_id", sync_id);
}
+void GroupPayloadBase::encode(bufferlist &bl) const {
+ using ceph::encode;
+ PayloadBase::encode(bl);
+ encode(global_group_id, bl);
+}
+
+void GroupPayloadBase::decode(__u8 version, bufferlist::const_iterator &iter) {
+ using ceph::decode;
+ PayloadBase::decode(version, iter);
+ decode(global_group_id, iter);
+}
+
+void GroupPayloadBase::dump(Formatter *f) const {
+ PayloadBase::dump(f);
+ f->dump_string("global_group_id", global_group_id);
+}
+
+void PeerGroupRemovedPayload::encode(bufferlist &bl) const {
+ using ceph::encode;
+ PayloadBase::encode(bl);
+ encode(global_group_id, bl);
+ encode(peer_mirror_uuid, bl);
+}
+
+void PeerGroupRemovedPayload::decode(__u8 version, bufferlist::const_iterator &iter) {
+ using ceph::decode;
+ PayloadBase::decode(version, iter);
+ decode(global_group_id, iter);
+ decode(peer_mirror_uuid, iter);
+}
+
+void PeerGroupRemovedPayload::dump(Formatter *f) const {
+ PayloadBase::dump(f);
+ f->dump_string("global_group_id", global_group_id);
+ f->dump_string("peer_mirror_uuid", peer_mirror_uuid);
+}
+
void UnknownPayload::encode(bufferlist &bl) const {
ceph_abort();
}
case NOTIFY_OP_SYNC_START:
payload = SyncStartPayload();
break;
+ case NOTIFY_OP_GROUP_ACQUIRE:
+ payload = GroupAcquirePayload();
+ break;
+ case NOTIFY_OP_GROUP_RELEASE:
+ payload = GroupReleasePayload();
+ break;
+ case NOTIFY_OP_PEER_GROUP_REMOVED:
+ payload = PeerGroupRemovedPayload();
+ break;
default:
payload = UnknownPayload();
break;
o.push_back(new NotifyMessage(SyncStartPayload()));
o.push_back(new NotifyMessage(SyncStartPayload(1, "sync_id")));
+
+ o.push_back(new NotifyMessage(GroupAcquirePayload()));
+ o.push_back(new NotifyMessage(GroupAcquirePayload(1, "gid")));
+
+ o.push_back(new NotifyMessage(GroupReleasePayload()));
+ o.push_back(new NotifyMessage(GroupReleasePayload(1, "gid")));
+
+ o.push_back(new NotifyMessage(PeerGroupRemovedPayload()));
+ o.push_back(new NotifyMessage(PeerGroupRemovedPayload(1, "gid", "uuid")));
}
std::ostream &operator<<(std::ostream &out, const NotifyOp &op) {
case NOTIFY_OP_SYNC_START:
out << "SyncStart";
break;
+ case NOTIFY_OP_GROUP_ACQUIRE:
+ out << "GroupAcquire";
+ break;
+ case NOTIFY_OP_GROUP_RELEASE:
+ out << "GroupRelease";
+ break;
+ case NOTIFY_OP_PEER_GROUP_REMOVED:
+ out << "PeerGroupRemoved";
+ break;
default:
out << "Unknown (" << static_cast<uint32_t>(op) << ")";
break;
NOTIFY_OP_IMAGE_RELEASE = 1,
NOTIFY_OP_PEER_IMAGE_REMOVED = 2,
NOTIFY_OP_SYNC_REQUEST = 3,
- NOTIFY_OP_SYNC_START = 4
+ NOTIFY_OP_SYNC_START = 4,
+ NOTIFY_OP_GROUP_ACQUIRE = 5,
+ NOTIFY_OP_GROUP_RELEASE = 6,
+ NOTIFY_OP_PEER_GROUP_REMOVED = 7,
};
struct PayloadBase {
}
};
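+// Group notifications reuse the request/ack mechanism of the image payloads:
+// each carries a request_id plus the global group id (and, for removal, the
+// peer mirror uuid).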
+struct GroupPayloadBase : public PayloadBase {
+ std::string global_group_id;
+
+ GroupPayloadBase() : PayloadBase() {
+ }
+
+ GroupPayloadBase(uint64_t request_id, const std::string &global_group_id)
+ : PayloadBase(request_id), global_group_id(global_group_id) {
+ }
+
+ void encode(bufferlist &bl) const;
+ void decode(__u8 version, bufferlist::const_iterator &iter);
+ void dump(Formatter *f) const;
+};
+
+struct GroupAcquirePayload : public GroupPayloadBase {
+ static const NotifyOp NOTIFY_OP = NOTIFY_OP_GROUP_ACQUIRE;
+
+ GroupAcquirePayload() {
+ }
+ GroupAcquirePayload(uint64_t request_id, const std::string &global_group_id)
+ : GroupPayloadBase(request_id, global_group_id) {
+ }
+};
+
+struct GroupReleasePayload : public GroupPayloadBase {
+ static const NotifyOp NOTIFY_OP = NOTIFY_OP_GROUP_RELEASE;
+
+ GroupReleasePayload() {
+ }
+ GroupReleasePayload(uint64_t request_id, const std::string &global_group_id)
+ : GroupPayloadBase(request_id, global_group_id) {
+ }
+};
+
+struct PeerGroupRemovedPayload : public PayloadBase {
+ static const NotifyOp NOTIFY_OP = NOTIFY_OP_PEER_GROUP_REMOVED;
+
+ std::string global_group_id;
+ std::string peer_mirror_uuid;
+
+ PeerGroupRemovedPayload() {
+ }
+ PeerGroupRemovedPayload(uint64_t request_id,
+ const std::string& global_group_id,
+ const std::string& peer_mirror_uuid)
+ : PayloadBase(request_id),
+ global_group_id(global_group_id), peer_mirror_uuid(peer_mirror_uuid) {
+ }
+
+ void encode(bufferlist &bl) const;
+ void decode(__u8 version, bufferlist::const_iterator &iter);
+ void dump(Formatter *f) const;
+};
+
struct UnknownPayload {
static const NotifyOp NOTIFY_OP = static_cast<NotifyOp>(-1);
PeerImageRemovedPayload,
SyncRequestPayload,
SyncStartPayload,
+ GroupAcquirePayload,
+ GroupReleasePayload,
+ PeerGroupRemovedPayload,
UnknownPayload> Payload;
struct NotifyMessage {