From: Mykola Golub
Date: Tue, 5 Jan 2021 16:18:57 +0000 (+0000)
Subject: rbd-mirror: hook GroupReplayer
X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=6e93031d5e2f29d763a6a29927c1c176e6692940;p=ceph-ci.git

rbd-mirror: hook GroupReplayer

Signed-off-by: Mykola Golub
Signed-off-by: Prasanna Kumar Kalever
---

diff --git a/src/test/rbd_mirror/test_mock_ImageMap.cc b/src/test/rbd_mirror/test_mock_ImageMap.cc
index 419969dd6a9..8ae6281597d 100644
--- a/src/test/rbd_mirror/test_mock_ImageMap.cc
+++ b/src/test/rbd_mirror/test_mock_ImageMap.cc
@@ -142,6 +142,10 @@ public:
   MOCK_METHOD2(mock_release_image, void(const std::string &, Context*));
   MOCK_METHOD3(mock_remove_image, void(const std::string &,
                                        const std::string &, Context*));
+  MOCK_METHOD2(mock_acquire_group, void(const std::string &, Context*));
+  MOCK_METHOD2(mock_release_group, void(const std::string &, Context*));
+  MOCK_METHOD3(mock_remove_group, void(const std::string &,
+                                       const std::string &, Context*));
 
   void acquire_image(const std::string &global_image_id,
                      const std::string &instance_id, Context* on_finish) {
@@ -158,6 +162,22 @@ public:
                     const std::string &instance_id, Context* on_finish) {
     mock_remove_image(mirror_uuid, global_image_id, on_finish);
   }
+
+  void acquire_group(const std::string &global_group_id,
+                     const std::string &instance_id, Context* on_finish) {
+    mock_acquire_group(global_group_id, on_finish);
+  }
+
+  void release_group(const std::string &global_group_id,
+                     const std::string &instance_id, Context* on_finish) {
+    mock_release_group(global_group_id, on_finish);
+  }
+
+  void remove_group(const std::string &mirror_uuid,
+                    const std::string &global_group_id,
+                    const std::string &instance_id, Context* on_finish) {
+    mock_remove_group(mirror_uuid, global_group_id, on_finish);
+  }
 };
 
 TestMockImageMap() = default;
diff --git a/src/test/rbd_mirror/test_mock_InstanceReplayer.cc b/src/test/rbd_mirror/test_mock_InstanceReplayer.cc
index 1cc64be6008..1de7fec8f19 100644
--- a/src/test/rbd_mirror/test_mock_InstanceReplayer.cc
+++ b/src/test/rbd_mirror/test_mock_InstanceReplayer.cc
@@ -5,6 +5,7 @@
 #include "test/rbd_mirror/test_mock_fixture.h"
 #include "test/rbd_mirror/mock/MockContextWQ.h"
 #include "test/rbd_mirror/mock/MockSafeTimer.h"
+#include "tools/rbd_mirror/GroupReplayer.h"
 #include "tools/rbd_mirror/ImageReplayer.h"
 #include "tools/rbd_mirror/InstanceWatcher.h"
 #include "tools/rbd_mirror/InstanceReplayer.h"
@@ -59,6 +60,54 @@ template<> struct InstanceWatcher<librbd::MockTestImageCtx> {
 };
 
+template<>
+struct GroupReplayer<librbd::MockTestImageCtx> {
+  static GroupReplayer* s_instance;
+  std::string global_group_id;
+
+  static GroupReplayer *create(
+      librados::IoCtx &local_io_ctx, const std::string &local_mirror_uuid,
+      const std::string &global_group_id,
+      Threads<librbd::MockTestImageCtx> *threads,
+      InstanceWatcher<librbd::MockTestImageCtx> *instance_watcher,
+      MirrorStatusUpdater<librbd::MockTestImageCtx>* local_status_updater,
+      journal::CacheManagerHandler *cache_manager_handler,
+      PoolMetaCache* pool_meta_cache) {
+    ceph_assert(s_instance != nullptr);
+    s_instance->global_group_id = global_group_id;
+    return s_instance;
+  }
+
+  GroupReplayer() {
+    ceph_assert(s_instance == nullptr);
+    s_instance = this;
+  }
+
+  virtual ~GroupReplayer() {
+    ceph_assert(s_instance == this);
+    s_instance = nullptr;
+  }
+
+  MOCK_METHOD0(destroy, void());
+  MOCK_METHOD2(start, void(Context *, bool));
+  MOCK_METHOD2(stop, void(Context *, bool));
+  MOCK_METHOD1(restart, void(Context*));
+  MOCK_METHOD0(flush, void());
+  MOCK_METHOD1(print_status, void(Formatter *));
+  MOCK_METHOD1(add_peer, void(const Peer<librbd::MockTestImageCtx>& peer));
+  MOCK_METHOD0(get_global_group_id, const std::string &());
+  MOCK_METHOD0(is_running, bool());
+  MOCK_METHOD0(is_stopped, bool());
+  MOCK_METHOD0(is_blocklisted, bool());
+
+  MOCK_CONST_METHOD0(is_finished, bool());
+  MOCK_METHOD1(set_finished, void(bool));
+
+  MOCK_CONST_METHOD0(get_health_state, image_replayer::HealthState());
+};
+
+GroupReplayer<librbd::MockTestImageCtx>* GroupReplayer<librbd::MockTestImageCtx>::s_instance = nullptr;
+
 template<>
 struct ImageReplayer<librbd::MockTestImageCtx> {
   static ImageReplayer* s_instance;
@@ -133,6 +182,7 @@ using ::testing::WithArg;
 class TestMockInstanceReplayer : public TestMockFixture {
 public:
   typedef Threads<librbd::MockTestImageCtx> MockThreads;
+  typedef GroupReplayer<librbd::MockTestImageCtx> MockGroupReplayer;
   typedef ImageReplayer<librbd::MockTestImageCtx> MockImageReplayer;
   typedef InstanceReplayer<librbd::MockTestImageCtx> MockInstanceReplayer;
   typedef InstanceWatcher<librbd::MockTestImageCtx> MockInstanceWatcher;
@@ -191,6 +241,8 @@ TEST_F(TestMockInstanceReplayer, AcquireReleaseImage) {
   expect_work_queue(mock_threads);
   Context *timer_ctx = nullptr;
   expect_add_event_after(mock_threads, &timer_ctx);
+  Context *group_timer_ctx = nullptr;
+  expect_add_event_after(mock_threads, &group_timer_ctx);
 
   instance_replayer.init();
   instance_replayer.add_peer({"peer_uuid", m_remote_io_ctx, {}, nullptr});
@@ -241,6 +293,8 @@ TEST_F(TestMockInstanceReplayer, AcquireReleaseImage) {
   instance_replayer.shut_down();
   ASSERT_TRUE(timer_ctx != nullptr);
   delete timer_ctx;
+  ASSERT_TRUE(group_timer_ctx != nullptr);
+  delete group_timer_ctx;
 }
 
 TEST_F(TestMockInstanceReplayer, RemoveFinishedImage) {
@@ -262,6 +316,8 @@ TEST_F(TestMockInstanceReplayer, RemoveFinishedImage) {
   expect_work_queue(mock_threads);
   Context *timer_ctx1 = nullptr;
   expect_add_event_after(mock_threads, &timer_ctx1);
+  Context *group_timer_ctx = nullptr;
+  expect_add_event_after(mock_threads, &group_timer_ctx);
 
   instance_replayer.init();
   instance_replayer.add_peer({"peer_uuid", m_remote_io_ctx, {}, nullptr});
@@ -316,6 +372,8 @@ TEST_F(TestMockInstanceReplayer, RemoveFinishedImage) {
   instance_replayer.shut_down();
   ASSERT_TRUE(timer_ctx2 != nullptr);
   delete timer_ctx2;
+  ASSERT_TRUE(group_timer_ctx != nullptr);
+  delete group_timer_ctx;
 }
 
 TEST_F(TestMockInstanceReplayer, Reacquire) {
@@ -337,6 +395,8 @@ TEST_F(TestMockInstanceReplayer, Reacquire) {
   expect_work_queue(mock_threads);
   Context *timer_ctx = nullptr;
   expect_add_event_after(mock_threads, &timer_ctx);
+  Context *group_timer_ctx = nullptr;
+  expect_add_event_after(mock_threads, &group_timer_ctx);
 
   instance_replayer.init();
   instance_replayer.add_peer({"peer_uuid", m_remote_io_ctx, {}, nullptr});
@@ -376,6 +436,8 @@ TEST_F(TestMockInstanceReplayer, Reacquire) {
   instance_replayer.shut_down();
   ASSERT_TRUE(timer_ctx != nullptr);
   delete timer_ctx;
+  ASSERT_TRUE(group_timer_ctx != nullptr);
+  delete group_timer_ctx;
 }
 
 } // namespace mirror
diff --git a/src/test/rbd_mirror/test_mock_InstanceWatcher.cc b/src/test/rbd_mirror/test_mock_InstanceWatcher.cc
index 1467caa5dda..0b6079d6841 100644
--- a/src/test/rbd_mirror/test_mock_InstanceWatcher.cc
+++ b/src/test/rbd_mirror/test_mock_InstanceWatcher.cc
@@ -82,6 +82,11 @@ struct InstanceReplayer<librbd::MockTestImageCtx> {
   MOCK_METHOD2(release_image, void(const std::string &, Context *));
   MOCK_METHOD3(remove_peer_image, void(const std::string&,
                                        const std::string&, Context *));
+  MOCK_METHOD3(acquire_group, void(InstanceWatcher<librbd::MockTestImageCtx> *,
+                                   const std::string &, Context *));
+  MOCK_METHOD2(release_group, void(const std::string &, Context *));
+  MOCK_METHOD3(remove_peer_group, void(const std::string&, const std::string&,
+                                       Context *));
 };
 
 template <>
diff --git a/src/test/rbd_mirror/test_mock_NamespaceReplayer.cc
b/src/test/rbd_mirror/test_mock_NamespaceReplayer.cc index 95dc6935300..0bcfd591cb8 100644 --- a/src/test/rbd_mirror/test_mock_NamespaceReplayer.cc +++ b/src/test/rbd_mirror/test_mock_NamespaceReplayer.cc @@ -159,6 +159,16 @@ struct InstanceWatcher { const std::string&, const std::string&, Context*)); + MOCK_METHOD3(notify_group_acquire, void(const std::string&, + const std::string&, + Context*)); + MOCK_METHOD3(notify_group_release, void(const std::string&, + const std::string&, + Context*)); + MOCK_METHOD4(notify_peer_group_removed, void(const std::string&, + const std::string&, + const std::string&, + Context*)); MOCK_METHOD1(handle_update_leader, void(const std::string&)); diff --git a/src/tools/rbd_mirror/CMakeLists.txt b/src/tools/rbd_mirror/CMakeLists.txt index 193a038d18f..1913f06af6d 100644 --- a/src/tools/rbd_mirror/CMakeLists.txt +++ b/src/tools/rbd_mirror/CMakeLists.txt @@ -5,6 +5,7 @@ add_library(rbd_mirror_types STATIC set(rbd_mirror_internal ClusterWatcher.cc + GroupReplayer.cc ImageDeleter.cc ImageMap.cc ImageReplayer.cc diff --git a/src/tools/rbd_mirror/GroupReplayer.cc b/src/tools/rbd_mirror/GroupReplayer.cc new file mode 100644 index 00000000000..be9540ce2d8 --- /dev/null +++ b/src/tools/rbd_mirror/GroupReplayer.cc @@ -0,0 +1,439 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "include/stringify.h" +#include "common/Formatter.h" +#include "common/admin_socket.h" +#include "common/debug.h" +#include "common/errno.h" +#include "librbd/ImageCtx.h" +#include "tools/rbd_mirror/image_replayer/Utils.h" +#include "GroupReplayer.h" + +#define dout_context g_ceph_context +#define dout_subsys ceph_subsys_rbd_mirror +#undef dout_prefix +#define dout_prefix *_dout << "rbd::mirror::" << *this << " " \ + << __func__ << ": " + +extern PerfCounters *g_perf_counters; + +namespace rbd { +namespace mirror { + +namespace { + +template +class GroupReplayerAdminSocketCommand { +public: + GroupReplayerAdminSocketCommand(const std::string &desc, + GroupReplayer *replayer) + : desc(desc), replayer(replayer) { + } + virtual ~GroupReplayerAdminSocketCommand() {} + virtual int call(Formatter *f) = 0; + + std::string desc; + GroupReplayer *replayer; + bool registered = false; +}; + +template +class StatusCommand : public GroupReplayerAdminSocketCommand { +public: + explicit StatusCommand(const std::string &desc, GroupReplayer *replayer) + : GroupReplayerAdminSocketCommand(desc, replayer) { + } + + int call(Formatter *f) override { + this->replayer->print_status(f); + return 0; + } +}; + +template +class StartCommand : public GroupReplayerAdminSocketCommand { +public: + explicit StartCommand(const std::string &desc, GroupReplayer *replayer) + : GroupReplayerAdminSocketCommand(desc, replayer) { + } + + int call(Formatter *f) override { + this->replayer->start(nullptr, true); + return 0; + } +}; + +template +class StopCommand : public GroupReplayerAdminSocketCommand { +public: + explicit StopCommand(const std::string &desc, GroupReplayer *replayer) + : GroupReplayerAdminSocketCommand(desc, replayer) { + } + + int call(Formatter *f) override { + this->replayer->stop(nullptr, true); + return 0; + } +}; + +template +class RestartCommand : public GroupReplayerAdminSocketCommand { +public: + explicit RestartCommand(const std::string &desc, GroupReplayer *replayer) + : GroupReplayerAdminSocketCommand(desc, replayer) { + } + + int call(Formatter *f) override { + this->replayer->restart(); + return 0; + } +}; + +template +class FlushCommand : 
public GroupReplayerAdminSocketCommand { +public: + explicit FlushCommand(const std::string &desc, GroupReplayer *replayer) + : GroupReplayerAdminSocketCommand(desc, replayer) { + } + + int call(Formatter *f) override { + this->replayer->flush(); + return 0; + } +}; + +template +class GroupReplayerAdminSocketHook : public AdminSocketHook { +public: + GroupReplayerAdminSocketHook(CephContext *cct, const std::string &name, + GroupReplayer *replayer) + : admin_socket(cct->get_admin_socket()), + commands{{"rbd mirror group flush " + name, + new FlushCommand("flush rbd mirror group " + name, replayer)}, + {"rbd mirror group restart " + name, + new RestartCommand("restart rbd mirror group " + name, replayer)}, + {"rbd mirror group start " + name, + new StartCommand("start rbd mirror group " + name, replayer)}, + {"rbd mirror group status " + name, + new StatusCommand("get status for rbd mirror group " + name, replayer)}, + {"rbd mirror group stop " + name, + new StopCommand("stop rbd mirror group " + name, replayer)}} { + } + + int register_commands() { + for (auto &it : commands) { + int r = admin_socket->register_command(it.first, this, + it.second->desc); + if (r < 0) { + return r; + } + it.second->registered = true; + } + return 0; + } + + ~GroupReplayerAdminSocketHook() override { + admin_socket->unregister_commands(this); + for (auto &it : commands) { + delete it.second; + } + commands.clear(); + } + + int call(std::string_view command, const cmdmap_t& cmdmap, + const bufferlist&, + Formatter *f, + std::ostream& errss, + bufferlist& out) override { + auto i = commands.find(command); + ceph_assert(i != commands.end()); + return i->second->call(f); + } + +private: + typedef std::map*, + std::less<>> Commands; + + AdminSocket *admin_socket; + Commands commands; +}; + +} // anonymous namespace + +template +std::ostream &operator<<(std::ostream &os, + const typename GroupReplayer::State &state) { + switch (state) { + case GroupReplayer::STATE_STARTING: + os << "Starting"; + break; + case GroupReplayer::STATE_REPLAYING: + os << "Replaying"; + break; + case GroupReplayer::STATE_STOPPING: + os << "Stopping"; + break; + case GroupReplayer::STATE_STOPPED: + os << "Stopped"; + break; + default: + os << "Unknown (" << static_cast(state) << ")"; + break; + } + return os; +} + +template +std::ostream &operator<<(std::ostream &os, const GroupReplayer &replayer) { + std::string nspace = replayer.get_namespace(); + if (!nspace.empty()) { + nspace += "/"; + } + + os << "GroupReplayer: " << &replayer << " [" << replayer.get_local_pool_id() << "/" + << nspace << replayer.get_global_group_id() << "]"; + return os; +} + +template +GroupReplayer::GroupReplayer( + librados::IoCtx &local_io_ctx, const std::string &local_mirror_uuid, + const std::string &global_group_id, Threads *threads, + InstanceWatcher *instance_watcher, + MirrorStatusUpdater* local_status_updater, + journal::CacheManagerHandler *cache_manager_handler, + PoolMetaCache* pool_meta_cache) : + m_local_io_ctx(local_io_ctx), m_local_mirror_uuid(local_mirror_uuid), + m_global_group_id(global_group_id), m_threads(threads), + m_instance_watcher(instance_watcher), + m_local_status_updater(local_status_updater), + m_cache_manager_handler(cache_manager_handler), + m_pool_meta_cache(pool_meta_cache), + m_local_group_name(global_group_id), + m_lock(ceph::make_mutex("rbd::mirror::GroupReplayer " + + stringify(local_io_ctx.get_id()) + " " + global_group_id)) { + // Register asok commands using a temporary "remote_pool_name/global_group_id" + // name. 
When the group name becomes known on start the asok commands will be + // re-registered using "remote_pool_name/remote_group_name" name. + + m_group_spec = image_replayer::util::compute_image_spec( + local_io_ctx, global_group_id); + register_admin_socket_hook(); +} + +template +GroupReplayer::~GroupReplayer() { + unregister_admin_socket_hook(); + ceph_assert(m_on_start_finish == nullptr); + ceph_assert(m_on_stop_finish == nullptr); +} + + +template +image_replayer::HealthState GroupReplayer::get_health_state() const { + // TODO: Implement something like m_mirror_image_status_state for group + return image_replayer::HEALTH_STATE_OK; +} + +template +void GroupReplayer::add_peer(const Peer& peer) { + dout(10) << "peer=" << peer << dendl; + + std::lock_guard locker{m_lock}; + auto it = m_peers.find(peer); + if (it == m_peers.end()) { + m_peers.insert(peer); + } +} + +template +void GroupReplayer::set_state_description(int r, const std::string &desc) { + dout(10) << "r=" << r << ", desc=" << desc << dendl; + + std::lock_guard l{m_lock}; + m_last_r = r; + m_state_desc = desc; +} + +template +void GroupReplayer::start(Context *on_finish, bool manual, bool restart) { + dout(10) << "on_finish=" << on_finish << ", manual=" << manual + << ", restart=" << restart << dendl; + + int r = 0; + { + std::lock_guard locker{m_lock}; + if (!is_stopped_()) { + derr << "already running" << dendl; + r = -EINVAL; + } else if (m_manual_stop && !manual) { + dout(5) << "stopped manually, ignoring start without manual flag" + << dendl; + r = -EPERM; + } else if (restart && !m_restart_requested) { + dout(10) << "canceled restart" << dendl; + r = -ECANCELED; + } else { + m_state = STATE_STARTING; + m_last_r = 0; + m_state_desc.clear(); + m_manual_stop = false; + } + } + + if (r < 0) { + if (on_finish) { + on_finish->complete(r); + } + return; + } + + // TODO + on_finish->complete(0); +} + +template +void GroupReplayer::stop(Context *on_finish, bool manual, bool restart) { + dout(10) << "on_finish=" << on_finish << ", manual=" << manual + << ", restart=" << restart << dendl; + + bool running = true; + { + std::lock_guard locker{m_lock}; + + if (restart) { + m_restart_requested = true; + } + + if (!is_running_()) { + running = false; + if (!restart && m_restart_requested) { + dout(10) << "canceling restart" << dendl; + m_restart_requested = false; + } + } else { + if (!is_stopped_()) { + if (m_state == STATE_STARTING) { + dout(10) << "canceling start" << dendl; + } else { + dout(10) << "interrupting replay" << dendl; + } + } + } + } + + if (!running) { + dout(20) << "not running" << dendl; + if (on_finish) { + on_finish->complete(-EINVAL); + } + return; + } + + // TODO + on_finish->complete(0); +} + +template +void GroupReplayer::restart(Context *on_finish) { + { + std::lock_guard locker{m_lock}; + m_restart_requested = true; + } + + auto ctx = new LambdaContext( + [this, on_finish](int r) { + if (r < 0) { + // Try start anyway. 
+ } + start(on_finish, true, true); + }); + stop(ctx, false, true); +} + +template +void GroupReplayer::flush() { + std::unique_lock locker{m_lock}; + if (m_state != STATE_REPLAYING) { + return; + } + + dout(10) << dendl; + // TODO +} + +template +void GroupReplayer::print_status(Formatter *f) { + dout(10) << dendl; + + std::lock_guard l{m_lock}; + + f->open_object_section("group_replayer"); + f->dump_string("name", m_group_spec); + f->dump_stream("state") << m_state; + f->close_section(); +} + +template +void GroupReplayer::register_admin_socket_hook() { + GroupReplayerAdminSocketHook *asok_hook; + { + std::lock_guard locker{m_lock}; + if (m_asok_hook != nullptr) { + return; + } + + dout(15) << "registered asok hook: " << m_group_spec << dendl; + asok_hook = new GroupReplayerAdminSocketHook( + g_ceph_context, m_group_spec, this); + int r = asok_hook->register_commands(); + if (r == 0) { + m_asok_hook = asok_hook; + return; + } + derr << "error registering admin socket commands" << dendl; + } + delete asok_hook; +} + +template +void GroupReplayer::unregister_admin_socket_hook() { + dout(15) << dendl; + + AdminSocketHook *asok_hook = nullptr; + { + std::lock_guard locker{m_lock}; + std::swap(asok_hook, m_asok_hook); + } + delete asok_hook; +} + +template +void GroupReplayer::reregister_admin_socket_hook() { + std::unique_lock locker{m_lock}; + + auto group_spec = image_replayer::util::compute_image_spec( + m_local_io_ctx, m_local_group_name); + if (m_asok_hook != nullptr && m_group_spec == group_spec) { + return; + } + + dout(15) << "old_group_spec=" << m_group_spec << ", " + << "new_group_spec=" << group_spec << dendl; + m_group_spec = group_spec; + + if (m_state == STATE_STOPPING || m_state == STATE_STOPPED) { + // no need to re-register if stopping + return; + } + locker.unlock(); + + unregister_admin_socket_hook(); + register_admin_socket_hook(); +} + +} // namespace mirror +} // namespace rbd + +template class rbd::mirror::GroupReplayer; diff --git a/src/tools/rbd_mirror/GroupReplayer.h b/src/tools/rbd_mirror/GroupReplayer.h new file mode 100644 index 00000000000..07d9802e946 --- /dev/null +++ b/src/tools/rbd_mirror/GroupReplayer.h @@ -0,0 +1,181 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_RBD_MIRROR_GROUP_REPLAYER_H +#define CEPH_RBD_MIRROR_GROUP_REPLAYER_H + +#include "common/ceph_mutex.h" +#include "include/rados/librados.hpp" +#include "tools/rbd_mirror/Types.h" +#include "tools/rbd_mirror/image_replayer/Types.h" +#include +#include + +class AdminSocketHook; + +namespace journal { struct CacheManagerHandler; } +namespace librbd { class ImageCtx; } + +namespace rbd { +namespace mirror { + +template struct InstanceWatcher; +template struct MirrorStatusUpdater; +struct PoolMetaCache; +template struct Threads; + +/** + * Replays changes from a remote cluster for a single group. 
+ */ +template +class GroupReplayer { +public: + static GroupReplayer *create( + librados::IoCtx &local_io_ctx, const std::string &local_mirror_uuid, + const std::string &global_group_id, Threads *threads, + InstanceWatcher *instance_watcher, + MirrorStatusUpdater* local_status_updater, + journal::CacheManagerHandler *cache_manager_handler, + PoolMetaCache* pool_meta_cache) { + return new GroupReplayer(local_io_ctx, local_mirror_uuid, global_group_id, + threads, instance_watcher, local_status_updater, + cache_manager_handler, pool_meta_cache); + } + void destroy() { + delete this; + } + + GroupReplayer(librados::IoCtx &local_io_ctx, + const std::string &local_mirror_uuid, + const std::string &global_group_id, + Threads *threads, + InstanceWatcher *instance_watcher, + MirrorStatusUpdater* local_status_updater, + journal::CacheManagerHandler *cache_manager_handler, + PoolMetaCache* pool_meta_cache); + virtual ~GroupReplayer(); + GroupReplayer(const GroupReplayer&) = delete; + GroupReplayer& operator=(const GroupReplayer&) = delete; + + bool is_stopped() { std::lock_guard l{m_lock}; return is_stopped_(); } + bool is_running() { std::lock_guard l{m_lock}; return is_running_(); } + bool is_replaying() { std::lock_guard l{m_lock}; return is_replaying_(); } + + std::string get_name() { std::lock_guard l{m_lock}; return m_group_spec; }; + void set_state_description(int r, const std::string &desc); + + // TODO temporary until policy handles release of group replayers + inline bool is_finished() const { + std::lock_guard locker{m_lock}; + return m_finished; + } + inline void set_finished(bool finished) { + std::lock_guard locker{m_lock}; + m_finished = finished; + } + + inline bool is_blocklisted() const { + std::lock_guard locker{m_lock}; + return (m_last_r == -EBLOCKLISTED); + } + + image_replayer::HealthState get_health_state() const; + + void add_peer(const Peer& peer); + + inline int64_t get_local_pool_id() const { + return m_local_io_ctx.get_id(); + } + inline std::string get_namespace() const { + return m_local_io_ctx.get_namespace(); + } + inline const std::string& get_global_group_id() const { + return m_global_group_id; + } + + void start(Context *on_finish = nullptr, bool manual = false, + bool restart = false); + void stop(Context *on_finish = nullptr, bool manual = false, + bool restart = false); + void restart(Context *on_finish = nullptr); + void flush(); + + void print_status(Formatter *f); + + template + friend std::ostream &operator<<(std::ostream &os, + const GroupReplayer &replayer); + + /** + * @verbatim + * (error) + * <------------------------------------ FAIL + * | ^ + * v * + * * + * | * + * v (error) * + * + * + * @endverbatim + */ + +private: + typedef std::set> Peers; + + enum State { + STATE_UNKNOWN, + STATE_STARTING, + STATE_REPLAYING, + STATE_STOPPING, + STATE_STOPPED, + }; + + librados::IoCtx &m_local_io_ctx; + std::string m_local_mirror_uuid; + std::string m_global_group_id; + Threads *m_threads; + InstanceWatcher *m_instance_watcher; + MirrorStatusUpdater* m_local_status_updater; + journal::CacheManagerHandler *m_cache_manager_handler; + PoolMetaCache* m_pool_meta_cache; + + std::string m_local_group_name; + std::string m_group_spec; + Peers m_peers; + + mutable ceph::mutex m_lock; + State m_state = STATE_STOPPED; + std::string m_state_desc; + int m_last_r = 0; + + Context *m_on_start_finish = nullptr; + Context *m_on_stop_finish = nullptr; + bool m_stop_requested = false; + bool m_restart_requested = false; + bool m_manual_stop = false; + bool m_finished = false; + 
+ AdminSocketHook *m_asok_hook = nullptr; + + bool is_stopped_() const { + return m_state == STATE_STOPPED; + } + bool is_running_() const { + return !is_stopped_() && m_state != STATE_STOPPING && !m_stop_requested; + } + bool is_replaying_() const { + return (m_state == STATE_REPLAYING); + } + + void register_admin_socket_hook(); + void unregister_admin_socket_hook(); + void reregister_admin_socket_hook(); +}; + +} // namespace mirror +} // namespace rbd + +extern template class rbd::mirror::GroupReplayer; + +#endif // CEPH_RBD_MIRROR_GROUP_REPLAYER_H diff --git a/src/tools/rbd_mirror/InstanceReplayer.cc b/src/tools/rbd_mirror/InstanceReplayer.cc index f278ab09bc1..11c73e9f1ab 100644 --- a/src/tools/rbd_mirror/InstanceReplayer.cc +++ b/src/tools/rbd_mirror/InstanceReplayer.cc @@ -8,6 +8,7 @@ #include "common/errno.h" #include "librbd/Utils.h" #include "librbd/asio/ContextWQ.h" +#include "GroupReplayer.h" #include "ImageReplayer.h" #include "InstanceReplayer.h" #include "ServiceDaemon.h" @@ -24,9 +25,13 @@ namespace mirror { namespace { -const std::string SERVICE_DAEMON_ASSIGNED_COUNT_KEY("image_assigned_count"); -const std::string SERVICE_DAEMON_WARNING_COUNT_KEY("image_warning_count"); -const std::string SERVICE_DAEMON_ERROR_COUNT_KEY("image_error_count"); +const std::string SERVICE_DAEMON_IMAGE_ASSIGNED_COUNT_KEY("image_assigned_count"); +const std::string SERVICE_DAEMON_IMAGE_WARNING_COUNT_KEY("image_warning_count"); +const std::string SERVICE_DAEMON_IMAGE_ERROR_COUNT_KEY("image_error_count"); + +const std::string SERVICE_DAEMON_GROUP_ASSIGNED_COUNT_KEY("group_assigned_count"); +const std::string SERVICE_DAEMON_GROUP_WARNING_COUNT_KEY("group_warning_count"); +const std::string SERVICE_DAEMON_GROUP_ERROR_COUNT_KEY("group_error_count"); } // anonymous namespace @@ -78,6 +83,7 @@ void InstanceReplayer::init(Context *on_finish) { { std::lock_guard timer_locker{m_threads->timer_lock}; schedule_image_state_check_task(); + schedule_group_state_check_task(); } on_finish->complete(0); }); @@ -127,6 +133,17 @@ void InstanceReplayer::release_all(Context *on_finish) { std::lock_guard locker{m_lock}; C_Gather *gather_ctx = new C_Gather(g_ceph_context, on_finish); + for (auto it = m_group_replayers.begin(); it != m_group_replayers.end(); + it = m_group_replayers.erase(it)) { + auto group_replayer = it->second; + auto ctx = gather_ctx->new_sub(); + ctx = new LambdaContext( + [group_replayer, ctx] (int r) { + group_replayer->destroy(); + ctx->complete(0); + }); + stop_group_replayer(group_replayer, ctx); + } for (auto it = m_image_replayers.begin(); it != m_image_replayers.end(); it = m_image_replayers.erase(it)) { auto image_replayer = it->second; @@ -229,15 +246,107 @@ void InstanceReplayer::remove_peer_image(const std::string &global_image_id, m_threads->work_queue->queue(on_finish, 0); } +template +void InstanceReplayer::acquire_group(InstanceWatcher *instance_watcher, + const std::string &global_group_id, + Context *on_finish) { + dout(10) << "global_group_id=" << global_group_id << dendl; + + std::lock_guard locker{m_lock}; + + ceph_assert(m_on_shut_down == nullptr); + + auto it = m_group_replayers.find(global_group_id); + if (it == m_group_replayers.end()) { + auto group_replayer = GroupReplayer::create( + m_local_io_ctx, m_local_mirror_uuid, global_group_id, + m_threads, instance_watcher, m_local_status_updater, + m_cache_manager_handler, m_pool_meta_cache); + + dout(10) << global_group_id << ": creating replayer " << group_replayer + << dendl; + + it = 
m_group_replayers.insert(std::make_pair(global_group_id, + group_replayer)).first; + + // TODO only a single peer is currently supported + ceph_assert(m_peers.size() == 1); + auto peer = *m_peers.begin(); + group_replayer->add_peer(peer); + start_group_replayer(group_replayer); + } else { + // A duplicate acquire notification implies (1) connection hiccup or + // (2) new leader election. For the second case, restart the replayer to + // detect if the group has been deleted while the leader was offline + auto& group_replayer = it->second; + group_replayer->set_finished(false); + group_replayer->restart(new C_TrackedOp(m_async_op_tracker, nullptr)); + } + + m_threads->work_queue->queue(on_finish, 0); +} + +template +void InstanceReplayer::release_group(const std::string &global_group_id, + Context *on_finish) { + dout(10) << "global_group_id=" << global_group_id << dendl; + + std::lock_guard locker{m_lock}; + ceph_assert(m_on_shut_down == nullptr); + + auto it = m_group_replayers.find(global_group_id); + if (it == m_group_replayers.end()) { + dout(5) << global_group_id << ": not found" << dendl; + m_threads->work_queue->queue(on_finish, 0); + return; + } + + auto group_replayer = it->second; + m_group_replayers.erase(it); + + on_finish = new LambdaContext( + [group_replayer, on_finish] (int r) { + group_replayer->destroy(); + on_finish->complete(0); + }); + stop_group_replayer(group_replayer, on_finish); +} + +template +void InstanceReplayer::remove_peer_group(const std::string &global_group_id, + const std::string &peer_mirror_uuid, + Context *on_finish) { + dout(10) << "global_group_id=" << global_group_id << ", " + << "peer_mirror_uuid=" << peer_mirror_uuid << dendl; + + std::lock_guard locker{m_lock}; + ceph_assert(m_on_shut_down == nullptr); + + auto it = m_group_replayers.find(global_group_id); + if (it != m_group_replayers.end()) { + // TODO only a single peer is currently supported, therefore + // we can just interrupt the current group replayer and + // it will eventually detect that the peer group is missing and + // determine if a delete propagation is required. 
+ auto group_replayer = it->second; + group_replayer->restart(new C_TrackedOp(m_async_op_tracker, nullptr)); + } + m_threads->work_queue->queue(on_finish, 0); +} + template void InstanceReplayer::print_status(Formatter *f) { dout(10) << dendl; std::lock_guard locker{m_lock}; + f->open_array_section("group_replayers"); + for (auto &[_, group_replayer] : m_group_replayers) { + group_replayer->print_status(f); + } + f->close_section(); f->open_array_section("image_replayers"); - for (auto &kv : m_image_replayers) { - auto &image_replayer = kv.second; + for (auto &[_, image_replayer] : m_image_replayers) { image_replayer->print_status(f); } f->close_section(); @@ -255,8 +364,10 @@ void InstanceReplayer::start() auto cct = static_cast(m_local_io_ctx.cct()); auto gather_ctx = new C_Gather( cct, new C_TrackedOp(m_async_op_tracker, nullptr)); - for (auto &kv : m_image_replayers) { - auto &image_replayer = kv.second; + for (auto &[_, group_replayer] : m_group_replayers) { + group_replayer->start(gather_ctx->new_sub(), true); + } + for (auto &[_, image_replayer] : m_image_replayers) { image_replayer->start(gather_ctx->new_sub(), true); } @@ -290,8 +401,10 @@ void InstanceReplayer::stop(Context *on_finish) m_manual_stop = true; - for (auto &kv : m_image_replayers) { - auto &image_replayer = kv.second; + for (auto &[_, group_replayer] : m_group_replayers) { + group_replayer->stop(gather_ctx->new_sub(), true); + } + for (auto &[_, image_replayer] : m_image_replayers) { image_replayer->stop(gather_ctx->new_sub(), true); } } @@ -308,8 +421,10 @@ void InstanceReplayer::restart() m_manual_stop = false; - for (auto &kv : m_image_replayers) { - auto &image_replayer = kv.second; + for (auto &[_, group_replayer] : m_group_replayers) { + group_replayer->restart(new C_TrackedOp(m_async_op_tracker, nullptr)); + } + for (auto &[_, image_replayer] : m_image_replayers) { image_replayer->restart(new C_TrackedOp(m_async_op_tracker, nullptr)); } } @@ -321,8 +436,10 @@ void InstanceReplayer::flush() std::lock_guard locker{m_lock}; - for (auto &kv : m_image_replayers) { - auto &image_replayer = kv.second; + for (auto &[_, group_replayer] : m_group_replayers) { + group_replayer->flush(); + } + for (auto &[_, image_replayer] : m_image_replayers) { image_replayer->flush(); } } @@ -397,13 +514,13 @@ void InstanceReplayer::start_image_replayers( m_service_daemon->add_or_update_namespace_attribute( m_local_io_ctx.get_id(), m_local_io_ctx.get_namespace(), - SERVICE_DAEMON_ASSIGNED_COUNT_KEY, image_count); + SERVICE_DAEMON_IMAGE_ASSIGNED_COUNT_KEY, image_count); m_service_daemon->add_or_update_namespace_attribute( m_local_io_ctx.get_id(), m_local_io_ctx.get_namespace(), - SERVICE_DAEMON_WARNING_COUNT_KEY, warning_count); + SERVICE_DAEMON_IMAGE_WARNING_COUNT_KEY, warning_count); m_service_daemon->add_or_update_namespace_attribute( m_local_io_ctx.get_id(), m_local_io_ctx.get_namespace(), - SERVICE_DAEMON_ERROR_COUNT_KEY, error_count); + SERVICE_DAEMON_IMAGE_ERROR_COUNT_KEY, error_count); } template @@ -451,26 +568,6 @@ void InstanceReplayer::stop_image_replayer(ImageReplayer *image_replayer, } } -template -void InstanceReplayer::wait_for_ops() { - dout(10) << dendl; - - Context *ctx = create_context_callback< - InstanceReplayer, &InstanceReplayer::handle_wait_for_ops>(this); - - m_async_op_tracker.wait_for_ops(ctx); -} - -template -void InstanceReplayer::handle_wait_for_ops(int r) { - dout(10) << "r=" << r << dendl; - - ceph_assert(r == 0); - - std::lock_guard locker{m_lock}; - stop_image_replayers(); -} - template void 
InstanceReplayer::stop_image_replayers() { dout(10) << dendl; @@ -546,6 +643,215 @@ void InstanceReplayer::schedule_image_state_check_task() { m_threads->timer->add_event_after(after, m_image_state_check_task); } +template +void InstanceReplayer::start_group_replayer( + GroupReplayer *group_replayer) { + ceph_assert(ceph_mutex_is_locked(m_lock)); + + std::string global_group_id = group_replayer->get_global_group_id(); + if (!group_replayer->is_stopped()) { + return; + } else if (group_replayer->is_blocklisted()) { + derr << "global_group_id=" << global_group_id << ": blocklisted detected " + << "during group replay" << dendl; + m_blocklisted = true; + return; + } else if (group_replayer->is_finished()) { + // TODO temporary until policy integrated + dout(5) << "removing group replayer for global_group_id=" + << global_group_id << dendl; + m_group_replayers.erase(group_replayer->get_global_group_id()); + group_replayer->destroy(); + return; + } else if (m_manual_stop) { + return; + } + + dout(10) << "global_group_id=" << global_group_id << dendl; + group_replayer->start(new C_TrackedOp(m_async_op_tracker, nullptr), false); +} + +template +void InstanceReplayer::queue_start_group_replayers() { + dout(10) << dendl; + + Context *ctx = create_context_callback< + InstanceReplayer, &InstanceReplayer::start_group_replayers>(this); + m_async_op_tracker.start_op(); + m_threads->work_queue->queue(ctx, 0); +} + +template +void InstanceReplayer::start_group_replayers(int r) { + dout(10) << dendl; + + std::lock_guard locker{m_lock}; + if (m_on_shut_down != nullptr) { + return; + } + + uint64_t group_count = 0; + uint64_t warning_count = 0; + uint64_t error_count = 0; + for (auto it = m_group_replayers.begin(); + it != m_group_replayers.end();) { + auto current_it(it); + ++it; + + ++group_count; + auto health_state = current_it->second->get_health_state(); + if (health_state == image_replayer::HEALTH_STATE_WARNING) { + ++warning_count; + } else if (health_state == image_replayer::HEALTH_STATE_ERROR) { + ++error_count; + } + + start_group_replayer(current_it->second); + } + + m_service_daemon->add_or_update_namespace_attribute( + m_local_io_ctx.get_id(), m_local_io_ctx.get_namespace(), + SERVICE_DAEMON_GROUP_ASSIGNED_COUNT_KEY, group_count); + m_service_daemon->add_or_update_namespace_attribute( + m_local_io_ctx.get_id(), m_local_io_ctx.get_namespace(), + SERVICE_DAEMON_GROUP_WARNING_COUNT_KEY, warning_count); + m_service_daemon->add_or_update_namespace_attribute( + m_local_io_ctx.get_id(), m_local_io_ctx.get_namespace(), + SERVICE_DAEMON_GROUP_ERROR_COUNT_KEY, error_count); + + m_async_op_tracker.finish_op(); +} + +template +void InstanceReplayer::stop_group_replayer(GroupReplayer *group_replayer, + Context *on_finish) { + dout(10) << group_replayer << " global_group_id=" + << group_replayer->get_global_group_id() << ", on_finish=" + << on_finish << dendl; + + if (group_replayer->is_stopped()) { + m_threads->work_queue->queue(on_finish, 0); + return; + } + + m_async_op_tracker.start_op(); + Context *ctx = create_async_context_callback( + m_threads->work_queue, new LambdaContext( + [this, group_replayer, on_finish] (int r) { + stop_group_replayer(group_replayer, on_finish); + m_async_op_tracker.finish_op(); + })); + + if (group_replayer->is_running()) { + group_replayer->stop(ctx, false); + } else { + int after = 1; + dout(10) << "scheduling group replayer " << group_replayer << " stop after " + << after << " sec (task " << ctx << ")" << dendl; + ctx = new LambdaContext( + [this, after, ctx] (int r) { + 
std::lock_guard timer_locker{m_threads->timer_lock}; + m_threads->timer->add_event_after(after, ctx); + }); + m_threads->work_queue->queue(ctx, 0); + } +} + +template +void InstanceReplayer::stop_group_replayers() { + dout(10) << dendl; + + ceph_assert(ceph_mutex_is_locked(m_lock)); + + Context *ctx = create_async_context_callback( + m_threads->work_queue, create_context_callback, + &InstanceReplayer::handle_stop_group_replayers>(this)); + + C_Gather *gather_ctx = new C_Gather(g_ceph_context, ctx); + for (auto &[_, group_replayer] : m_group_replayers) { + stop_group_replayer(group_replayer, gather_ctx->new_sub()); + } + gather_ctx->activate(); +} + +template +void InstanceReplayer::handle_stop_group_replayers(int r) { + dout(10) << "r=" << r << dendl; + + ceph_assert(r == 0); + + Context *on_finish = nullptr; + { + std::lock_guard locker{m_lock}; + + for (auto &[_, group_replayer] : m_group_replayers) { + ceph_assert(group_replayer->is_stopped()); + group_replayer->destroy(); + } + m_group_replayers.clear(); + + ceph_assert(m_on_shut_down != nullptr); + std::swap(on_finish, m_on_shut_down); + } + on_finish->complete(r); +} + +template +void InstanceReplayer::cancel_group_state_check_task() { + std::lock_guard timer_locker{m_threads->timer_lock}; + + if (m_group_state_check_task == nullptr) { + return; + } + + dout(10) << m_group_state_check_task << dendl; + bool canceled = m_threads->timer->cancel_event(m_group_state_check_task); + ceph_assert(canceled); + m_group_state_check_task = nullptr; +} + +template +void InstanceReplayer::schedule_group_state_check_task() { + ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock)); + ceph_assert(m_group_state_check_task == nullptr); + + m_group_state_check_task = new LambdaContext( + [this](int r) { + ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock)); + m_group_state_check_task = nullptr; + schedule_group_state_check_task(); + queue_start_group_replayers(); + }); + + auto cct = static_cast(m_local_io_ctx.cct()); + int after = cct->_conf.get_val( + "rbd_mirror_image_state_check_interval"); + + dout(10) << "scheduling group state check after " << after << " sec (task " + << m_group_state_check_task << ")" << dendl; + m_threads->timer->add_event_after(after, m_group_state_check_task); +} + +template +void InstanceReplayer::wait_for_ops() { + dout(10) << dendl; + + Context *ctx = create_context_callback< + InstanceReplayer, &InstanceReplayer::handle_wait_for_ops>(this); + + m_async_op_tracker.wait_for_ops(ctx); +} + +template +void InstanceReplayer::handle_wait_for_ops(int r) { + dout(10) << "r=" << r << dendl; + + ceph_assert(r == 0); + + std::lock_guard locker{m_lock}; + stop_image_replayers(); +} + } // namespace mirror } // namespace rbd diff --git a/src/tools/rbd_mirror/InstanceReplayer.h b/src/tools/rbd_mirror/InstanceReplayer.h index 5399c6994a5..f83a2ba688f 100644 --- a/src/tools/rbd_mirror/InstanceReplayer.h +++ b/src/tools/rbd_mirror/InstanceReplayer.h @@ -19,6 +19,7 @@ namespace librbd { class ImageCtx; } namespace rbd { namespace mirror { +template class GroupReplayer; template class ImageReplayer; template class InstanceWatcher; template class MirrorStatusUpdater; @@ -69,6 +70,13 @@ public: const std::string &peer_mirror_uuid, Context *on_finish); + void acquire_group(InstanceWatcher *instance_watcher, + const std::string &global_group_id, Context *on_finish); + void release_group(const std::string &global_group_id, Context *on_finish); + void remove_peer_group(const std::string &global_group_id, + const std::string 
&peer_mirror_uuid, + Context *on_finish); + void release_all(Context *on_finish); void print_status(Formatter *f); @@ -85,8 +93,13 @@ private: * * <-------------------\ * | (init) | (repeat for each - * v STOP_IMAGE_REPLAYER ---\ image replayer) - * SCHEDULE_IMAGE_STATE_CHECK_TASK ^ ^ | + * v STOP_GROUP_REPLAYER ---\ group replayer) + * SCHEDULE_GROUP_STATE_CHECK_TASK ^ ^ | + * | | | | + * v | \---------/ + * SCHEDULE_IMAGE_STATE_CHECK_TASK | (repeat for each + * | STOP_IMAGE_REPLAYER ---\ image replayer) + * | ^ ^ | * | | | | * v (shut_down) | \---------/ * -----------------> WAIT_FOR_OPS @@ -107,8 +120,10 @@ private: mutable ceph::mutex m_lock; AsyncOpTracker m_async_op_tracker; std::map *> m_image_replayers; + std::map *> m_group_replayers; Peers m_peers; Context *m_image_state_check_task = nullptr; + Context *m_group_state_check_task = nullptr; Context *m_on_shut_down = nullptr; bool m_manual_stop = false; bool m_blocklisted = false; @@ -129,6 +144,19 @@ private: void schedule_image_state_check_task(); void cancel_image_state_check_task(); + + void start_group_replayer(GroupReplayer *group_replayer); + void queue_start_group_replayers(); + void start_group_replayers(int r); + + void stop_group_replayer(GroupReplayer *group_replayer, + Context *on_finish); + + void stop_group_replayers(); + void handle_stop_group_replayers(int r); + + void schedule_group_state_check_task(); + void cancel_group_state_check_task(); }; } // namespace mirror diff --git a/src/tools/rbd_mirror/InstanceWatcher.cc b/src/tools/rbd_mirror/InstanceWatcher.cc index 2246442d5b3..e0daac36e8c 100644 --- a/src/tools/rbd_mirror/InstanceWatcher.cc +++ b/src/tools/rbd_mirror/InstanceWatcher.cc @@ -461,6 +461,64 @@ void InstanceWatcher::notify_peer_image_removed( req->send(); } +template +void InstanceWatcher::notify_group_acquire( + const std::string &instance_id, const std::string &global_group_id, + Context *on_notify_ack) { + dout(10) << "instance_id=" << instance_id << ", global_group_id=" + << global_group_id << dendl; + + std::lock_guard locker{m_lock}; + + ceph_assert(m_on_finish == nullptr); + + uint64_t request_id = ++m_request_seq; + bufferlist bl; + encode(NotifyMessage{GroupAcquirePayload{request_id, global_group_id}}, bl); + auto req = new C_NotifyInstanceRequest(this, instance_id, request_id, + std::move(bl), on_notify_ack); + req->send(); +} + +template +void InstanceWatcher::notify_group_release( + const std::string &instance_id, const std::string &global_group_id, + Context *on_notify_ack) { + dout(10) << "instance_id=" << instance_id << ", global_group_id=" + << global_group_id << dendl; + + std::lock_guard locker{m_lock}; + + ceph_assert(m_on_finish == nullptr); + + uint64_t request_id = ++m_request_seq; + bufferlist bl; + encode(NotifyMessage{GroupReleasePayload{request_id, global_group_id}}, bl); + auto req = new C_NotifyInstanceRequest(this, instance_id, request_id, + std::move(bl), on_notify_ack); + req->send(); +} + +template +void InstanceWatcher::notify_peer_group_removed( + const std::string &instance_id, const std::string &global_group_id, + const std::string &peer_mirror_uuid, Context *on_notify_ack) { + dout(10) << "instance_id=" << instance_id << ", " + << "global_group_id=" << global_group_id << ", " + << "peer_mirror_uuid=" << peer_mirror_uuid << dendl; + + std::lock_guard locker{m_lock}; + ceph_assert(m_on_finish == nullptr); + + uint64_t request_id = ++m_request_seq; + bufferlist bl; + encode(NotifyMessage{PeerGroupRemovedPayload{request_id, global_group_id, + peer_mirror_uuid}}, bl); + 
auto req = new C_NotifyInstanceRequest(this, instance_id, request_id, + std::move(bl), on_notify_ack); + req->send(); +} + template void InstanceWatcher::notify_sync_request(const std::string &sync_id, Context *on_sync_start) { @@ -1145,6 +1203,54 @@ void InstanceWatcher::handle_peer_image_removed( m_work_queue->queue(ctx, 0); } +template +void InstanceWatcher::handle_group_acquire( + const std::string &global_group_id, Context *on_finish) { + dout(10) << "global_group_id=" << global_group_id << dendl; + + auto ctx = new LambdaContext( + [this, global_group_id, on_finish] (int r) { + m_instance_replayer->acquire_group(this, global_group_id, on_finish); + m_notify_op_tracker.finish_op(); + }); + + m_notify_op_tracker.start_op(); + m_work_queue->queue(ctx, 0); +} + +template +void InstanceWatcher::handle_group_release( + const std::string &global_group_id, Context *on_finish) { + dout(10) << "global_group_id=" << global_group_id << dendl; + + auto ctx = new LambdaContext( + [this, global_group_id, on_finish] (int r) { + m_instance_replayer->release_group(global_group_id, on_finish); + m_notify_op_tracker.finish_op(); + }); + + m_notify_op_tracker.start_op(); + m_work_queue->queue(ctx, 0); +} + +template +void InstanceWatcher::handle_peer_group_removed( + const std::string &global_group_id, const std::string &peer_mirror_uuid, + Context *on_finish) { + dout(10) << "global_group_id=" << global_group_id << ", " + << "peer_mirror_uuid=" << peer_mirror_uuid << dendl; + + auto ctx = new LambdaContext( + [this, peer_mirror_uuid, global_group_id, on_finish] (int r) { + m_instance_replayer->remove_peer_group(global_group_id, + peer_mirror_uuid, on_finish); + m_notify_op_tracker.finish_op(); + }); + + m_notify_op_tracker.start_op(); + m_work_queue->queue(ctx, 0); +} + template void InstanceWatcher::handle_sync_request(const std::string &instance_id, const std::string &sync_id, @@ -1243,6 +1349,49 @@ void InstanceWatcher::handle_payload(const std::string &instance_id, } } +template +void InstanceWatcher::handle_payload(const std::string &instance_id, + const GroupAcquirePayload &payload, + C_NotifyAck *on_notify_ack) { + dout(10) << "group_acquire: instance_id=" << instance_id << ", " + << "request_id=" << payload.request_id << dendl; + + auto on_finish = prepare_request(instance_id, payload.request_id, + on_notify_ack); + if (on_finish != nullptr) { + handle_group_acquire(payload.global_group_id, on_finish); + } +} + +template +void InstanceWatcher::handle_payload(const std::string &instance_id, + const GroupReleasePayload &payload, + C_NotifyAck *on_notify_ack) { + dout(10) << "group_release: instance_id=" << instance_id << ", " + << "request_id=" << payload.request_id << dendl; + + auto on_finish = prepare_request(instance_id, payload.request_id, + on_notify_ack); + if (on_finish != nullptr) { + handle_group_release(payload.global_group_id, on_finish); + } +} + +template +void InstanceWatcher::handle_payload(const std::string &instance_id, + const PeerGroupRemovedPayload &payload, + C_NotifyAck *on_notify_ack) { + dout(10) << "remove_peer_group: instance_id=" << instance_id << ", " + << "request_id=" << payload.request_id << dendl; + + auto on_finish = prepare_request(instance_id, payload.request_id, + on_notify_ack); + if (on_finish != nullptr) { + handle_peer_group_removed(payload.global_group_id, payload.peer_mirror_uuid, + on_finish); + } +} + template void InstanceWatcher::handle_payload(const std::string &instance_id, const SyncRequestPayload &payload, diff --git 
a/src/tools/rbd_mirror/InstanceWatcher.h b/src/tools/rbd_mirror/InstanceWatcher.h index 08e40b40bf1..c35b7fe3d49 100644 --- a/src/tools/rbd_mirror/InstanceWatcher.h +++ b/src/tools/rbd_mirror/InstanceWatcher.h @@ -78,6 +78,17 @@ public: const std::string &peer_mirror_uuid, Context *on_notify_ack); + void notify_group_acquire(const std::string &instance_id, + const std::string &global_group_id, + Context *on_notify_ack); + void notify_group_release(const std::string &instance_id, + const std::string &global_group_id, + Context *on_notify_ack); + void notify_peer_group_removed(const std::string &instance_id, + const std::string &global_group_id, + const std::string &peer_mirror_uuid, + Context *on_notify_ack); + void notify_sync_request(const std::string &sync_id, Context *on_sync_start); bool cancel_sync_request(const std::string &sync_id); void notify_sync_complete(const std::string &sync_id); @@ -238,6 +249,14 @@ private: const std::string &peer_mirror_uuid, Context *on_finish); + void handle_group_acquire(const std::string &global_group_id, + Context *on_finish); + void handle_group_release(const std::string &global_group_id, + Context *on_finish); + void handle_peer_group_removed(const std::string &global_group_id, + const std::string &peer_mirror_uuid, + Context *on_finish); + void handle_sync_request(const std::string &instance_id, const std::string &sync_id, Context *on_finish); void handle_sync_start(const std::string &instance_id, @@ -252,6 +271,15 @@ private: void handle_payload(const std::string &instance_id, const instance_watcher::PeerImageRemovedPayload &payload, C_NotifyAck *on_notify_ack); + void handle_payload(const std::string &instance_id, + const instance_watcher::GroupAcquirePayload &payload, + C_NotifyAck *on_notify_ack); + void handle_payload(const std::string &instance_id, + const instance_watcher::GroupReleasePayload &payload, + C_NotifyAck *on_notify_ack); + void handle_payload(const std::string &instance_id, + const instance_watcher::PeerGroupRemovedPayload &payload, + C_NotifyAck *on_notify_ack); void handle_payload(const std::string &instance_id, const instance_watcher::SyncRequestPayload &payload, C_NotifyAck *on_notify_ack); diff --git a/src/tools/rbd_mirror/NamespaceReplayer.cc b/src/tools/rbd_mirror/NamespaceReplayer.cc index 3cdfa92b0da..545ee4d620b 100644 --- a/src/tools/rbd_mirror/NamespaceReplayer.cc +++ b/src/tools/rbd_mirror/NamespaceReplayer.cc @@ -822,8 +822,8 @@ void NamespaceReplayer::handle_shut_down_image_map(int r, Context *on_finish) template void NamespaceReplayer::handle_acquire_image(const std::string &global_image_id, - const std::string &instance_id, - Context* on_finish) { + const std::string &instance_id, + Context* on_finish) { dout(5) << "global_image_id=" << global_image_id << ", " << "instance_id=" << instance_id << dendl; @@ -833,8 +833,8 @@ void NamespaceReplayer::handle_acquire_image(const std::string &global_image_ template void NamespaceReplayer::handle_release_image(const std::string &global_image_id, - const std::string &instance_id, - Context* on_finish) { + const std::string &instance_id, + Context* on_finish) { dout(5) << "global_image_id=" << global_image_id << ", " << "instance_id=" << instance_id << dendl; @@ -844,9 +844,9 @@ void NamespaceReplayer::handle_release_image(const std::string &global_image_ template void NamespaceReplayer::handle_remove_image(const std::string &mirror_uuid, - const std::string &global_image_id, - const std::string &instance_id, - Context* on_finish) { + const std::string &global_image_id, + 
const std::string &instance_id, + Context* on_finish) { ceph_assert(!mirror_uuid.empty()); dout(5) << "mirror_uuid=" << mirror_uuid << ", " << "global_image_id=" << global_image_id << ", " @@ -856,6 +856,42 @@ void NamespaceReplayer::handle_remove_image(const std::string &mirror_uuid, mirror_uuid, on_finish); } +template +void NamespaceReplayer::handle_acquire_group(const std::string &global_group_id, + const std::string &instance_id, + Context* on_finish) { + dout(5) << "global_group_id=" << global_group_id << ", " + << "instance_id=" << instance_id << dendl; + + m_instance_watcher->notify_group_acquire(instance_id, global_group_id, + on_finish); +} + +template +void NamespaceReplayer::handle_release_group(const std::string &global_group_id, + const std::string &instance_id, + Context* on_finish) { + dout(5) << "global_group_id=" << global_group_id << ", " + << "instance_id=" << instance_id << dendl; + + m_instance_watcher->notify_group_release(instance_id, global_group_id, + on_finish); +} + +template +void NamespaceReplayer::handle_remove_group(const std::string &mirror_uuid, + const std::string &global_group_id, + const std::string &instance_id, + Context* on_finish) { + ceph_assert(!mirror_uuid.empty()); + dout(5) << "mirror_uuid=" << mirror_uuid << ", " + << "global_group_id=" << global_group_id << ", " + << "instance_id=" << instance_id << dendl; + + m_instance_watcher->notify_peer_group_removed(instance_id, global_group_id, + mirror_uuid, on_finish); +} + template std::string NamespaceReplayer::get_local_namespace() { return m_local_namespace_name; diff --git a/src/tools/rbd_mirror/NamespaceReplayer.h b/src/tools/rbd_mirror/NamespaceReplayer.h index 262ffb0cd79..152761ed6da 100644 --- a/src/tools/rbd_mirror/NamespaceReplayer.h +++ b/src/tools/rbd_mirror/NamespaceReplayer.h @@ -194,7 +194,29 @@ private: const std::string &instance_id, Context* on_finish) override { namespace_replayer->handle_remove_image(mirror_uuid, global_image_id, - instance_id, on_finish); + instance_id, on_finish); + } + + void acquire_group(const std::string &global_group_id, + const std::string &instance_id, + Context* on_finish) override { + namespace_replayer->handle_acquire_group(global_group_id, instance_id, + on_finish); + } + + void release_group(const std::string &global_group_id, + const std::string &instance_id, + Context* on_finish) override { + namespace_replayer->handle_release_group(global_group_id, instance_id, + on_finish); + } + + void remove_group(const std::string &mirror_uuid, + const std::string &global_group_id, + const std::string &instance_id, + Context* on_finish) override { + namespace_replayer->handle_remove_group(mirror_uuid, global_group_id, + instance_id, on_finish); } }; @@ -268,6 +290,16 @@ private: const std::string &global_image_id, const std::string &instance_id, Context* on_finish); + void handle_acquire_group(const std::string &global_group_id, + const std::string &instance_id, + Context* on_finish); + void handle_release_group(const std::string &global_group_id, + const std::string &instance_id, + Context* on_finish); + void handle_remove_group(const std::string &mirror_uuid, + const std::string &global_group_id, + const std::string &instance_id, + Context* on_finish); std::string m_local_namespace_name; std::string m_remote_namespace_name; diff --git a/src/tools/rbd_mirror/PoolWatcher.cc b/src/tools/rbd_mirror/PoolWatcher.cc index b16792019e4..0c8bab188fe 100644 --- a/src/tools/rbd_mirror/PoolWatcher.cc +++ b/src/tools/rbd_mirror/PoolWatcher.cc @@ -382,7 +382,18 @@ void 
PoolWatcher::handle_group_updated(const std::string &id, << "image_count=" << image_count << ", " << "enabled=" << enabled << dendl; - // TODO + std::lock_guard locker{m_lock}; + MirrorEntity entity(MIRROR_ENTITY_TYPE_GROUP, global_group_id, image_count); + m_pending_added_entities.erase(entity); + m_pending_removed_entities.erase(entity); + + if (enabled) { + m_pending_added_entities.insert({entity, id}); + schedule_listener(); + } else { + m_pending_removed_entities.insert({entity, id}); + schedule_listener(); + } } template diff --git a/src/tools/rbd_mirror/image_map/Types.h b/src/tools/rbd_mirror/image_map/Types.h index b68a4674786..5ce4c33e961 100644 --- a/src/tools/rbd_mirror/image_map/Types.h +++ b/src/tools/rbd_mirror/image_map/Types.h @@ -41,6 +41,16 @@ struct Listener { const std::string &global_image_id, const std::string &instance_id, Context* on_finish) = 0; + virtual void acquire_group(const std::string &global_group_id, + const std::string &instance_id, + Context* on_finish) = 0; + virtual void release_group(const std::string &global_group_id, + const std::string &instance_id, + Context* on_finish) = 0; + virtual void remove_group(const std::string &mirror_uuid, + const std::string &global_group_id, + const std::string &instance_id, + Context* on_finish) = 0; }; struct LookupInfo { diff --git a/src/tools/rbd_mirror/instance_watcher/Types.cc b/src/tools/rbd_mirror/instance_watcher/Types.cc index 4c9f5235f19..25a9f2bb6b9 100644 --- a/src/tools/rbd_mirror/instance_watcher/Types.cc +++ b/src/tools/rbd_mirror/instance_watcher/Types.cc @@ -127,6 +127,43 @@ void SyncPayloadBase::dump(Formatter *f) const { f->dump_string("sync_id", sync_id); } +void GroupPayloadBase::encode(bufferlist &bl) const { + using ceph::encode; + PayloadBase::encode(bl); + encode(global_group_id, bl); +} + +void GroupPayloadBase::decode(__u8 version, bufferlist::const_iterator &iter) { + using ceph::decode; + PayloadBase::decode(version, iter); + decode(global_group_id, iter); +} + +void GroupPayloadBase::dump(Formatter *f) const { + PayloadBase::dump(f); + f->dump_string("global_group_id", global_group_id); +} + +void PeerGroupRemovedPayload::encode(bufferlist &bl) const { + using ceph::encode; + PayloadBase::encode(bl); + encode(global_group_id, bl); + encode(peer_mirror_uuid, bl); +} + +void PeerGroupRemovedPayload::decode(__u8 version, bufferlist::const_iterator &iter) { + using ceph::decode; + PayloadBase::decode(version, iter); + decode(global_group_id, iter); + decode(peer_mirror_uuid, iter); +} + +void PeerGroupRemovedPayload::dump(Formatter *f) const { + PayloadBase::dump(f); + f->dump_string("global_group_id", global_group_id); + f->dump_string("peer_mirror_uuid", peer_mirror_uuid); +} + void UnknownPayload::encode(bufferlist &bl) const { ceph_abort(); } @@ -166,6 +203,15 @@ void NotifyMessage::decode(bufferlist::const_iterator& iter) { case NOTIFY_OP_SYNC_START: payload = SyncStartPayload(); break; + case NOTIFY_OP_GROUP_ACQUIRE: + payload = GroupAcquirePayload(); + break; + case NOTIFY_OP_GROUP_RELEASE: + payload = GroupReleasePayload(); + break; + case NOTIFY_OP_PEER_GROUP_REMOVED: + payload = PeerGroupRemovedPayload(); + break; default: payload = UnknownPayload(); break; @@ -194,6 +240,15 @@ void NotifyMessage::generate_test_instances(std::list &o) { o.push_back(new NotifyMessage(SyncStartPayload())); o.push_back(new NotifyMessage(SyncStartPayload(1, "sync_id"))); + + o.push_back(new NotifyMessage(GroupAcquirePayload())); + o.push_back(new NotifyMessage(GroupAcquirePayload(1, "gid"))); + + 
o.push_back(new NotifyMessage(GroupReleasePayload())); + o.push_back(new NotifyMessage(GroupReleasePayload(1, "gid"))); + + o.push_back(new NotifyMessage(PeerGroupRemovedPayload())); + o.push_back(new NotifyMessage(PeerGroupRemovedPayload(1, "gid", "uuid"))); } std::ostream &operator<<(std::ostream &out, const NotifyOp &op) { @@ -213,6 +268,15 @@ std::ostream &operator<<(std::ostream &out, const NotifyOp &op) { case NOTIFY_OP_SYNC_START: out << "SyncStart"; break; + case NOTIFY_OP_GROUP_ACQUIRE: + out << "GroupAcquire"; + break; + case NOTIFY_OP_GROUP_RELEASE: + out << "GroupRelease"; + break; + case NOTIFY_OP_PEER_GROUP_REMOVED: + out << "PeerGroupRemoved"; + break; default: out << "Unknown (" << static_cast(op) << ")"; break; diff --git a/src/tools/rbd_mirror/instance_watcher/Types.h b/src/tools/rbd_mirror/instance_watcher/Types.h index b9e0d19bca7..619036c750c 100644 --- a/src/tools/rbd_mirror/instance_watcher/Types.h +++ b/src/tools/rbd_mirror/instance_watcher/Types.h @@ -23,7 +23,10 @@ enum NotifyOp { NOTIFY_OP_IMAGE_RELEASE = 1, NOTIFY_OP_PEER_IMAGE_REMOVED = 2, NOTIFY_OP_SYNC_REQUEST = 3, - NOTIFY_OP_SYNC_START = 4 + NOTIFY_OP_SYNC_START = 4, + NOTIFY_OP_GROUP_ACQUIRE = 5, + NOTIFY_OP_GROUP_RELEASE = 6, + NOTIFY_OP_PEER_GROUP_REMOVED = 7, }; struct PayloadBase { @@ -132,6 +135,61 @@ struct SyncStartPayload : public SyncPayloadBase { } }; +struct GroupPayloadBase : public PayloadBase { + std::string global_group_id; + + GroupPayloadBase() : PayloadBase() { + } + + GroupPayloadBase(uint64_t request_id, const std::string &global_group_id) + : PayloadBase(request_id), global_group_id(global_group_id) { + } + + void encode(bufferlist &bl) const; + void decode(__u8 version, bufferlist::const_iterator &iter); + void dump(Formatter *f) const; +}; + +struct GroupAcquirePayload : public GroupPayloadBase { + static const NotifyOp NOTIFY_OP = NOTIFY_OP_GROUP_ACQUIRE; + + GroupAcquirePayload() { + } + GroupAcquirePayload(uint64_t request_id, const std::string &global_group_id) + : GroupPayloadBase(request_id, global_group_id) { + } +}; + +struct GroupReleasePayload : public GroupPayloadBase { + static const NotifyOp NOTIFY_OP = NOTIFY_OP_GROUP_RELEASE; + + GroupReleasePayload() { + } + GroupReleasePayload(uint64_t request_id, const std::string &global_group_id) + : GroupPayloadBase(request_id, global_group_id) { + } +}; + +struct PeerGroupRemovedPayload : public PayloadBase { + static const NotifyOp NOTIFY_OP = NOTIFY_OP_PEER_GROUP_REMOVED; + + std::string global_group_id; + std::string peer_mirror_uuid; + + PeerGroupRemovedPayload() { + } + PeerGroupRemovedPayload(uint64_t request_id, + const std::string& global_group_id, + const std::string& peer_mirror_uuid) + : PayloadBase(request_id), + global_group_id(global_group_id), peer_mirror_uuid(peer_mirror_uuid) { + } + + void encode(bufferlist &bl) const; + void decode(__u8 version, bufferlist::const_iterator &iter); + void dump(Formatter *f) const; +}; + struct UnknownPayload { static const NotifyOp NOTIFY_OP = static_cast(-1); @@ -148,6 +206,9 @@ typedef std::variant Payload; struct NotifyMessage {