rbd-mirror: hook GroupReplayer
author Mykola Golub <mgolub@suse.com>
Tue, 5 Jan 2021 16:18:57 +0000 (16:18 +0000)
committer Ilya Dryomov <idryomov@gmail.com>
Sun, 28 Sep 2025 18:24:59 +0000 (20:24 +0200)
Signed-off-by: Mykola Golub <mgolub@suse.com>
Signed-off-by: Prasanna Kumar Kalever <prasanna.kalever@redhat.com>
17 files changed:
src/test/rbd_mirror/test_mock_ImageMap.cc
src/test/rbd_mirror/test_mock_InstanceReplayer.cc
src/test/rbd_mirror/test_mock_InstanceWatcher.cc
src/test/rbd_mirror/test_mock_NamespaceReplayer.cc
src/tools/rbd_mirror/CMakeLists.txt
src/tools/rbd_mirror/GroupReplayer.cc [new file with mode: 0644]
src/tools/rbd_mirror/GroupReplayer.h [new file with mode: 0644]
src/tools/rbd_mirror/InstanceReplayer.cc
src/tools/rbd_mirror/InstanceReplayer.h
src/tools/rbd_mirror/InstanceWatcher.cc
src/tools/rbd_mirror/InstanceWatcher.h
src/tools/rbd_mirror/NamespaceReplayer.cc
src/tools/rbd_mirror/NamespaceReplayer.h
src/tools/rbd_mirror/PoolWatcher.cc
src/tools/rbd_mirror/image_map/Types.h
src/tools/rbd_mirror/instance_watcher/Types.cc
src/tools/rbd_mirror/instance_watcher/Types.h

diff --git a/src/test/rbd_mirror/test_mock_ImageMap.cc b/src/test/rbd_mirror/test_mock_ImageMap.cc
index 419969dd6a9b6d146f730ff905550d4691367512..8ae6281597d52446f42e2ae76bda3a78b3f45afc 100644 (file)
@@ -142,6 +142,10 @@ public:
     MOCK_METHOD2(mock_release_image, void(const std::string &, Context*));
     MOCK_METHOD3(mock_remove_image, void(const std::string &,
                                          const std::string &, Context*));
+    MOCK_METHOD2(mock_acquire_group, void(const std::string &, Context*));
+    MOCK_METHOD2(mock_release_group, void(const std::string &, Context*));
+    MOCK_METHOD3(mock_remove_group, void(const std::string &,
+                                         const std::string &, Context*));
 
     void acquire_image(const std::string &global_image_id,
                        const std::string &instance_id, Context* on_finish) {
@@ -158,6 +162,22 @@ public:
                       const std::string &instance_id, Context* on_finish) {
       mock_remove_image(mirror_uuid, global_image_id, on_finish);
     }
+
+    void acquire_group(const std::string &global_group_id,
+                       const std::string &instance_id, Context* on_finish) {
+      mock_acquire_group(global_group_id, on_finish);
+    }
+
+    void release_group(const std::string &global_group_id,
+                       const std::string &instance_id, Context* on_finish) {
+      mock_release_group(global_group_id, on_finish);
+    }
+
+    void remove_group(const std::string &mirror_uuid,
+                      const std::string &global_group_id,
+                      const std::string &instance_id, Context* on_finish) {
+      mock_remove_group(mirror_uuid, global_group_id, on_finish);
+    }
   };
 
   TestMockImageMap() = default;
diff --git a/src/test/rbd_mirror/test_mock_InstanceReplayer.cc b/src/test/rbd_mirror/test_mock_InstanceReplayer.cc
index 1cc64be60089ef45e70b6a044f4f374d9d079e65..1de7fec8f1968dc2627f8d1564a77dd416bd2193 100644 (file)
@@ -5,6 +5,7 @@
 #include "test/rbd_mirror/test_mock_fixture.h"
 #include "test/rbd_mirror/mock/MockContextWQ.h"
 #include "test/rbd_mirror/mock/MockSafeTimer.h"
+#include "tools/rbd_mirror/GroupReplayer.h"
 #include "tools/rbd_mirror/ImageReplayer.h"
 #include "tools/rbd_mirror/InstanceWatcher.h"
 #include "tools/rbd_mirror/InstanceReplayer.h"
@@ -59,6 +60,54 @@ template<>
 struct InstanceWatcher<librbd::MockTestImageCtx> {
 };
 
+template<>
+struct GroupReplayer<librbd::MockTestImageCtx> {
+  static GroupReplayer* s_instance;
+  std::string global_group_id;
+
+  static GroupReplayer *create(
+      librados::IoCtx &local_io_ctx, const std::string &local_mirror_uuid,
+      const std::string &global_group_id,
+      Threads<librbd::MockTestImageCtx> *threads,
+      InstanceWatcher<librbd::MockTestImageCtx> *instance_watcher,
+      MirrorStatusUpdater<librbd::MockTestImageCtx>* local_status_updater,
+      journal::CacheManagerHandler *cache_manager_handler,
+      PoolMetaCache* pool_meta_cache) {
+    ceph_assert(s_instance != nullptr);
+    s_instance->global_group_id = global_group_id;
+    return s_instance;
+  }
+
+  GroupReplayer() {
+    ceph_assert(s_instance == nullptr);
+    s_instance = this;
+  }
+
+  virtual ~GroupReplayer() {
+    ceph_assert(s_instance == this);
+    s_instance = nullptr;
+  }
+
+  MOCK_METHOD0(destroy, void());
+  MOCK_METHOD2(start, void(Context *, bool));
+  MOCK_METHOD2(stop, void(Context *, bool));
+  MOCK_METHOD1(restart, void(Context*));
+  MOCK_METHOD0(flush, void());
+  MOCK_METHOD1(print_status, void(Formatter *));
+  MOCK_METHOD1(add_peer, void(const Peer<librbd::MockTestImageCtx>& peer));
+  MOCK_METHOD0(get_global_group_id, const std::string &());
+  MOCK_METHOD0(is_running, bool());
+  MOCK_METHOD0(is_stopped, bool());
+  MOCK_METHOD0(is_blocklisted, bool());
+
+  MOCK_CONST_METHOD0(is_finished, bool());
+  MOCK_METHOD1(set_finished, void(bool));
+
+  MOCK_CONST_METHOD0(get_health_state, image_replayer::HealthState());
+};
+
+GroupReplayer<librbd::MockTestImageCtx>* GroupReplayer<librbd::MockTestImageCtx>::s_instance = nullptr;
+
 template<>
 struct ImageReplayer<librbd::MockTestImageCtx> {
   static ImageReplayer* s_instance;
@@ -133,6 +182,7 @@ using ::testing::WithArg;
 class TestMockInstanceReplayer : public TestMockFixture {
 public:
   typedef Threads<librbd::MockTestImageCtx> MockThreads;
+  typedef GroupReplayer<librbd::MockTestImageCtx> MockGroupReplayer;
   typedef ImageReplayer<librbd::MockTestImageCtx> MockImageReplayer;
   typedef InstanceReplayer<librbd::MockTestImageCtx> MockInstanceReplayer;
   typedef InstanceWatcher<librbd::MockTestImageCtx> MockInstanceWatcher;
@@ -191,6 +241,8 @@ TEST_F(TestMockInstanceReplayer, AcquireReleaseImage) {
   expect_work_queue(mock_threads);
   Context *timer_ctx = nullptr;
   expect_add_event_after(mock_threads, &timer_ctx);
+  Context *group_timer_ctx = nullptr;
+  expect_add_event_after(mock_threads, &group_timer_ctx);
   instance_replayer.init();
   instance_replayer.add_peer({"peer_uuid", m_remote_io_ctx, {}, nullptr});
 
@@ -241,6 +293,8 @@ TEST_F(TestMockInstanceReplayer, AcquireReleaseImage) {
   instance_replayer.shut_down();
   ASSERT_TRUE(timer_ctx != nullptr);
   delete timer_ctx;
+  ASSERT_TRUE(group_timer_ctx != nullptr);
+  delete group_timer_ctx;
 }
 
 TEST_F(TestMockInstanceReplayer, RemoveFinishedImage) {
@@ -262,6 +316,8 @@ TEST_F(TestMockInstanceReplayer, RemoveFinishedImage) {
   expect_work_queue(mock_threads);
   Context *timer_ctx1 = nullptr;
   expect_add_event_after(mock_threads, &timer_ctx1);
+  Context *group_timer_ctx = nullptr;
+  expect_add_event_after(mock_threads, &group_timer_ctx);
   instance_replayer.init();
   instance_replayer.add_peer({"peer_uuid", m_remote_io_ctx, {}, nullptr});
 
@@ -316,6 +372,8 @@ TEST_F(TestMockInstanceReplayer, RemoveFinishedImage) {
   instance_replayer.shut_down();
   ASSERT_TRUE(timer_ctx2 != nullptr);
   delete timer_ctx2;
+  ASSERT_TRUE(group_timer_ctx != nullptr);
+  delete group_timer_ctx;
 }
 
 TEST_F(TestMockInstanceReplayer, Reacquire) {
@@ -337,6 +395,8 @@ TEST_F(TestMockInstanceReplayer, Reacquire) {
   expect_work_queue(mock_threads);
   Context *timer_ctx = nullptr;
   expect_add_event_after(mock_threads, &timer_ctx);
+  Context *group_timer_ctx = nullptr;
+  expect_add_event_after(mock_threads, &group_timer_ctx);
   instance_replayer.init();
   instance_replayer.add_peer({"peer_uuid", m_remote_io_ctx, {}, nullptr});
 
@@ -376,6 +436,8 @@ TEST_F(TestMockInstanceReplayer, Reacquire) {
   instance_replayer.shut_down();
   ASSERT_TRUE(timer_ctx != nullptr);
   delete timer_ctx;
+  ASSERT_TRUE(group_timer_ctx != nullptr);
+  delete group_timer_ctx;
 }
 
 } // namespace mirror
diff --git a/src/test/rbd_mirror/test_mock_InstanceWatcher.cc b/src/test/rbd_mirror/test_mock_InstanceWatcher.cc
index 1467caa5dda01efea52796bb66feb01b20e38fa6..0b6079d68414371f0b3c58247e5b8fdfcac66ea5 100644 (file)
@@ -82,6 +82,11 @@ struct InstanceReplayer<librbd::MockTestImageCtx> {
   MOCK_METHOD2(release_image, void(const std::string &, Context *));
   MOCK_METHOD3(remove_peer_image, void(const std::string&, const std::string&,
                                        Context *));
+  MOCK_METHOD3(acquire_group, void(InstanceWatcher<librbd::MockTestImageCtx> *,
+                                   const std::string &, Context *));
+  MOCK_METHOD2(release_group, void(const std::string &, Context *));
+  MOCK_METHOD3(remove_peer_group, void(const std::string&, const std::string&,
+                                       Context *));
 };
 
 template <>
diff --git a/src/test/rbd_mirror/test_mock_NamespaceReplayer.cc b/src/test/rbd_mirror/test_mock_NamespaceReplayer.cc
index 95dc69353001d867da96de83c3d8e2ff4790af09..0bcfd591cb8299ecbb7198859a3d6d05e1959845 100644 (file)
@@ -159,6 +159,16 @@ struct InstanceWatcher<librbd::MockTestImageCtx> {
                                                const std::string&,
                                                const std::string&,
                                                Context*));
+  MOCK_METHOD3(notify_group_acquire, void(const std::string&,
+                                          const std::string&,
+                                          Context*));
+  MOCK_METHOD3(notify_group_release, void(const std::string&,
+                                          const std::string&,
+                                          Context*));
+  MOCK_METHOD4(notify_peer_group_removed, void(const std::string&,
+                                               const std::string&,
+                                               const std::string&,
+                                               Context*));
 
   MOCK_METHOD1(handle_update_leader, void(const std::string&));
 
diff --git a/src/tools/rbd_mirror/CMakeLists.txt b/src/tools/rbd_mirror/CMakeLists.txt
index 193a038d18fce0cb10b6a01410bdea0bc798bc67..1913f06af6d0018cbd29fd3b4f5e3f56971a4b2a 100644 (file)
@@ -5,6 +5,7 @@ add_library(rbd_mirror_types STATIC
 
 set(rbd_mirror_internal
   ClusterWatcher.cc
+  GroupReplayer.cc
   ImageDeleter.cc
   ImageMap.cc
   ImageReplayer.cc
diff --git a/src/tools/rbd_mirror/GroupReplayer.cc b/src/tools/rbd_mirror/GroupReplayer.cc
new file mode 100644 (file)
index 0000000..be9540c
--- /dev/null
@@ -0,0 +1,439 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "include/stringify.h"
+#include "common/Formatter.h"
+#include "common/admin_socket.h"
+#include "common/debug.h"
+#include "common/errno.h"
+#include "librbd/ImageCtx.h"
+#include "tools/rbd_mirror/image_replayer/Utils.h"
+#include "GroupReplayer.h"
+
+#define dout_context g_ceph_context
+#define dout_subsys ceph_subsys_rbd_mirror
+#undef dout_prefix
+#define dout_prefix *_dout << "rbd::mirror::" << *this << " " \
+                           << __func__ << ": "
+
+extern PerfCounters *g_perf_counters;
+
+namespace rbd {
+namespace mirror {
+
+namespace {
+
+template <typename I>
+class GroupReplayerAdminSocketCommand {
+public:
+  GroupReplayerAdminSocketCommand(const std::string &desc,
+                                  GroupReplayer<I> *replayer)
+    : desc(desc), replayer(replayer) {
+  }
+  virtual ~GroupReplayerAdminSocketCommand() {}
+  virtual int call(Formatter *f) = 0;
+
+  std::string desc;
+  GroupReplayer<I> *replayer;
+  bool registered = false;
+};
+
+template <typename I>
+class StatusCommand : public GroupReplayerAdminSocketCommand<I> {
+public:
+  explicit StatusCommand(const std::string &desc, GroupReplayer<I> *replayer)
+    : GroupReplayerAdminSocketCommand<I>(desc, replayer) {
+  }
+
+  int call(Formatter *f) override {
+    this->replayer->print_status(f);
+    return 0;
+  }
+};
+
+template <typename I>
+class StartCommand : public GroupReplayerAdminSocketCommand<I> {
+public:
+  explicit StartCommand(const std::string &desc, GroupReplayer<I> *replayer)
+    : GroupReplayerAdminSocketCommand<I>(desc, replayer) {
+  }
+
+  int call(Formatter *f) override {
+    this->replayer->start(nullptr, true);
+    return 0;
+  }
+};
+
+template <typename I>
+class StopCommand : public GroupReplayerAdminSocketCommand<I> {
+public:
+  explicit StopCommand(const std::string &desc, GroupReplayer<I> *replayer)
+    : GroupReplayerAdminSocketCommand<I>(desc, replayer) {
+  }
+
+  int call(Formatter *f) override {
+    this->replayer->stop(nullptr, true);
+    return 0;
+  }
+};
+
+template <typename I>
+class RestartCommand : public GroupReplayerAdminSocketCommand<I> {
+public:
+  explicit RestartCommand(const std::string &desc, GroupReplayer<I> *replayer)
+    : GroupReplayerAdminSocketCommand<I>(desc, replayer) {
+  }
+
+  int call(Formatter *f) override {
+    this->replayer->restart();
+    return 0;
+  }
+};
+
+template <typename I>
+class FlushCommand : public GroupReplayerAdminSocketCommand<I> {
+public:
+  explicit FlushCommand(const std::string &desc, GroupReplayer<I> *replayer)
+    : GroupReplayerAdminSocketCommand<I>(desc, replayer) {
+  }
+
+  int call(Formatter *f) override {
+    this->replayer->flush();
+    return 0;
+  }
+};
+
+template <typename I>
+class GroupReplayerAdminSocketHook : public AdminSocketHook {
+public:
+  GroupReplayerAdminSocketHook(CephContext *cct, const std::string &name,
+                              GroupReplayer<I> *replayer)
+    : admin_socket(cct->get_admin_socket()),
+      commands{{"rbd mirror group flush " + name,
+                new FlushCommand<I>("flush rbd mirror group " + name, replayer)},
+               {"rbd mirror group restart " + name,
+                new RestartCommand<I>("restart rbd mirror group " + name, replayer)},
+               {"rbd mirror group start " + name,
+                new StartCommand<I>("start rbd mirror group " + name, replayer)},
+               {"rbd mirror group status " + name,
+                new StatusCommand<I>("get status for rbd mirror group " + name, replayer)},
+               {"rbd mirror group stop " + name,
+                new StopCommand<I>("stop rbd mirror group " + name, replayer)}} {
+  }
+
+  int register_commands() {
+    for (auto &it : commands) {
+      int r = admin_socket->register_command(it.first, this,
+                                             it.second->desc);
+      if (r < 0) {
+        return r;
+      }
+      it.second->registered = true;
+    }
+    return 0;
+  }
+
+  ~GroupReplayerAdminSocketHook() override {
+    admin_socket->unregister_commands(this);
+    for (auto &it : commands) {
+      delete it.second;
+    }
+    commands.clear();
+  }
+
+  int call(std::string_view command, const cmdmap_t& cmdmap,
+          const bufferlist&,
+          Formatter *f,
+          std::ostream& errss,
+          bufferlist& out) override {
+    auto i = commands.find(command);
+    ceph_assert(i != commands.end());
+    return i->second->call(f);
+  }
+
+private:
+  typedef std::map<std::string, GroupReplayerAdminSocketCommand<I>*,
+                  std::less<>> Commands;
+
+  AdminSocket *admin_socket;
+  Commands commands;
+};
+
+} // anonymous namespace
+
+template <typename I>
+std::ostream &operator<<(std::ostream &os,
+                         const typename GroupReplayer<I>::State &state) {
+  switch (state) {
+  case GroupReplayer<I>::STATE_STARTING:
+    os << "Starting";
+    break;
+  case GroupReplayer<I>::STATE_REPLAYING:
+    os << "Replaying";
+    break;
+  case GroupReplayer<I>::STATE_STOPPING:
+    os << "Stopping";
+    break;
+  case GroupReplayer<I>::STATE_STOPPED:
+    os << "Stopped";
+    break;
+  default:
+    os << "Unknown (" << static_cast<uint32_t>(state) << ")";
+    break;
+  }
+  return os;
+}
+
+template <typename I>
+std::ostream &operator<<(std::ostream &os, const GroupReplayer<I> &replayer) {
+  std::string nspace = replayer.get_namespace();
+  if (!nspace.empty()) {
+    nspace += "/";
+  }
+
+  os << "GroupReplayer: " << &replayer << " [" << replayer.get_local_pool_id() << "/"
+     << nspace << replayer.get_global_group_id() << "]";
+  return os;
+}
+
+template <typename I>
+GroupReplayer<I>::GroupReplayer(
+    librados::IoCtx &local_io_ctx, const std::string &local_mirror_uuid,
+    const std::string &global_group_id, Threads<I> *threads,
+    InstanceWatcher<I> *instance_watcher,
+    MirrorStatusUpdater<I>* local_status_updater,
+    journal::CacheManagerHandler *cache_manager_handler,
+    PoolMetaCache* pool_meta_cache) :
+  m_local_io_ctx(local_io_ctx), m_local_mirror_uuid(local_mirror_uuid),
+  m_global_group_id(global_group_id), m_threads(threads),
+  m_instance_watcher(instance_watcher),
+  m_local_status_updater(local_status_updater),
+  m_cache_manager_handler(cache_manager_handler),
+  m_pool_meta_cache(pool_meta_cache),
+  m_local_group_name(global_group_id),
+  m_lock(ceph::make_mutex("rbd::mirror::GroupReplayer " +
+      stringify(local_io_ctx.get_id()) + " " + global_group_id)) {
+  // Register asok commands using a temporary "remote_pool_name/global_group_id"
+  // name.  When the group name becomes known on start the asok commands will be
+  // re-registered using "remote_pool_name/remote_group_name" name.
+
+  m_group_spec = image_replayer::util::compute_image_spec(
+      local_io_ctx, global_group_id);
+  register_admin_socket_hook();
+}
+
+template <typename I>
+GroupReplayer<I>::~GroupReplayer() {
+  unregister_admin_socket_hook();
+  ceph_assert(m_on_start_finish == nullptr);
+  ceph_assert(m_on_stop_finish == nullptr);
+}
+
+
+template <typename I>
+image_replayer::HealthState GroupReplayer<I>::get_health_state() const {
+  // TODO: Implement something like m_mirror_image_status_state for group
+  return image_replayer::HEALTH_STATE_OK;
+}
+
+template <typename I>
+void GroupReplayer<I>::add_peer(const Peer<I>& peer) {
+  dout(10) << "peer=" << peer << dendl;
+
+  std::lock_guard locker{m_lock};
+  auto it = m_peers.find(peer);
+  if (it == m_peers.end()) {
+    m_peers.insert(peer);
+  }
+}
+
+template <typename I>
+void GroupReplayer<I>::set_state_description(int r, const std::string &desc) {
+  dout(10) << "r=" << r << ", desc=" << desc << dendl;
+
+  std::lock_guard l{m_lock};
+  m_last_r = r;
+  m_state_desc = desc;
+}
+
+template <typename I>
+void GroupReplayer<I>::start(Context *on_finish, bool manual, bool restart) {
+  dout(10) << "on_finish=" << on_finish << ", manual=" << manual
+           << ", restart=" << restart << dendl;
+
+  int r = 0;
+  {
+    std::lock_guard locker{m_lock};
+    if (!is_stopped_()) {
+      derr << "already running" << dendl;
+      r = -EINVAL;
+    } else if (m_manual_stop && !manual) {
+      dout(5) << "stopped manually, ignoring start without manual flag"
+             << dendl;
+      r = -EPERM;
+    } else if (restart && !m_restart_requested) {
+      dout(10) << "canceled restart" << dendl;
+      r = -ECANCELED;
+    } else {
+      m_state = STATE_STARTING;
+      m_last_r = 0;
+      m_state_desc.clear();
+      m_manual_stop = false;
+    }
+  }
+
+  if (r < 0) {
+    if (on_finish) {
+      on_finish->complete(r);
+    }
+    return;
+  }
+
+  // TODO
+  on_finish->complete(0);
+}
+
+template <typename I>
+void GroupReplayer<I>::stop(Context *on_finish, bool manual, bool restart) {
+  dout(10) << "on_finish=" << on_finish << ", manual=" << manual
+           << ", restart=" << restart << dendl;
+
+  bool running = true;
+  {
+    std::lock_guard locker{m_lock};
+
+    if (restart) {
+      m_restart_requested = true;
+    }
+
+    if (!is_running_()) {
+      running = false;
+      if (!restart && m_restart_requested) {
+        dout(10) << "canceling restart" << dendl;
+        m_restart_requested = false;
+      }
+    } else {
+      if (!is_stopped_()) {
+       if (m_state == STATE_STARTING) {
+         dout(10) << "canceling start" << dendl;
+       } else {
+         dout(10) << "interrupting replay" << dendl;
+       }
+      }
+    }
+  }
+
+  if (!running) {
+    dout(20) << "not running" << dendl;
+    if (on_finish) {
+      on_finish->complete(-EINVAL);
+    }
+    return;
+  }
+
+  // TODO
+  on_finish->complete(0);
+}
+
+template <typename I>
+void GroupReplayer<I>::restart(Context *on_finish) {
+  {
+    std::lock_guard locker{m_lock};
+    m_restart_requested = true;
+  }
+
+  auto ctx = new LambdaContext(
+    [this, on_finish](int r) {
+      if (r < 0) {
+       // Try start anyway.
+      }
+      start(on_finish, true, true);
+    });
+  stop(ctx, false, true);
+}
+
+template <typename I>
+void GroupReplayer<I>::flush() {
+  std::unique_lock locker{m_lock};
+  if (m_state != STATE_REPLAYING) {
+    return;
+  }
+
+  dout(10) << dendl;
+  // TODO
+}
+
+template <typename I>
+void GroupReplayer<I>::print_status(Formatter *f) {
+  dout(10) << dendl;
+
+  std::lock_guard l{m_lock};
+
+  f->open_object_section("group_replayer");
+  f->dump_string("name", m_group_spec);
+  f->dump_stream("state") << m_state;
+  f->close_section();
+}
+
+template <typename I>
+void GroupReplayer<I>::register_admin_socket_hook() {
+  GroupReplayerAdminSocketHook<I> *asok_hook;
+  {
+    std::lock_guard locker{m_lock};
+    if (m_asok_hook != nullptr) {
+      return;
+    }
+
+    dout(15) << "registered asok hook: " << m_group_spec << dendl;
+    asok_hook = new GroupReplayerAdminSocketHook<I>(
+      g_ceph_context, m_group_spec, this);
+    int r = asok_hook->register_commands();
+    if (r == 0) {
+      m_asok_hook = asok_hook;
+      return;
+    }
+    derr << "error registering admin socket commands" << dendl;
+  }
+  delete asok_hook;
+}
+
+template <typename I>
+void GroupReplayer<I>::unregister_admin_socket_hook() {
+  dout(15) << dendl;
+
+  AdminSocketHook *asok_hook = nullptr;
+  {
+    std::lock_guard locker{m_lock};
+    std::swap(asok_hook, m_asok_hook);
+  }
+  delete asok_hook;
+}
+
+template <typename I>
+void GroupReplayer<I>::reregister_admin_socket_hook() {
+  std::unique_lock locker{m_lock};
+
+  auto group_spec = image_replayer::util::compute_image_spec(
+    m_local_io_ctx, m_local_group_name);
+  if (m_asok_hook != nullptr && m_group_spec == group_spec) {
+    return;
+  }
+
+  dout(15) << "old_group_spec=" << m_group_spec << ", "
+           << "new_group_spec=" << group_spec << dendl;
+  m_group_spec = group_spec;
+
+  if (m_state == STATE_STOPPING || m_state == STATE_STOPPED) {
+    // no need to re-register if stopping
+    return;
+  }
+  locker.unlock();
+
+  unregister_admin_socket_hook();
+  register_admin_socket_hook();
+}
+
+} // namespace mirror
+} // namespace rbd
+
+template class rbd::mirror::GroupReplayer<librbd::ImageCtx>;
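GroupReplayer is still a skeleton at this point: start() and stop() only validate state and complete the supplied context, with the actual group replay wiring left as TODO. A minimal sketch of driving start() and interpreting the error codes implemented above, assuming the surrounding Ceph tree (C_SaferCond from common/Cond.h) and a hypothetical helper name:

#include "common/Cond.h"
#include "librbd/ImageCtx.h"
#include "tools/rbd_mirror/GroupReplayer.h"

// Hypothetical helper: request a (non-manual) start and map the completion
// code to the paths handled in GroupReplayer<I>::start() above.
int start_group_replay(rbd::mirror::GroupReplayer<librbd::ImageCtx> *replayer) {
  C_SaferCond on_start;
  replayer->start(&on_start, false /* manual */);
  int r = on_start.wait();
  if (r == -EINVAL) {
    // already running (not stopped)
  } else if (r == -EPERM) {
    // previously stopped manually; only start(..., manual=true) may resume it
  } else if (r == -ECANCELED) {
    // only produced on the restart path, when the restart request was canceled
  }
  // on success the replayer is in STATE_STARTING; replay itself is still TODO
  return r;
}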
diff --git a/src/tools/rbd_mirror/GroupReplayer.h b/src/tools/rbd_mirror/GroupReplayer.h
new file mode 100644 (file)
index 0000000..07d9802
--- /dev/null
@@ -0,0 +1,181 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_RBD_MIRROR_GROUP_REPLAYER_H
+#define CEPH_RBD_MIRROR_GROUP_REPLAYER_H
+
+#include "common/ceph_mutex.h"
+#include "include/rados/librados.hpp"
+#include "tools/rbd_mirror/Types.h"
+#include "tools/rbd_mirror/image_replayer/Types.h"
+#include <boost/optional.hpp>
+#include <string>
+
+class AdminSocketHook;
+
+namespace journal { struct CacheManagerHandler; }
+namespace librbd { class ImageCtx; }
+
+namespace rbd {
+namespace mirror {
+
+template <typename> struct InstanceWatcher;
+template <typename> struct MirrorStatusUpdater;
+struct PoolMetaCache;
+template <typename> struct Threads;
+
+/**
+ * Replays changes from a remote cluster for a single group.
+ */
+template <typename ImageCtxT = librbd::ImageCtx>
+class GroupReplayer {
+public:
+  static GroupReplayer *create(
+      librados::IoCtx &local_io_ctx, const std::string &local_mirror_uuid,
+      const std::string &global_group_id, Threads<ImageCtxT> *threads,
+      InstanceWatcher<ImageCtxT> *instance_watcher,
+      MirrorStatusUpdater<ImageCtxT>* local_status_updater,
+      journal::CacheManagerHandler *cache_manager_handler,
+      PoolMetaCache* pool_meta_cache) {
+    return new GroupReplayer(local_io_ctx, local_mirror_uuid, global_group_id,
+                             threads, instance_watcher, local_status_updater,
+                             cache_manager_handler, pool_meta_cache);
+  }
+  void destroy() {
+    delete this;
+  }
+
+  GroupReplayer(librados::IoCtx &local_io_ctx,
+                const std::string &local_mirror_uuid,
+                const std::string &global_group_id,
+                Threads<ImageCtxT> *threads,
+                InstanceWatcher<ImageCtxT> *instance_watcher,
+                MirrorStatusUpdater<ImageCtxT>* local_status_updater,
+                journal::CacheManagerHandler *cache_manager_handler,
+                PoolMetaCache* pool_meta_cache);
+  virtual ~GroupReplayer();
+  GroupReplayer(const GroupReplayer&) = delete;
+  GroupReplayer& operator=(const GroupReplayer&) = delete;
+
+  bool is_stopped() { std::lock_guard l{m_lock}; return is_stopped_(); }
+  bool is_running() { std::lock_guard l{m_lock}; return is_running_(); }
+  bool is_replaying() { std::lock_guard l{m_lock}; return is_replaying_(); }
+
+  std::string get_name() { std::lock_guard l{m_lock}; return m_group_spec; };
+  void set_state_description(int r, const std::string &desc);
+
+  // TODO temporary until policy handles release of group replayers
+  inline bool is_finished() const {
+    std::lock_guard locker{m_lock};
+    return m_finished;
+  }
+  inline void set_finished(bool finished) {
+    std::lock_guard locker{m_lock};
+    m_finished = finished;
+  }
+
+  inline bool is_blocklisted() const {
+    std::lock_guard locker{m_lock};
+    return (m_last_r == -EBLOCKLISTED);
+  }
+
+  image_replayer::HealthState get_health_state() const;
+
+  void add_peer(const Peer<ImageCtxT>& peer);
+
+  inline int64_t get_local_pool_id() const {
+    return m_local_io_ctx.get_id();
+  }
+  inline std::string get_namespace() const {
+    return m_local_io_ctx.get_namespace();
+  }
+  inline const std::string& get_global_group_id() const {
+    return m_global_group_id;
+  }
+
+  void start(Context *on_finish = nullptr, bool manual = false,
+             bool restart = false);
+  void stop(Context *on_finish = nullptr, bool manual = false,
+            bool restart = false);
+  void restart(Context *on_finish = nullptr);
+  void flush();
+
+  void print_status(Formatter *f);
+
+  template <typename>
+  friend std::ostream &operator<<(std::ostream &os,
+                                  const GroupReplayer &replayer);
+
+  /**
+   * @verbatim
+   *                   (error)
+   * <uninitialized> <------------------------------------ FAIL
+   *    |                                                   ^
+   *    v                                                   *
+   * <starting>                                             *
+   *    |                                                   *
+   *    v                                           (error) *
+   * <stopped>
+   *
+   * @endverbatim
+   */
+
+private:
+  typedef std::set<Peer<ImageCtxT>> Peers;
+
+  enum State {
+    STATE_UNKNOWN,
+    STATE_STARTING,
+    STATE_REPLAYING,
+    STATE_STOPPING,
+    STATE_STOPPED,
+  };
+
+  librados::IoCtx &m_local_io_ctx;
+  std::string m_local_mirror_uuid;
+  std::string m_global_group_id;
+  Threads<ImageCtxT> *m_threads;
+  InstanceWatcher<ImageCtxT> *m_instance_watcher;
+  MirrorStatusUpdater<ImageCtxT>* m_local_status_updater;
+  journal::CacheManagerHandler *m_cache_manager_handler;
+  PoolMetaCache* m_pool_meta_cache;
+
+  std::string m_local_group_name;
+  std::string m_group_spec;
+  Peers m_peers;
+
+  mutable ceph::mutex m_lock;
+  State m_state = STATE_STOPPED;
+  std::string m_state_desc;
+  int m_last_r = 0;
+
+  Context *m_on_start_finish = nullptr;
+  Context *m_on_stop_finish = nullptr;
+  bool m_stop_requested = false;
+  bool m_restart_requested = false;
+  bool m_manual_stop = false;
+  bool m_finished = false;
+
+  AdminSocketHook *m_asok_hook = nullptr;
+
+  bool is_stopped_() const {
+    return m_state == STATE_STOPPED;
+  }
+  bool is_running_() const {
+    return !is_stopped_() && m_state != STATE_STOPPING && !m_stop_requested;
+  }
+  bool is_replaying_() const {
+    return (m_state == STATE_REPLAYING);
+  }
+
+  void register_admin_socket_hook();
+  void unregister_admin_socket_hook();
+  void reregister_admin_socket_hook();
+};
+
+} // namespace mirror
+} // namespace rbd
+
+extern template class rbd::mirror::GroupReplayer<librbd::ImageCtx>;
+
+#endif // CEPH_RBD_MIRROR_GROUP_REPLAYER_H
diff --git a/src/tools/rbd_mirror/InstanceReplayer.cc b/src/tools/rbd_mirror/InstanceReplayer.cc
index f278ab09bc14dad8161927f240c303f69b9be51e..11c73e9f1ab78d0ae26fe68cdb28187875a92643 100644 (file)
@@ -8,6 +8,7 @@
 #include "common/errno.h"
 #include "librbd/Utils.h"
 #include "librbd/asio/ContextWQ.h"
+#include "GroupReplayer.h"
 #include "ImageReplayer.h"
 #include "InstanceReplayer.h"
 #include "ServiceDaemon.h"
@@ -24,9 +25,13 @@ namespace mirror {
 
 namespace {
 
-const std::string SERVICE_DAEMON_ASSIGNED_COUNT_KEY("image_assigned_count");
-const std::string SERVICE_DAEMON_WARNING_COUNT_KEY("image_warning_count");
-const std::string SERVICE_DAEMON_ERROR_COUNT_KEY("image_error_count");
+const std::string SERVICE_DAEMON_IMAGE_ASSIGNED_COUNT_KEY("image_assigned_count");
+const std::string SERVICE_DAEMON_IMAGE_WARNING_COUNT_KEY("image_warning_count");
+const std::string SERVICE_DAEMON_IMAGE_ERROR_COUNT_KEY("image_error_count");
+
+const std::string SERVICE_DAEMON_GROUP_ASSIGNED_COUNT_KEY("group_assigned_count");
+const std::string SERVICE_DAEMON_GROUP_WARNING_COUNT_KEY("group_warning_count");
+const std::string SERVICE_DAEMON_GROUP_ERROR_COUNT_KEY("group_error_count");
 
 } // anonymous namespace
 
@@ -78,6 +83,7 @@ void InstanceReplayer<I>::init(Context *on_finish) {
       {
         std::lock_guard timer_locker{m_threads->timer_lock};
         schedule_image_state_check_task();
+        schedule_group_state_check_task();
       }
       on_finish->complete(0);
     });
@@ -127,6 +133,17 @@ void InstanceReplayer<I>::release_all(Context *on_finish) {
   std::lock_guard locker{m_lock};
 
   C_Gather *gather_ctx = new C_Gather(g_ceph_context, on_finish);
+  for (auto it = m_group_replayers.begin(); it != m_group_replayers.end();
+       it = m_group_replayers.erase(it)) {
+    auto group_replayer = it->second;
+    auto ctx = gather_ctx->new_sub();
+    ctx = new LambdaContext(
+      [group_replayer, ctx] (int r) {
+        group_replayer->destroy();
+        ctx->complete(0);
+      });
+    stop_group_replayer(group_replayer, ctx);
+  }
   for (auto it = m_image_replayers.begin(); it != m_image_replayers.end();
        it = m_image_replayers.erase(it)) {
     auto image_replayer = it->second;
@@ -229,15 +246,107 @@ void InstanceReplayer<I>::remove_peer_image(const std::string &global_image_id,
   m_threads->work_queue->queue(on_finish, 0);
 }
 
+template <typename I>
+void InstanceReplayer<I>::acquire_group(InstanceWatcher<I> *instance_watcher,
+                                        const std::string &global_group_id,
+                                        Context *on_finish) {
+  dout(10) << "global_group_id=" << global_group_id << dendl;
+
+  std::lock_guard locker{m_lock};
+
+  ceph_assert(m_on_shut_down == nullptr);
+
+  auto it = m_group_replayers.find(global_group_id);
+  if (it == m_group_replayers.end()) {
+    auto group_replayer = GroupReplayer<I>::create(
+        m_local_io_ctx, m_local_mirror_uuid, global_group_id,
+        m_threads, instance_watcher, m_local_status_updater,
+        m_cache_manager_handler, m_pool_meta_cache);
+
+    dout(10) << global_group_id << ": creating replayer " << group_replayer
+             << dendl;
+
+    it = m_group_replayers.insert(std::make_pair(global_group_id,
+                                                 group_replayer)).first;
+
+    // TODO only a single peer is currently supported
+    ceph_assert(m_peers.size() == 1);
+    auto peer = *m_peers.begin();
+    group_replayer->add_peer(peer);
+    start_group_replayer(group_replayer);
+  } else {
+    // A duplicate acquire notification implies (1) connection hiccup or
+    // (2) new leader election. For the second case, restart the replayer to
+    // detect if the group has been deleted while the leader was offline
+    auto& group_replayer = it->second;
+    group_replayer->set_finished(false);
+    group_replayer->restart(new C_TrackedOp(m_async_op_tracker, nullptr));
+  }
+
+  m_threads->work_queue->queue(on_finish, 0);
+}
+
+template <typename I>
+void InstanceReplayer<I>::release_group(const std::string &global_group_id,
+                                        Context *on_finish) {
+  dout(10) << "global_group_id=" << global_group_id << dendl;
+
+  std::lock_guard locker{m_lock};
+  ceph_assert(m_on_shut_down == nullptr);
+
+  auto it = m_group_replayers.find(global_group_id);
+  if (it == m_group_replayers.end()) {
+    dout(5) << global_group_id << ": not found" << dendl;
+    m_threads->work_queue->queue(on_finish, 0);
+    return;
+  }
+
+  auto group_replayer = it->second;
+  m_group_replayers.erase(it);
+
+  on_finish = new LambdaContext(
+    [group_replayer, on_finish] (int r) {
+      group_replayer->destroy();
+      on_finish->complete(0);
+    });
+  stop_group_replayer(group_replayer, on_finish);
+}
+
+template <typename I>
+void InstanceReplayer<I>::remove_peer_group(const std::string &global_group_id,
+                                            const std::string &peer_mirror_uuid,
+                                            Context *on_finish) {
+  dout(10) << "global_group_id=" << global_group_id << ", "
+           << "peer_mirror_uuid=" << peer_mirror_uuid << dendl;
+
+  std::lock_guard locker{m_lock};
+  ceph_assert(m_on_shut_down == nullptr);
+
+  auto it = m_group_replayers.find(global_group_id);
+  if (it != m_group_replayers.end()) {
+    // TODO only a single peer is currently supported, therefore
+    // we can just interrupt the current group replayer and
+    // it will eventually detect that the peer group is missing and
+    // determine if a delete propagation is required.
+    auto group_replayer = it->second;
+    group_replayer->restart(new C_TrackedOp(m_async_op_tracker, nullptr));
+  }
+  m_threads->work_queue->queue(on_finish, 0);
+}
+
 template <typename I>
 void InstanceReplayer<I>::print_status(Formatter *f) {
   dout(10) << dendl;
 
   std::lock_guard locker{m_lock};
 
+  f->open_array_section("group_replayers");
+  for (auto &[_, group_replayer] : m_group_replayers) {
+    group_replayer->print_status(f);
+  }
+  f->close_section();
   f->open_array_section("image_replayers");
-  for (auto &kv : m_image_replayers) {
-    auto &image_replayer = kv.second;
+  for (auto &[_, image_replayer] : m_image_replayers) {
     image_replayer->print_status(f);
   }
   f->close_section();
@@ -255,8 +364,10 @@ void InstanceReplayer<I>::start()
   auto cct = static_cast<CephContext *>(m_local_io_ctx.cct());
   auto gather_ctx = new C_Gather(
     cct, new C_TrackedOp(m_async_op_tracker, nullptr));
-  for (auto &kv : m_image_replayers) {
-    auto &image_replayer = kv.second;
+  for (auto &[_, group_replayer] : m_group_replayers) {
+    group_replayer->start(gather_ctx->new_sub(), true);
+  }
+  for (auto &[_, image_replayer] : m_image_replayers) {
     image_replayer->start(gather_ctx->new_sub(), true);
   }
 
@@ -290,8 +401,10 @@ void InstanceReplayer<I>::stop(Context *on_finish)
 
     m_manual_stop = true;
 
-    for (auto &kv : m_image_replayers) {
-      auto &image_replayer = kv.second;
+    for (auto &[_, group_replayer] : m_group_replayers) {
+      group_replayer->stop(gather_ctx->new_sub(), true);
+    }
+    for (auto &[_, image_replayer] : m_image_replayers) {
       image_replayer->stop(gather_ctx->new_sub(), true);
     }
   }
@@ -308,8 +421,10 @@ void InstanceReplayer<I>::restart()
 
   m_manual_stop = false;
 
-  for (auto &kv : m_image_replayers) {
-    auto &image_replayer = kv.second;
+  for (auto &[_, group_replayer] : m_group_replayers) {
+    group_replayer->restart(new C_TrackedOp(m_async_op_tracker, nullptr));
+  }
+  for (auto &[_, image_replayer] : m_image_replayers) {
     image_replayer->restart(new C_TrackedOp(m_async_op_tracker, nullptr));
   }
 }
@@ -321,8 +436,10 @@ void InstanceReplayer<I>::flush()
 
   std::lock_guard locker{m_lock};
 
-  for (auto &kv : m_image_replayers) {
-    auto &image_replayer = kv.second;
+  for (auto &[_, group_replayer] : m_group_replayers) {
+    group_replayer->flush();
+  }
+  for (auto &[_, image_replayer] : m_image_replayers) {
     image_replayer->flush();
   }
 }
@@ -397,13 +514,13 @@ void InstanceReplayer<I>::start_image_replayers(
 
   m_service_daemon->add_or_update_namespace_attribute(
     m_local_io_ctx.get_id(), m_local_io_ctx.get_namespace(),
-    SERVICE_DAEMON_ASSIGNED_COUNT_KEY, image_count);
+    SERVICE_DAEMON_IMAGE_ASSIGNED_COUNT_KEY, image_count);
   m_service_daemon->add_or_update_namespace_attribute(
     m_local_io_ctx.get_id(), m_local_io_ctx.get_namespace(),
-    SERVICE_DAEMON_WARNING_COUNT_KEY, warning_count);
+    SERVICE_DAEMON_IMAGE_WARNING_COUNT_KEY, warning_count);
   m_service_daemon->add_or_update_namespace_attribute(
     m_local_io_ctx.get_id(), m_local_io_ctx.get_namespace(),
-    SERVICE_DAEMON_ERROR_COUNT_KEY, error_count);
+    SERVICE_DAEMON_IMAGE_ERROR_COUNT_KEY, error_count);
 }
 
 template <typename I>
@@ -451,26 +568,6 @@ void InstanceReplayer<I>::stop_image_replayer(ImageReplayer<I> *image_replayer,
   }
 }
 
-template <typename I>
-void InstanceReplayer<I>::wait_for_ops() {
-  dout(10) << dendl;
-
-  Context *ctx = create_context_callback<
-    InstanceReplayer, &InstanceReplayer<I>::handle_wait_for_ops>(this);
-
-  m_async_op_tracker.wait_for_ops(ctx);
-}
-
-template <typename I>
-void InstanceReplayer<I>::handle_wait_for_ops(int r) {
-  dout(10) << "r=" << r << dendl;
-
-  ceph_assert(r == 0);
-
-  std::lock_guard locker{m_lock};
-  stop_image_replayers();
-}
-
 template <typename I>
 void InstanceReplayer<I>::stop_image_replayers() {
   dout(10) << dendl;
@@ -546,6 +643,215 @@ void InstanceReplayer<I>::schedule_image_state_check_task() {
   m_threads->timer->add_event_after(after, m_image_state_check_task);
 }
 
+template <typename I>
+void InstanceReplayer<I>::start_group_replayer(
+    GroupReplayer<I> *group_replayer) {
+  ceph_assert(ceph_mutex_is_locked(m_lock));
+
+  std::string global_group_id = group_replayer->get_global_group_id();
+  if (!group_replayer->is_stopped()) {
+    return;
+  } else if (group_replayer->is_blocklisted()) {
+    derr << "global_group_id=" << global_group_id << ": blocklisted detected "
+         << "during group replay" << dendl;
+    m_blocklisted = true;
+    return;
+  } else if (group_replayer->is_finished()) {
+    // TODO temporary until policy integrated
+    dout(5) << "removing group replayer for global_group_id="
+            << global_group_id << dendl;
+    m_group_replayers.erase(group_replayer->get_global_group_id());
+    group_replayer->destroy();
+    return;
+  } else if (m_manual_stop) {
+    return;
+  }
+
+  dout(10) << "global_group_id=" << global_group_id << dendl;
+  group_replayer->start(new C_TrackedOp(m_async_op_tracker, nullptr), false);
+}
+
+template <typename I>
+void InstanceReplayer<I>::queue_start_group_replayers() {
+  dout(10) << dendl;
+
+  Context *ctx = create_context_callback<
+    InstanceReplayer, &InstanceReplayer<I>::start_group_replayers>(this);
+  m_async_op_tracker.start_op();
+  m_threads->work_queue->queue(ctx, 0);
+}
+
+template <typename I>
+void InstanceReplayer<I>::start_group_replayers(int r) {
+  dout(10) << dendl;
+
+  std::lock_guard locker{m_lock};
+  if (m_on_shut_down != nullptr) {
+    return;
+  }
+
+  uint64_t group_count = 0;
+  uint64_t warning_count = 0;
+  uint64_t error_count = 0;
+  for (auto it = m_group_replayers.begin();
+       it != m_group_replayers.end();) {
+    auto current_it(it);
+    ++it;
+
+    ++group_count;
+    auto health_state = current_it->second->get_health_state();
+    if (health_state == image_replayer::HEALTH_STATE_WARNING) {
+      ++warning_count;
+    } else if (health_state == image_replayer::HEALTH_STATE_ERROR) {
+      ++error_count;
+    }
+
+    start_group_replayer(current_it->second);
+  }
+
+  m_service_daemon->add_or_update_namespace_attribute(
+    m_local_io_ctx.get_id(), m_local_io_ctx.get_namespace(),
+    SERVICE_DAEMON_GROUP_ASSIGNED_COUNT_KEY, group_count);
+  m_service_daemon->add_or_update_namespace_attribute(
+    m_local_io_ctx.get_id(), m_local_io_ctx.get_namespace(),
+    SERVICE_DAEMON_GROUP_WARNING_COUNT_KEY, warning_count);
+  m_service_daemon->add_or_update_namespace_attribute(
+    m_local_io_ctx.get_id(), m_local_io_ctx.get_namespace(),
+    SERVICE_DAEMON_GROUP_ERROR_COUNT_KEY, error_count);
+
+  m_async_op_tracker.finish_op();
+}
+
+template <typename I>
+void InstanceReplayer<I>::stop_group_replayer(GroupReplayer<I> *group_replayer,
+                                              Context *on_finish) {
+  dout(10) << group_replayer << " global_group_id="
+           << group_replayer->get_global_group_id() << ", on_finish="
+           << on_finish << dendl;
+
+  if (group_replayer->is_stopped()) {
+    m_threads->work_queue->queue(on_finish, 0);
+    return;
+  }
+
+  m_async_op_tracker.start_op();
+  Context *ctx = create_async_context_callback(
+    m_threads->work_queue, new LambdaContext(
+      [this, group_replayer, on_finish] (int r) {
+        stop_group_replayer(group_replayer, on_finish);
+        m_async_op_tracker.finish_op();
+      }));
+
+  if (group_replayer->is_running()) {
+    group_replayer->stop(ctx, false);
+  } else {
+    int after = 1;
+    dout(10) << "scheduling group replayer " << group_replayer << " stop after "
+             << after << " sec (task " << ctx << ")" << dendl;
+    ctx = new LambdaContext(
+      [this, after, ctx] (int r) {
+        std::lock_guard timer_locker{m_threads->timer_lock};
+        m_threads->timer->add_event_after(after, ctx);
+      });
+    m_threads->work_queue->queue(ctx, 0);
+  }
+}
+
+template <typename I>
+void InstanceReplayer<I>::stop_group_replayers() {
+  dout(10) << dendl;
+
+  ceph_assert(ceph_mutex_is_locked(m_lock));
+
+  Context *ctx = create_async_context_callback(
+    m_threads->work_queue, create_context_callback<InstanceReplayer<I>,
+    &InstanceReplayer<I>::handle_stop_group_replayers>(this));
+
+  C_Gather *gather_ctx = new C_Gather(g_ceph_context, ctx);
+  for (auto &[_, group_replayer] : m_group_replayers) {
+    stop_group_replayer(group_replayer, gather_ctx->new_sub());
+  }
+  gather_ctx->activate();
+}
+
+template <typename I>
+void InstanceReplayer<I>::handle_stop_group_replayers(int r) {
+  dout(10) << "r=" << r << dendl;
+
+  ceph_assert(r == 0);
+
+  Context *on_finish = nullptr;
+  {
+    std::lock_guard locker{m_lock};
+
+    for (auto &[_, group_replayer] : m_group_replayers) {
+      ceph_assert(group_replayer->is_stopped());
+      group_replayer->destroy();
+    }
+    m_group_replayers.clear();
+
+    ceph_assert(m_on_shut_down != nullptr);
+    std::swap(on_finish, m_on_shut_down);
+  }
+  on_finish->complete(r);
+}
+
+template <typename I>
+void InstanceReplayer<I>::cancel_group_state_check_task() {
+  std::lock_guard timer_locker{m_threads->timer_lock};
+
+  if (m_group_state_check_task == nullptr) {
+    return;
+  }
+
+  dout(10) << m_group_state_check_task << dendl;
+  bool canceled = m_threads->timer->cancel_event(m_group_state_check_task);
+  ceph_assert(canceled);
+  m_group_state_check_task = nullptr;
+}
+
+template <typename I>
+void InstanceReplayer<I>::schedule_group_state_check_task() {
+  ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock));
+  ceph_assert(m_group_state_check_task == nullptr);
+
+  m_group_state_check_task = new LambdaContext(
+    [this](int r) {
+      ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock));
+      m_group_state_check_task = nullptr;
+      schedule_group_state_check_task();
+      queue_start_group_replayers();
+    });
+
+  auto cct = static_cast<CephContext *>(m_local_io_ctx.cct());
+  int after = cct->_conf.get_val<uint64_t>(
+    "rbd_mirror_image_state_check_interval");
+
+  dout(10) << "scheduling group state check after " << after << " sec (task "
+           << m_group_state_check_task << ")" << dendl;
+  m_threads->timer->add_event_after(after, m_group_state_check_task);
+}
+
+template <typename I>
+void InstanceReplayer<I>::wait_for_ops() {
+  dout(10) << dendl;
+
+  Context *ctx = create_context_callback<
+    InstanceReplayer, &InstanceReplayer<I>::handle_wait_for_ops>(this);
+
+  m_async_op_tracker.wait_for_ops(ctx);
+}
+
+template <typename I>
+void InstanceReplayer<I>::handle_wait_for_ops(int r) {
+  dout(10) << "r=" << r << dendl;
+
+  ceph_assert(r == 0);
+
+  std::lock_guard locker{m_lock};
+  stop_image_replayers();
+}
+
 } // namespace mirror
 } // namespace rbd
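The group hooks above mirror the existing per-image flow: an acquire notification creates a GroupReplayer (or restarts it on a duplicate notification), a release tears it down, and the periodic group state check task, scheduled with the same rbd_mirror_image_state_check_interval option, re-drives start_group_replayer(). A minimal sketch of calling the new acquire/release entry points, assuming an initialized InstanceReplayer with one peer added (the current code asserts exactly one) and a hypothetical helper name:

#include "common/Cond.h"
#include "include/ceph_assert.h"
#include "librbd/ImageCtx.h"
#include "tools/rbd_mirror/InstanceReplayer.h"
#include "tools/rbd_mirror/InstanceWatcher.h"

// Hypothetical helper: acquire a group on this instance, then release it.
// Both calls complete their contexts with 0 on the work queue; the actual
// replay start/stop happens asynchronously inside the GroupReplayer.
void acquire_then_release_group(
    rbd::mirror::InstanceReplayer<librbd::ImageCtx> *instance_replayer,
    rbd::mirror::InstanceWatcher<librbd::ImageCtx> *instance_watcher,
    const std::string &global_group_id) {
  C_SaferCond on_acquire;
  instance_replayer->acquire_group(instance_watcher, global_group_id,
                                   &on_acquire);
  ceph_assert(on_acquire.wait() == 0);

  C_SaferCond on_release;
  instance_replayer->release_group(global_group_id, &on_release);
  ceph_assert(on_release.wait() == 0);
}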
 
diff --git a/src/tools/rbd_mirror/InstanceReplayer.h b/src/tools/rbd_mirror/InstanceReplayer.h
index 5399c6994a5a6feb49736c05c920afb67419d1dc..f83a2ba688f8a78cfd5df4c1eda41782d9fd9b55 100644 (file)
@@ -19,6 +19,7 @@ namespace librbd { class ImageCtx; }
 namespace rbd {
 namespace mirror {
 
+template <typename> class GroupReplayer;
 template <typename> class ImageReplayer;
 template <typename> class InstanceWatcher;
 template <typename> class MirrorStatusUpdater;
@@ -69,6 +70,13 @@ public:
                          const std::string &peer_mirror_uuid,
                          Context *on_finish);
 
+  void acquire_group(InstanceWatcher<ImageCtxT> *instance_watcher,
+                     const std::string &global_group_id, Context *on_finish);
+  void release_group(const std::string &global_group_id, Context *on_finish);
+  void remove_peer_group(const std::string &global_group_id,
+                         const std::string &peer_mirror_uuid,
+                         Context *on_finish);
+
   void release_all(Context *on_finish);
 
   void print_status(Formatter *f);
@@ -85,8 +93,13 @@ private:
    *
    * <uninitialized> <-------------------\
    *    | (init)                         |                    (repeat for each
-   *    v                             STOP_IMAGE_REPLAYER ---\ image replayer)
-   * SCHEDULE_IMAGE_STATE_CHECK_TASK     ^         ^         |
+   *    v                             STOP_GROUP_REPLAYER ---\ group replayer)
+   * SCHEDULE_GROUP_STATE_CHECK_TASK     ^         ^         |
+   *    |                                |         |         |
+   *    v                                |         \---------/
+   * SCHEDULE_IMAGE_STATE_CHECK_TASK     |                    (repeat for each
+   *    |                             STOP_IMAGE_REPLAYER ---\ image replayer)
+   *    |                                ^         ^         |
    *    |                                |         |         |
    *    v          (shut_down)           |         \---------/
    * <initialized> -----------------> WAIT_FOR_OPS
@@ -107,8 +120,10 @@ private:
   mutable ceph::mutex m_lock;
   AsyncOpTracker m_async_op_tracker;
   std::map<std::string, ImageReplayer<ImageCtxT> *> m_image_replayers;
+  std::map<std::string, GroupReplayer<ImageCtxT> *> m_group_replayers;
   Peers m_peers;
   Context *m_image_state_check_task = nullptr;
+  Context *m_group_state_check_task = nullptr;
   Context *m_on_shut_down = nullptr;
   bool m_manual_stop = false;
   bool m_blocklisted = false;
@@ -129,6 +144,19 @@ private:
 
   void schedule_image_state_check_task();
   void cancel_image_state_check_task();
+
+  void start_group_replayer(GroupReplayer<ImageCtxT> *group_replayer);
+  void queue_start_group_replayers();
+  void start_group_replayers(int r);
+
+  void stop_group_replayer(GroupReplayer<ImageCtxT> *group_replayer,
+                           Context *on_finish);
+
+  void stop_group_replayers();
+  void handle_stop_group_replayers(int r);
+
+  void schedule_group_state_check_task();
+  void cancel_group_state_check_task();
 };
 
 } // namespace mirror
diff --git a/src/tools/rbd_mirror/InstanceWatcher.cc b/src/tools/rbd_mirror/InstanceWatcher.cc
index 2246442d5b3782c615e694170b89e60dd537ffbd..e0daac36e8cc1518c0c509e98957fbc90abc1b25 100644 (file)
@@ -461,6 +461,64 @@ void InstanceWatcher<I>::notify_peer_image_removed(
   req->send();
 }
 
+template <typename I>
+void InstanceWatcher<I>::notify_group_acquire(
+    const std::string &instance_id, const std::string &global_group_id,
+    Context *on_notify_ack) {
+  dout(10) << "instance_id=" << instance_id << ", global_group_id="
+           << global_group_id << dendl;
+
+  std::lock_guard locker{m_lock};
+
+  ceph_assert(m_on_finish == nullptr);
+
+  uint64_t request_id = ++m_request_seq;
+  bufferlist bl;
+  encode(NotifyMessage{GroupAcquirePayload{request_id, global_group_id}}, bl);
+  auto req = new C_NotifyInstanceRequest(this, instance_id, request_id,
+                                         std::move(bl), on_notify_ack);
+  req->send();
+}
+
+template <typename I>
+void InstanceWatcher<I>::notify_group_release(
+    const std::string &instance_id, const std::string &global_group_id,
+    Context *on_notify_ack) {
+  dout(10) << "instance_id=" << instance_id << ", global_group_id="
+           << global_group_id << dendl;
+
+  std::lock_guard locker{m_lock};
+
+  ceph_assert(m_on_finish == nullptr);
+
+  uint64_t request_id = ++m_request_seq;
+  bufferlist bl;
+  encode(NotifyMessage{GroupReleasePayload{request_id, global_group_id}}, bl);
+  auto req = new C_NotifyInstanceRequest(this, instance_id, request_id,
+                                         std::move(bl), on_notify_ack);
+  req->send();
+}
+
+template <typename I>
+void InstanceWatcher<I>::notify_peer_group_removed(
+    const std::string &instance_id, const std::string &global_group_id,
+    const std::string &peer_mirror_uuid, Context *on_notify_ack) {
+  dout(10) << "instance_id=" << instance_id << ", "
+           << "global_group_id=" << global_group_id << ", "
+           << "peer_mirror_uuid=" << peer_mirror_uuid << dendl;
+
+  std::lock_guard locker{m_lock};
+  ceph_assert(m_on_finish == nullptr);
+
+  uint64_t request_id = ++m_request_seq;
+  bufferlist bl;
+  encode(NotifyMessage{PeerGroupRemovedPayload{request_id, global_group_id,
+                                               peer_mirror_uuid}}, bl);
+  auto req = new C_NotifyInstanceRequest(this, instance_id, request_id,
+                                         std::move(bl), on_notify_ack);
+  req->send();
+}
+
 template <typename I>
 void InstanceWatcher<I>::notify_sync_request(const std::string &sync_id,
                                              Context *on_sync_start) {
@@ -1145,6 +1203,54 @@ void InstanceWatcher<I>::handle_peer_image_removed(
   m_work_queue->queue(ctx, 0);
 }
 
+template <typename I>
+void InstanceWatcher<I>::handle_group_acquire(
+    const std::string &global_group_id, Context *on_finish) {
+  dout(10) << "global_group_id=" << global_group_id << dendl;
+
+  auto ctx = new LambdaContext(
+      [this, global_group_id, on_finish] (int r) {
+        m_instance_replayer->acquire_group(this, global_group_id, on_finish);
+        m_notify_op_tracker.finish_op();
+      });
+
+  m_notify_op_tracker.start_op();
+  m_work_queue->queue(ctx, 0);
+}
+
+template <typename I>
+void InstanceWatcher<I>::handle_group_release(
+    const std::string &global_group_id, Context *on_finish) {
+  dout(10) << "global_group_id=" << global_group_id << dendl;
+
+  auto ctx = new LambdaContext(
+      [this, global_group_id, on_finish] (int r) {
+        m_instance_replayer->release_group(global_group_id, on_finish);
+        m_notify_op_tracker.finish_op();
+      });
+
+  m_notify_op_tracker.start_op();
+  m_work_queue->queue(ctx, 0);
+}
+
+template <typename I>
+void InstanceWatcher<I>::handle_peer_group_removed(
+    const std::string &global_group_id, const std::string &peer_mirror_uuid,
+    Context *on_finish) {
+  dout(10) << "global_group_id=" << global_group_id << ", "
+           << "peer_mirror_uuid=" << peer_mirror_uuid << dendl;
+
+  auto ctx = new LambdaContext(
+      [this, peer_mirror_uuid, global_group_id, on_finish] (int r) {
+        m_instance_replayer->remove_peer_group(global_group_id,
+                                               peer_mirror_uuid, on_finish);
+        m_notify_op_tracker.finish_op();
+      });
+
+  m_notify_op_tracker.start_op();
+  m_work_queue->queue(ctx, 0);
+}
+
 template <typename I>
 void InstanceWatcher<I>::handle_sync_request(const std::string &instance_id,
                                              const std::string &sync_id,
@@ -1243,6 +1349,49 @@ void InstanceWatcher<I>::handle_payload(const std::string &instance_id,
   }
 }
 
+template <typename I>
+void InstanceWatcher<I>::handle_payload(const std::string &instance_id,
+                                        const GroupAcquirePayload &payload,
+                                        C_NotifyAck *on_notify_ack) {
+  dout(10) << "group_acquire: instance_id=" << instance_id << ", "
+           << "request_id=" << payload.request_id << dendl;
+
+  auto on_finish = prepare_request(instance_id, payload.request_id,
+                                   on_notify_ack);
+  if (on_finish != nullptr) {
+    handle_group_acquire(payload.global_group_id, on_finish);
+  }
+}
+
+template <typename I>
+void InstanceWatcher<I>::handle_payload(const std::string &instance_id,
+                                        const GroupReleasePayload &payload,
+                                        C_NotifyAck *on_notify_ack) {
+  dout(10) << "group_release: instance_id=" << instance_id << ", "
+           << "request_id=" << payload.request_id << dendl;
+
+  auto on_finish = prepare_request(instance_id, payload.request_id,
+                                   on_notify_ack);
+  if (on_finish != nullptr) {
+    handle_group_release(payload.global_group_id, on_finish);
+  }
+}
+
+template <typename I>
+void InstanceWatcher<I>::handle_payload(const std::string &instance_id,
+                                        const PeerGroupRemovedPayload &payload,
+                                        C_NotifyAck *on_notify_ack) {
+  dout(10) << "remove_peer_group: instance_id=" << instance_id << ", "
+           << "request_id=" << payload.request_id << dendl;
+
+  auto on_finish = prepare_request(instance_id, payload.request_id,
+                                   on_notify_ack);
+  if (on_finish != nullptr) {
+    handle_peer_group_removed(payload.global_group_id, payload.peer_mirror_uuid,
+                              on_finish);
+  }
+}
+
 template <typename I>
 void InstanceWatcher<I>::handle_payload(const std::string &instance_id,
                                         const SyncRequestPayload &payload,
diff --git a/src/tools/rbd_mirror/InstanceWatcher.h b/src/tools/rbd_mirror/InstanceWatcher.h
index 08e40b40bf1050a8075de1df3ed9080fb6f50e47..c35b7fe3d49e3afd937159babba993f7b3b30207 100644 (file)
@@ -78,6 +78,17 @@ public:
                                  const std::string &peer_mirror_uuid,
                                  Context *on_notify_ack);
 
+  void notify_group_acquire(const std::string &instance_id,
+                            const std::string &global_group_id,
+                            Context *on_notify_ack);
+  void notify_group_release(const std::string &instance_id,
+                            const std::string &global_group_id,
+                           Context *on_notify_ack);
+  void notify_peer_group_removed(const std::string &instance_id,
+                                 const std::string &global_group_id,
+                                 const std::string &peer_mirror_uuid,
+                                 Context *on_notify_ack);
+
   void notify_sync_request(const std::string &sync_id, Context *on_sync_start);
   bool cancel_sync_request(const std::string &sync_id);
   void notify_sync_complete(const std::string &sync_id);
@@ -238,6 +249,14 @@ private:
                                  const std::string &peer_mirror_uuid,
                                  Context *on_finish);
 
+  void handle_group_acquire(const std::string &global_group_id,
+                            Context *on_finish);
+  void handle_group_release(const std::string &global_group_id,
+                            Context *on_finish);
+  void handle_peer_group_removed(const std::string &global_group_id,
+                                 const std::string &peer_mirror_uuid,
+                                 Context *on_finish);
+
   void handle_sync_request(const std::string &instance_id,
                            const std::string &sync_id, Context *on_finish);
   void handle_sync_start(const std::string &instance_id,
@@ -252,6 +271,15 @@ private:
   void handle_payload(const std::string &instance_id,
                       const instance_watcher::PeerImageRemovedPayload &payload,
                       C_NotifyAck *on_notify_ack);
+  void handle_payload(const std::string &instance_id,
+                      const instance_watcher::GroupAcquirePayload &payload,
+                      C_NotifyAck *on_notify_ack);
+  void handle_payload(const std::string &instance_id,
+                      const instance_watcher::GroupReleasePayload &payload,
+                      C_NotifyAck *on_notify_ack);
+  void handle_payload(const std::string &instance_id,
+                      const instance_watcher::PeerGroupRemovedPayload &payload,
+                      C_NotifyAck *on_notify_ack);
   void handle_payload(const std::string &instance_id,
                       const instance_watcher::SyncRequestPayload &payload,
                       C_NotifyAck *on_notify_ack);
index 3cdfa92b0dacbb897445db78c31cb78ed61c7873..545ee4d620b1d9b47a1620c114419ab2c72c95f2 100644 (file)
@@ -822,8 +822,8 @@ void NamespaceReplayer<I>::handle_shut_down_image_map(int r, Context *on_finish)
 
 template <typename I>
 void NamespaceReplayer<I>::handle_acquire_image(const std::string &global_image_id,
-                                           const std::string &instance_id,
-                                           Context* on_finish) {
+                                                const std::string &instance_id,
+                                                Context* on_finish) {
   dout(5) << "global_image_id=" << global_image_id << ", "
           << "instance_id=" << instance_id << dendl;
 
@@ -833,8 +833,8 @@ void NamespaceReplayer<I>::handle_acquire_image(const std::string &global_image_
 
 template <typename I>
 void NamespaceReplayer<I>::handle_release_image(const std::string &global_image_id,
-                                           const std::string &instance_id,
-                                           Context* on_finish) {
+                                                const std::string &instance_id,
+                                                Context* on_finish) {
   dout(5) << "global_image_id=" << global_image_id << ", "
           << "instance_id=" << instance_id << dendl;
 
@@ -844,9 +844,9 @@ void NamespaceReplayer<I>::handle_release_image(const std::string &global_image_
 
 template <typename I>
 void NamespaceReplayer<I>::handle_remove_image(const std::string &mirror_uuid,
-                                          const std::string &global_image_id,
-                                          const std::string &instance_id,
-                                          Context* on_finish) {
+                                               const std::string &global_image_id,
+                                               const std::string &instance_id,
+                                               Context* on_finish) {
   ceph_assert(!mirror_uuid.empty());
   dout(5) << "mirror_uuid=" << mirror_uuid << ", "
           << "global_image_id=" << global_image_id << ", "
@@ -856,6 +856,42 @@ void NamespaceReplayer<I>::handle_remove_image(const std::string &mirror_uuid,
                                                 mirror_uuid, on_finish);
 }
 
+template <typename I>
+void NamespaceReplayer<I>::handle_acquire_group(const std::string &global_group_id,
+                                                const std::string &instance_id,
+                                                Context* on_finish) {
+  dout(5) << "global_group_id=" << global_group_id << ", "
+          << "instance_id=" << instance_id << dendl;
+
+  m_instance_watcher->notify_group_acquire(instance_id, global_group_id,
+                                           on_finish);
+}
+
+template <typename I>
+void NamespaceReplayer<I>::handle_release_group(const std::string &global_group_id,
+                                                const std::string &instance_id,
+                                                Context* on_finish) {
+  dout(5) << "global_group_id=" << global_group_id << ", "
+          << "instance_id=" << instance_id << dendl;
+
+  m_instance_watcher->notify_group_release(instance_id, global_group_id,
+                                           on_finish);
+}
+
+template <typename I>
+void NamespaceReplayer<I>::handle_remove_group(const std::string &mirror_uuid,
+                                               const std::string &global_group_id,
+                                               const std::string &instance_id,
+                                               Context* on_finish) {
+  ceph_assert(!mirror_uuid.empty());
+  dout(5) << "mirror_uuid=" << mirror_uuid << ", "
+          << "global_group_id=" << global_group_id << ", "
+          << "instance_id=" << instance_id << dendl;
+
+  m_instance_watcher->notify_peer_group_removed(instance_id, global_group_id,
+                                                mirror_uuid, on_finish);
+}
+
 template <typename I>
 std::string NamespaceReplayer<I>::get_local_namespace() {
   return m_local_namespace_name;
index 262ffb0cd7950b9af233a107fb4d2610ec9e2dd6..152761ed6dae0ace7c81a74052db15d831888bf7 100644 (file)
@@ -194,7 +194,29 @@ private:
                       const std::string &instance_id,
                       Context* on_finish) override {
       namespace_replayer->handle_remove_image(mirror_uuid, global_image_id,
-                                         instance_id, on_finish);
+                                              instance_id, on_finish);
+    }
+
+    void acquire_group(const std::string &global_group_id,
+                       const std::string &instance_id,
+                       Context* on_finish) override {
+      namespace_replayer->handle_acquire_group(global_group_id, instance_id,
+                                               on_finish);
+    }
+
+    void release_group(const std::string &global_group_id,
+                       const std::string &instance_id,
+                       Context* on_finish) override {
+      namespace_replayer->handle_release_group(global_group_id, instance_id,
+                                               on_finish);
+    }
+
+    void remove_group(const std::string &mirror_uuid,
+                      const std::string &global_group_id,
+                      const std::string &instance_id,
+                      Context* on_finish) override {
+      namespace_replayer->handle_remove_group(mirror_uuid, global_group_id,
+                                              instance_id, on_finish);
     }
   };
 
@@ -268,6 +290,16 @@ private:
                            const std::string &global_image_id,
                            const std::string &instance_id,
                            Context* on_finish);
+  void handle_acquire_group(const std::string &global_group_id,
+                            const std::string &instance_id,
+                            Context* on_finish);
+  void handle_release_group(const std::string &global_group_id,
+                            const std::string &instance_id,
+                            Context* on_finish);
+  void handle_remove_group(const std::string &mirror_uuid,
+                           const std::string &global_group_id,
+                           const std::string &instance_id,
+                           Context* on_finish);
 
   std::string m_local_namespace_name;
   std::string m_remote_namespace_name;
index b16792019e49b9922ed48d41eaf6c95aa3144cb4..0c8bab188fe814eca1adfeddcf06811695e3ae9e 100644 (file)
@@ -382,7 +382,18 @@ void PoolWatcher<I>::handle_group_updated(const std::string &id,
            << "image_count=" << image_count << ", "
            << "enabled=" << enabled << dendl;
 
-  // TODO
+  std::lock_guard locker{m_lock};
+  MirrorEntity entity(MIRROR_ENTITY_TYPE_GROUP, global_group_id, image_count);
+  m_pending_added_entities.erase(entity);
+  m_pending_removed_entities.erase(entity);
+
+  if (enabled) {
+    m_pending_added_entities.insert({entity, id});
+    schedule_listener();
+  } else {
+    m_pending_removed_entities.insert({entity, id});
+    schedule_listener();
+  }
 }
 
 template <typename I>
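The pending-entity bookkeeping above replaces the earlier TODO: each update first clears any stale pending add or remove for the group, then queues the entity on exactly one of the two pending sets before scheduling the listener. The sketch below shows that shape in isolation; the types and member names are stand-ins, not the actual PoolWatcher internals, which also batch and notify asynchronously.

    #include <iostream>
    #include <map>
    #include <string>

    struct Entity {
      std::string global_id;
      bool operator<(const Entity &rhs) const { return global_id < rhs.global_id; }
    };

    class DemoPoolWatcher {
    public:
      void handle_group_updated(const std::string &id,
                                const std::string &global_group_id, bool enabled) {
        Entity entity{global_group_id};
        // Drop any stale pending state for this entity before re-queuing it.
        m_pending_added.erase(entity);
        m_pending_removed.erase(entity);
        if (enabled) {
          m_pending_added.emplace(entity, id);
        } else {
          m_pending_removed.emplace(entity, id);
        }
        schedule_listener();
      }

    private:
      std::map<Entity, std::string> m_pending_added;
      std::map<Entity, std::string> m_pending_removed;

      void schedule_listener() {
        // Placeholder: just report the queue sizes in this sketch.
        std::cout << "pending added=" << m_pending_added.size()
                  << " removed=" << m_pending_removed.size() << "\n";
      }
    };

    int main() {
      DemoPoolWatcher watcher;
      watcher.handle_group_updated("group-1", "global-1", true);   // enabled
      watcher.handle_group_updated("group-1", "global-1", false);  // disabled again
    }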
index b68a4674786b0beb4d6ccd18a07f7e2777e9654d..5ce4c33e9612ccc5ad972804a29e56eda8af92d0 100644 (file)
@@ -41,6 +41,16 @@ struct Listener {
                             const std::string &global_image_id,
                             const std::string &instance_id,
                             Context* on_finish) = 0;
+  virtual void acquire_group(const std::string &global_group_id,
+                             const std::string &instance_id,
+                             Context* on_finish) = 0;
+  virtual void release_group(const std::string &global_group_id,
+                             const std::string &instance_id,
+                             Context* on_finish) = 0;
+  virtual void remove_group(const std::string &mirror_uuid,
+                            const std::string &global_group_id,
+                            const std::string &instance_id,
+                            Context* on_finish) = 0;
 };
 
 struct LookupInfo {
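The image_map Listener gains pure-virtual group callbacks that mirror the existing image ones, and NamespaceReplayer satisfies them through the inner adapter shown earlier, forwarding each callback to the corresponding handle_*_group() method. Here is a toy version of that interface-plus-adapter shape; all names are hypothetical and the Context* completions are omitted.

    #include <iostream>
    #include <string>

    struct GroupListener {
      virtual ~GroupListener() = default;
      virtual void acquire_group(const std::string &global_group_id,
                                 const std::string &instance_id) = 0;
      virtual void release_group(const std::string &global_group_id,
                                 const std::string &instance_id) = 0;
    };

    class DemoNamespaceReplayer {
    public:
      // Inner adapter forwarding listener callbacks to the owning replayer.
      struct Adapter : public GroupListener {
        DemoNamespaceReplayer *replayer;
        explicit Adapter(DemoNamespaceReplayer *r) : replayer(r) {}

        void acquire_group(const std::string &global_group_id,
                           const std::string &instance_id) override {
          replayer->handle_acquire_group(global_group_id, instance_id);
        }
        void release_group(const std::string &global_group_id,
                           const std::string &instance_id) override {
          replayer->handle_release_group(global_group_id, instance_id);
        }
      };

      void handle_acquire_group(const std::string &g, const std::string &i) {
        std::cout << "acquire " << g << " on " << i << "\n";
      }
      void handle_release_group(const std::string &g, const std::string &i) {
        std::cout << "release " << g << " on " << i << "\n";
      }
    };

    int main() {
      DemoNamespaceReplayer replayer;
      DemoNamespaceReplayer::Adapter listener(&replayer);
      listener.acquire_group("global-group-1", "instance-0");
      listener.release_group("global-group-1", "instance-0");
    }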
index 4c9f5235f1927fc55c8c0421b65d29737517cfef..25a9f2bb6b9c2b7bda26197e8dbb7a00aa46af54 100644 (file)
@@ -127,6 +127,43 @@ void SyncPayloadBase::dump(Formatter *f) const {
   f->dump_string("sync_id", sync_id);
 }
 
+void GroupPayloadBase::encode(bufferlist &bl) const {
+  using ceph::encode;
+  PayloadBase::encode(bl);
+  encode(global_group_id, bl);
+}
+
+void GroupPayloadBase::decode(__u8 version, bufferlist::const_iterator &iter) {
+  using ceph::decode;
+  PayloadBase::decode(version, iter);
+  decode(global_group_id, iter);
+}
+
+void GroupPayloadBase::dump(Formatter *f) const {
+  PayloadBase::dump(f);
+  f->dump_string("global_group_id", global_group_id);
+}
+
+void PeerGroupRemovedPayload::encode(bufferlist &bl) const {
+  using ceph::encode;
+  PayloadBase::encode(bl);
+  encode(global_group_id, bl);
+  encode(peer_mirror_uuid, bl);
+}
+
+void PeerGroupRemovedPayload::decode(__u8 version, bufferlist::const_iterator &iter) {
+  using ceph::decode;
+  PayloadBase::decode(version, iter);
+  decode(global_group_id, iter);
+  decode(peer_mirror_uuid, iter);
+}
+
+void PeerGroupRemovedPayload::dump(Formatter *f) const {
+  PayloadBase::dump(f);
+  f->dump_string("global_group_id", global_group_id);
+  f->dump_string("peer_mirror_uuid", peer_mirror_uuid);
+}
+
 void UnknownPayload::encode(bufferlist &bl) const {
   ceph_abort();
 }
@@ -166,6 +203,15 @@ void NotifyMessage::decode(bufferlist::const_iterator& iter) {
   case NOTIFY_OP_SYNC_START:
     payload = SyncStartPayload();
     break;
+  case NOTIFY_OP_GROUP_ACQUIRE:
+    payload = GroupAcquirePayload();
+    break;
+  case NOTIFY_OP_GROUP_RELEASE:
+    payload = GroupReleasePayload();
+    break;
+  case NOTIFY_OP_PEER_GROUP_REMOVED:
+    payload = PeerGroupRemovedPayload();
+    break;
   default:
     payload = UnknownPayload();
     break;
@@ -194,6 +240,15 @@ void NotifyMessage::generate_test_instances(std::list<NotifyMessage *> &o) {
 
   o.push_back(new NotifyMessage(SyncStartPayload()));
   o.push_back(new NotifyMessage(SyncStartPayload(1, "sync_id")));
+
+  o.push_back(new NotifyMessage(GroupAcquirePayload()));
+  o.push_back(new NotifyMessage(GroupAcquirePayload(1, "gid")));
+
+  o.push_back(new NotifyMessage(GroupReleasePayload()));
+  o.push_back(new NotifyMessage(GroupReleasePayload(1, "gid")));
+
+  o.push_back(new NotifyMessage(PeerGroupRemovedPayload()));
+  o.push_back(new NotifyMessage(PeerGroupRemovedPayload(1, "gid", "uuid")));
 }
 
 std::ostream &operator<<(std::ostream &out, const NotifyOp &op) {
@@ -213,6 +268,15 @@ std::ostream &operator<<(std::ostream &out, const NotifyOp &op) {
   case NOTIFY_OP_SYNC_START:
     out << "SyncStart";
     break;
+  case NOTIFY_OP_GROUP_ACQUIRE:
+    out << "GroupAcquire";
+    break;
+  case NOTIFY_OP_GROUP_RELEASE:
+    out << "GroupRelease";
+    break;
+  case NOTIFY_OP_PEER_GROUP_REMOVED:
+    out << "PeerGroupRemoved";
+    break;
   default:
     out << "Unknown (" << static_cast<uint32_t>(op) << ")";
     break;
index b9e0d19bca78c8858bee9d63e60baac111c3165b..619036c750ca1c2378c009c2ad1598e50d8c1309 100644 (file)
@@ -23,7 +23,10 @@ enum NotifyOp {
   NOTIFY_OP_IMAGE_RELEASE      = 1,
   NOTIFY_OP_PEER_IMAGE_REMOVED = 2,
   NOTIFY_OP_SYNC_REQUEST       = 3,
-  NOTIFY_OP_SYNC_START         = 4
+  NOTIFY_OP_SYNC_START         = 4,
+  NOTIFY_OP_GROUP_ACQUIRE      = 5,
+  NOTIFY_OP_GROUP_RELEASE      = 6,
+  NOTIFY_OP_PEER_GROUP_REMOVED = 7,
 };
 
 struct PayloadBase {
@@ -132,6 +135,61 @@ struct SyncStartPayload : public SyncPayloadBase {
   }
 };
 
+struct GroupPayloadBase : public PayloadBase {
+  std::string global_group_id;
+
+  GroupPayloadBase() : PayloadBase() {
+  }
+
+  GroupPayloadBase(uint64_t request_id, const std::string &global_group_id)
+    : PayloadBase(request_id), global_group_id(global_group_id) {
+  }
+
+  void encode(bufferlist &bl) const;
+  void decode(__u8 version, bufferlist::const_iterator &iter);
+  void dump(Formatter *f) const;
+};
+
+struct GroupAcquirePayload : public GroupPayloadBase {
+  static const NotifyOp NOTIFY_OP = NOTIFY_OP_GROUP_ACQUIRE;
+
+  GroupAcquirePayload() {
+  }
+  GroupAcquirePayload(uint64_t request_id, const std::string &global_group_id)
+    : GroupPayloadBase(request_id, global_group_id) {
+  }
+};
+
+struct GroupReleasePayload : public GroupPayloadBase {
+  static const NotifyOp NOTIFY_OP = NOTIFY_OP_GROUP_RELEASE;
+
+  GroupReleasePayload() {
+  }
+  GroupReleasePayload(uint64_t request_id, const std::string &global_group_id)
+    : GroupPayloadBase(request_id, global_group_id) {
+  }
+};
+
+struct PeerGroupRemovedPayload : public PayloadBase {
+  static const NotifyOp NOTIFY_OP = NOTIFY_OP_PEER_GROUP_REMOVED;
+
+  std::string global_group_id;
+  std::string peer_mirror_uuid;
+
+  PeerGroupRemovedPayload() {
+  }
+  PeerGroupRemovedPayload(uint64_t request_id,
+                          const std::string& global_group_id,
+                          const std::string& peer_mirror_uuid)
+    : PayloadBase(request_id),
+      global_group_id(global_group_id), peer_mirror_uuid(peer_mirror_uuid) {
+  }
+
+  void encode(bufferlist &bl) const;
+  void decode(__u8 version, bufferlist::const_iterator &iter);
+  void dump(Formatter *f) const;
+};
+
 struct UnknownPayload {
   static const NotifyOp NOTIFY_OP = static_cast<NotifyOp>(-1);
 
@@ -148,6 +206,9 @@ typedef std::variant<ImageAcquirePayload,
                     PeerImageRemovedPayload,
                     SyncRequestPayload,
                     SyncStartPayload,
+                    GroupAcquirePayload,
+                    GroupReleasePayload,
+                    PeerGroupRemovedPayload,
                     UnknownPayload> Payload;
 
 struct NotifyMessage {
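Adding the three group payload types to the Payload variant is what lets the watcher route a decoded message to the matching handle_payload() overload through a visitor. A minimal illustration of that variant-plus-visitor dispatch follows; the types and handlers are illustrative, not the actual instance_watcher ones.

    #include <iostream>
    #include <string>
    #include <variant>

    struct GroupAcquire { std::string global_group_id; };
    struct GroupRelease { std::string global_group_id; };
    struct Unknown {};

    using DemoPayload = std::variant<GroupAcquire, GroupRelease, Unknown>;

    struct DemoHandler {
      void operator()(const GroupAcquire &p) const {
        std::cout << "acquire group " << p.global_group_id << "\n";
      }
      void operator()(const GroupRelease &p) const {
        std::cout << "release group " << p.global_group_id << "\n";
      }
      void operator()(const Unknown &) const {
        std::cout << "unknown op\n";  // mirrors the UnknownPayload fallback
      }
    };

    int main() {
      DemoPayload payloads[] = {GroupAcquire{"global-group-1"},
                                GroupRelease{"global-group-1"}, Unknown{}};
      for (const auto &payload : payloads) {
        std::visit(DemoHandler{}, payload);  // one overload per payload type
      }
    }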