]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rbd-mirror A/A: InstanceReplayer class to acquire and release images
authorMykola Golub <mgolub@mirantis.com>
Fri, 7 Apr 2017 12:14:29 +0000 (14:14 +0200)
committerMykola Golub <mgolub@mirantis.com>
Fri, 7 Apr 2017 13:58:52 +0000 (15:58 +0200)
Signed-off-by: Mykola Golub <mgolub@mirantis.com>
src/test/rbd_mirror/CMakeLists.txt
src/test/rbd_mirror/test_mock_InstanceReplayer.cc [new file with mode: 0644]
src/tools/rbd_mirror/CMakeLists.txt
src/tools/rbd_mirror/InstanceReplayer.cc [new file with mode: 0644]
src/tools/rbd_mirror/InstanceReplayer.h [new file with mode: 0644]
src/tools/rbd_mirror/instance_watcher/Types.cc [new file with mode: 0644]
src/tools/rbd_mirror/instance_watcher/Types.h [new file with mode: 0644]

index 88435ff7828fb9bc92f487e1a2f3aa97e729d89b..291b457bb6070a742509501ce150b27f5bc1c58b 100644 (file)
@@ -19,6 +19,7 @@ add_executable(unittest_rbd_mirror
   test_mock_ImageReplayer.cc
   test_mock_ImageSync.cc
   test_mock_ImageSyncThrottler.cc
+  test_mock_InstanceReplayer.cc
   test_mock_InstanceWatcher.cc
   test_mock_LeaderWatcher.cc
   test_mock_PoolWatcher.cc
diff --git a/src/test/rbd_mirror/test_mock_InstanceReplayer.cc b/src/test/rbd_mirror/test_mock_InstanceReplayer.cc
new file mode 100644 (file)
index 0000000..2fef054
--- /dev/null
@@ -0,0 +1,200 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "test/librbd/mock/MockImageCtx.h"
+#include "test/rbd_mirror/test_mock_fixture.h"
+#include "tools/rbd_mirror/ImageReplayer.h"
+#include "tools/rbd_mirror/ImageSyncThrottler.h"
+#include "tools/rbd_mirror/InstanceReplayer.h"
+#include "tools/rbd_mirror/Threads.h"
+
+namespace librbd {
+
+namespace {
+
+struct MockTestImageCtx : public MockImageCtx {
+  MockTestImageCtx(librbd::ImageCtx &image_ctx)
+    : librbd::MockImageCtx(image_ctx) {
+  }
+};
+
+} // anonymous namespace
+
+} // namespace librbd
+
+namespace rbd {
+namespace mirror {
+
+template <>
+struct Threads<librbd::MockTestImageCtx> {
+  Mutex &timer_lock;
+  SafeTimer *timer;
+  ContextWQ *work_queue;
+
+  Threads(Threads<librbd::ImageCtx> *threads)
+    : timer_lock(threads->timer_lock), timer(threads->timer),
+      work_queue(threads->work_queue) {
+  }
+};
+
+template<>
+struct ImageReplayer<librbd::MockTestImageCtx> {
+  static ImageReplayer* s_instance;
+  std::string global_image_id;
+
+  static ImageReplayer *create(
+    Threads<librbd::MockTestImageCtx> *threads,
+    std::shared_ptr<ImageDeleter> image_deleter,
+    ImageSyncThrottlerRef<librbd::MockTestImageCtx> image_sync_throttler,
+    RadosRef local, const std::string &local_mirror_uuid, int64_t local_pool_id,
+    const std::string &global_image_id) {
+    assert(s_instance != nullptr);
+    s_instance->global_image_id = global_image_id;
+    return s_instance;
+  }
+
+  ImageReplayer() {
+    assert(s_instance == nullptr);
+    s_instance = this;
+  }
+
+  virtual ~ImageReplayer() {
+    assert(s_instance == this);
+    s_instance = nullptr;
+  }
+
+  MOCK_METHOD0(destroy, void());
+  MOCK_METHOD2(start, void(Context *, bool));
+  MOCK_METHOD2(stop, void(Context *, bool));
+  MOCK_METHOD0(restart, void());
+  MOCK_METHOD0(flush, void());
+  MOCK_METHOD2(print_status, void(Formatter *, stringstream *));
+  MOCK_METHOD1(set_remote_images, void(const PeerImages &));
+  MOCK_METHOD2(remove_remote_image, void(const std::string &,
+                                         const std::string &));
+  MOCK_METHOD0(remote_images_empty, bool());
+  MOCK_METHOD0(get_global_image_id, const std::string &());
+  MOCK_METHOD0(get_local_image_id, const std::string &());
+  MOCK_METHOD0(is_running, bool());
+  MOCK_METHOD0(is_stopped, bool());
+  MOCK_METHOD0(is_blacklisted, bool());
+};
+
+template<>
+struct ImageSyncThrottler<librbd::MockTestImageCtx> {
+  ImageSyncThrottler() {
+  }
+  virtual ~ImageSyncThrottler() {
+  }
+};
+
+ImageReplayer<librbd::MockTestImageCtx>* ImageReplayer<librbd::MockTestImageCtx>::s_instance = nullptr;
+
+} // namespace mirror
+} // namespace rbd
+
+// template definitions
+#include "tools/rbd_mirror/InstanceReplayer.cc"
+
+namespace rbd {
+namespace mirror {
+
+using ::testing::_;
+using ::testing::InSequence;
+using ::testing::Invoke;
+using ::testing::Return;
+using ::testing::ReturnRef;
+
+class TestMockInstanceReplayer : public TestMockFixture {
+public:
+  typedef ImageReplayer<librbd::MockTestImageCtx> MockImageReplayer;
+  typedef InstanceReplayer<librbd::MockTestImageCtx> MockInstanceReplayer;
+  typedef Threads<librbd::MockTestImageCtx> MockThreads;
+
+  void SetUp() override {
+    TestMockFixture::SetUp();
+
+    m_mock_threads = new MockThreads(m_threads);
+
+    m_image_deleter.reset(
+      new rbd::mirror::ImageDeleter(m_threads->work_queue, m_threads->timer,
+                                    &m_threads->timer_lock));
+    m_image_sync_throttler.reset(
+      new rbd::mirror::ImageSyncThrottler<librbd::MockTestImageCtx>());
+  }
+
+  void TearDown() override {
+    delete m_mock_threads;
+    TestMockFixture::TearDown();
+  }
+
+  MockThreads *m_mock_threads;
+  std::shared_ptr<rbd::mirror::ImageDeleter> m_image_deleter;
+  std::shared_ptr<rbd::mirror::ImageSyncThrottler<librbd::MockTestImageCtx>>
+    m_image_sync_throttler;
+};
+
+TEST_F(TestMockInstanceReplayer, AcquireReleaseImage) {
+  MockImageReplayer mock_image_replayer;
+  MockInstanceReplayer instance_replayer(
+    m_mock_threads, m_image_deleter, m_image_sync_throttler,
+    rbd::mirror::RadosRef(new librados::Rados(m_local_io_ctx)),
+    "local_mirror_uuid", m_local_io_ctx.get_id());
+
+  std::string global_image_id("global_image_id");
+  rbd::mirror::instance_watcher::PeerImageIds peer_image_ids =
+    {{"remote_mirror_uuid", "remote_image_id"}};
+
+  EXPECT_CALL(mock_image_replayer, get_global_image_id())
+    .WillRepeatedly(ReturnRef(global_image_id));
+  EXPECT_CALL(mock_image_replayer, is_blacklisted())
+    .WillRepeatedly(Return(false));
+
+  InSequence seq;
+
+  instance_replayer.init();
+  instance_replayer.set_peers({{"remote_mirror_uuid", m_remote_io_ctx}});
+
+  // Acquire
+
+  C_SaferCond on_acquire;
+
+  EXPECT_CALL(mock_image_replayer, set_remote_images(_));
+  EXPECT_CALL(mock_image_replayer, is_stopped())
+    .WillOnce(Return(true));
+  EXPECT_CALL(mock_image_replayer, start(nullptr, false));
+
+  instance_replayer.acquire_image(global_image_id, peer_image_ids, &on_acquire);
+  ASSERT_EQ(0, on_acquire.wait());
+
+  // Release
+
+  C_SaferCond on_release;
+
+  EXPECT_CALL(mock_image_replayer,
+              remove_remote_image("remote_mirror_uuid", "remote_image_id"));
+  EXPECT_CALL(mock_image_replayer, remote_images_empty())
+    .WillOnce(Return(true));
+  EXPECT_CALL(mock_image_replayer, is_stopped())
+    .WillOnce(Return(false));
+  EXPECT_CALL(mock_image_replayer, is_running())
+    .WillOnce(Return(false));
+  EXPECT_CALL(mock_image_replayer, is_stopped())
+    .WillOnce(Return(false));
+  EXPECT_CALL(mock_image_replayer, is_running())
+    .WillOnce(Return(true));
+  EXPECT_CALL(mock_image_replayer, stop(_, false))
+    .WillOnce(CompleteContext(0));
+  EXPECT_CALL(mock_image_replayer, is_stopped())
+    .WillOnce(Return(true));
+  EXPECT_CALL(mock_image_replayer, destroy());
+
+  instance_replayer.release_image("global_image_id", peer_image_ids, false,
+                                  &on_release);
+  ASSERT_EQ(0, on_release.wait());
+
+  instance_replayer.shut_down();
+}
+
+} // namespace mirror
+} // namespace rbd
index 200fdee63e8261212f12f004c4edb78375fd6c04..4f1acbcbd4f2e2732f8792fbbfd1919401e2180e 100644 (file)
@@ -1,4 +1,5 @@
 add_library(rbd_mirror_types STATIC
+  instance_watcher/Types.cc
   leader_watcher/Types.cc)
 
 set(rbd_mirror_internal
@@ -7,6 +8,7 @@ set(rbd_mirror_internal
   ImageReplayer.cc
   ImageSync.cc
   ImageSyncThrottler.cc
+  InstanceReplayer.cc
   InstanceWatcher.cc
   Instances.cc
   LeaderWatcher.cc
diff --git a/src/tools/rbd_mirror/InstanceReplayer.cc b/src/tools/rbd_mirror/InstanceReplayer.cc
new file mode 100644 (file)
index 0000000..b1dd697
--- /dev/null
@@ -0,0 +1,496 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "include/stringify.h"
+#include "common/Timer.h"
+#include "common/debug.h"
+#include "common/errno.h"
+#include "librbd/Utils.h"
+#include "ImageReplayer.h"
+#include "InstanceReplayer.h"
+#include "Threads.h"
+
+#define dout_context g_ceph_context
+#define dout_subsys ceph_subsys_rbd_mirror
+#undef dout_prefix
+#define dout_prefix *_dout << "rbd::mirror::InstanceReplayer: " \
+                           << this << " " << __func__ << ": "
+
+namespace rbd {
+namespace mirror {
+
+using librbd::util::create_async_context_callback;
+using librbd::util::create_context_callback;
+
+template <typename I>
+InstanceReplayer<I>::InstanceReplayer(
+    Threads<I> *threads, std::shared_ptr<ImageDeleter> image_deleter,
+    ImageSyncThrottlerRef<I> image_sync_throttler, RadosRef local_rados,
+    const std::string &local_mirror_uuid, int64_t local_pool_id)
+    : m_threads(threads), m_image_deleter(image_deleter),
+      m_image_sync_throttler(image_sync_throttler), m_local_rados(local_rados),
+      m_local_mirror_uuid(local_mirror_uuid), m_local_pool_id(local_pool_id),
+      m_lock("rbd::mirror::InstanceReplayer " + stringify(local_pool_id)) {
+}
+
+template <typename I>
+InstanceReplayer<I>::~InstanceReplayer() {
+  assert(m_image_state_check_task == nullptr);
+  assert(m_async_op_tracker.empty());
+  assert(m_image_replayers.empty());
+}
+
+template <typename I>
+int InstanceReplayer<I>::init() {
+  C_SaferCond init_ctx;
+  init(&init_ctx);
+  return init_ctx.wait();
+}
+
+template <typename I>
+void InstanceReplayer<I>::init(Context *on_finish) {
+  dout(20) << dendl;
+
+  Context *ctx = new FunctionContext(
+    [this, on_finish] (int r) {
+      {
+        Mutex::Locker timer_locker(m_threads->timer_lock);
+        schedule_image_state_check_task();
+      }
+      on_finish->complete(0);
+    });
+
+  m_threads->work_queue->queue(ctx, 0);
+}
+
+template <typename I>
+void InstanceReplayer<I>::shut_down() {
+  C_SaferCond shut_down_ctx;
+  shut_down(&shut_down_ctx);
+  int r = shut_down_ctx.wait();
+  assert(r == 0);
+}
+
+template <typename I>
+void InstanceReplayer<I>::shut_down(Context *on_finish) {
+  dout(20) << dendl;
+
+  Mutex::Locker locker(m_lock);
+
+  assert(m_on_shut_down == nullptr);
+  m_on_shut_down = on_finish;
+
+  Context *ctx = new FunctionContext(
+    [this] (int r) {
+      cancel_image_state_check_task();
+      wait_for_ops();
+    });
+
+  m_threads->work_queue->queue(ctx, 0);
+}
+
+template <typename I>
+void InstanceReplayer<I>::set_peers(const Peers &peers) {
+  dout(20) << dendl;
+
+  Mutex::Locker locker(m_lock);
+  m_peers = peers;
+}
+
+template <typename I>
+void InstanceReplayer<I>::release_all(Context *on_finish) {
+  dout(20) << dendl;
+
+  Mutex::Locker locker(m_lock);
+
+  C_Gather *gather_ctx = new C_Gather(g_ceph_context, on_finish);
+  for (auto it = m_image_replayers.begin(); it != m_image_replayers.end();
+       it = m_image_replayers.erase(it)) {
+    auto image_replayer = it->second;
+    auto ctx = gather_ctx->new_sub();
+    ctx = new FunctionContext(
+      [image_replayer, ctx] (int r) {
+        image_replayer->destroy();
+        ctx->complete(0);
+      });
+    stop_image_replayer(image_replayer, ctx);
+  }
+  gather_ctx->activate();
+}
+
+template <typename I>
+void InstanceReplayer<I>::acquire_image(
+    const std::string &global_image_id,
+    const instance_watcher::PeerImageIds &peers, Context *on_finish) {
+  dout(20) << "global_image_id=" << global_image_id << dendl;
+
+  Mutex::Locker locker(m_lock);
+
+  assert(m_on_shut_down == nullptr);
+
+  auto it = m_image_replayers.find(global_image_id);
+
+  if (it == m_image_replayers.end()) {
+    auto image_replayer = ImageReplayer<I>::create(
+      m_threads, m_image_deleter, m_image_sync_throttler, m_local_rados,
+      m_local_mirror_uuid, m_local_pool_id, global_image_id);
+
+    dout(20) << global_image_id << ": creating replayer " << image_replayer
+             << dendl;
+
+    it = m_image_replayers.insert(std::make_pair(global_image_id,
+                                                 image_replayer)).first;
+  }
+
+  auto image_replayer = it->second;
+
+  PeerImages remote_images;
+  for (auto &peer : peers) {
+    auto it = m_peers.find(Peer(peer.mirror_uuid));
+    assert(it != m_peers.end());
+    auto io_ctx = it->io_ctx;
+    remote_images.insert({peer.mirror_uuid, io_ctx, peer.image_id});
+  }
+
+  image_replayer->set_remote_images(remote_images);
+
+  start_image_replayer(image_replayer);
+
+  m_threads->work_queue->queue(on_finish, 0);
+}
+
+template <typename I>
+void InstanceReplayer<I>::release_image(
+    const std::string &global_image_id,
+    const instance_watcher::PeerImageIds &peers, bool schedule_delete,
+    Context *on_finish) {
+  dout(20) << "global_image_id=" << global_image_id << ", "
+           << "schedule_delete=" << schedule_delete << dendl;
+
+  Mutex::Locker locker(m_lock);
+
+  assert(m_on_shut_down == nullptr);
+
+  auto it = m_image_replayers.find(global_image_id);
+
+  if (it == m_image_replayers.end()) {
+    dout(20) << global_image_id << ": not found" << dendl;
+    m_threads->work_queue->queue(on_finish, 0);
+    return;
+  }
+
+  auto image_replayer = it->second;
+
+  for (auto &peer : peers) {
+    image_replayer->remove_remote_image(peer.mirror_uuid, peer.image_id);
+  }
+
+  if (!image_replayer->remote_images_empty()) {
+    dout(20) << global_image_id << ": still has remote images" << dendl;
+    m_threads->work_queue->queue(on_finish, 0);
+    return;
+  }
+
+  m_image_replayers.erase(it);
+
+  on_finish = new FunctionContext(
+    [image_replayer, on_finish] (int r) {
+      image_replayer->destroy();
+      on_finish->complete(0);
+    });
+
+  if (schedule_delete) {
+    on_finish = new FunctionContext(
+      [this, image_replayer, on_finish] (int r) {
+        auto global_image_id = image_replayer->get_global_image_id();
+        auto local_image_id = image_replayer->get_local_image_id();
+        if (local_image_id.empty()) {
+          dout(20) << global_image_id << ": unknown local_image_id"
+                   << " (image does not exist or primary), skipping delete"
+                   << dendl;
+        } else {
+          m_image_deleter->schedule_image_delete(
+            m_local_rados, m_local_pool_id, local_image_id, global_image_id);
+        }
+        on_finish->complete(0);
+      });
+  }
+
+  stop_image_replayer(image_replayer, on_finish);
+}
+
+template <typename I>
+void InstanceReplayer<I>::print_status(Formatter *f, stringstream *ss) {
+  dout(20) << dendl;
+
+  if (!f) {
+    return;
+  }
+
+  Mutex::Locker locker(m_lock);
+
+  f->open_array_section("image_replayers");
+  for (auto &kv : m_image_replayers) {
+    auto &image_replayer = kv.second;
+    image_replayer->print_status(f, ss);
+  }
+  f->close_section();
+}
+
+template <typename I>
+void InstanceReplayer<I>::start()
+{
+  dout(20) << dendl;
+
+  Mutex::Locker locker(m_lock);
+
+  m_manual_stop = false;
+
+  for (auto &kv : m_image_replayers) {
+    auto &image_replayer = kv.second;
+    image_replayer->start(nullptr, true);
+  }
+}
+
+template <typename I>
+void InstanceReplayer<I>::stop()
+{
+  dout(20) << dendl;
+
+  Mutex::Locker locker(m_lock);
+
+  m_manual_stop = true;
+
+  for (auto &kv : m_image_replayers) {
+    auto &image_replayer = kv.second;
+    image_replayer->stop(nullptr, true);
+  }
+}
+
+template <typename I>
+void InstanceReplayer<I>::restart()
+{
+  dout(20) << dendl;
+
+  Mutex::Locker locker(m_lock);
+
+  m_manual_stop = false;
+
+  for (auto &kv : m_image_replayers) {
+    auto &image_replayer = kv.second;
+    image_replayer->restart();
+  }
+}
+
+template <typename I>
+void InstanceReplayer<I>::flush()
+{
+  dout(20) << "enter" << dendl;
+
+  Mutex::Locker locker(m_lock);
+
+  for (auto &kv : m_image_replayers) {
+    auto &image_replayer = kv.second;
+    image_replayer->flush();
+  }
+}
+
+template <typename I>
+void InstanceReplayer<I>::start_image_replayer(
+    ImageReplayer<I> *image_replayer) {
+  assert(m_lock.is_locked());
+
+  std::string global_image_id = image_replayer->get_global_image_id();
+  dout(20) << "global_image_id=" << global_image_id << dendl;
+
+  if (!image_replayer->is_stopped()) {
+    return;
+  } else if (image_replayer->is_blacklisted()) {
+    derr << "blacklisted detected during image replay" << dendl;
+    return;
+  }
+
+  FunctionContext *ctx = new FunctionContext(
+    [this, global_image_id] (int r) {
+      dout(20) << "image deleter result: r=" << r << ", "
+               << "global_image_id=" << global_image_id << dendl;
+
+      Mutex::Locker locker(m_lock);
+      m_async_op_tracker.finish_op();
+
+      if (r == -ESTALE || r == -ECANCELED) {
+        return;
+      }
+
+      auto it = m_image_replayers.find(global_image_id);
+      if (it == m_image_replayers.end()) {
+        return;
+      }
+
+      auto image_replayer = it->second;
+      if (r >= 0) {
+        image_replayer->start(nullptr, false);
+      } else {
+        start_image_replayer(image_replayer);
+      }
+    });
+
+  m_async_op_tracker.start_op();
+  m_image_deleter->wait_for_scheduled_deletion(
+    m_local_pool_id, image_replayer->get_global_image_id(), ctx, false);
+}
+
+template <typename I>
+void InstanceReplayer<I>::start_image_replayers() {
+  dout(20) << dendl;
+
+  Context *ctx = new FunctionContext(
+    [this] (int r) {
+      Mutex::Locker locker(m_lock);
+      m_async_op_tracker.finish_op();
+      if (m_on_shut_down != nullptr) {
+        return;
+      }
+      for (auto &it : m_image_replayers) {
+        start_image_replayer(it.second);
+      }
+    });
+
+  m_async_op_tracker.start_op();
+  m_threads->work_queue->queue(ctx, 0);
+}
+
+
+template <typename I>
+void InstanceReplayer<I>::stop_image_replayer(ImageReplayer<I> *image_replayer,
+                                              Context *on_finish) {
+  dout(20) << image_replayer << " global_image_id="
+           << image_replayer->get_global_image_id() << ", on_finish="
+           << on_finish << dendl;
+
+  if (image_replayer->is_stopped()) {
+    m_threads->work_queue->queue(on_finish, 0);
+    return;
+  }
+
+  m_async_op_tracker.start_op();
+  Context *ctx = create_async_context_callback(
+    m_threads->work_queue, new FunctionContext(
+      [this, image_replayer, on_finish] (int r) {
+        stop_image_replayer(image_replayer, on_finish);
+        m_async_op_tracker.finish_op();
+      }));
+
+  if (image_replayer->is_running()) {
+    image_replayer->stop(ctx, false);
+  } else {
+    int after = 1;
+    dout(20) << "scheduling image replayer " << image_replayer << " stop after "
+             << after << " sec (task " << ctx << ")" << dendl;
+    ctx = new FunctionContext(
+      [this, after, ctx] (int r) {
+        Mutex::Locker timer_locker(m_threads->timer_lock);
+        m_threads->timer->add_event_after(after, ctx);
+      });
+    m_threads->work_queue->queue(ctx, 0);
+  }
+}
+
+template <typename I>
+void InstanceReplayer<I>::wait_for_ops() {
+  dout(20) << dendl;
+
+  Context *ctx = create_context_callback<
+    InstanceReplayer, &InstanceReplayer<I>::handle_wait_for_ops>(this);
+
+  m_async_op_tracker.wait_for_ops(ctx);
+}
+
+template <typename I>
+void InstanceReplayer<I>::handle_wait_for_ops(int r) {
+  dout(20) << "r=" << r << dendl;
+
+  assert(r == 0);
+
+  Mutex::Locker locker(m_lock);
+  stop_image_replayers();
+}
+
+template <typename I>
+void InstanceReplayer<I>::stop_image_replayers() {
+  dout(20) << dendl;
+
+  assert(m_lock.is_locked());
+
+  Context *ctx = create_async_context_callback(
+    m_threads->work_queue, create_context_callback<InstanceReplayer<I>,
+    &InstanceReplayer<I>::handle_stop_image_replayers>(this));
+
+  C_Gather *gather_ctx = new C_Gather(g_ceph_context, ctx);
+  for (auto &it : m_image_replayers) {
+    stop_image_replayer(it.second, gather_ctx->new_sub());
+  }
+  gather_ctx->activate();
+}
+
+template <typename I>
+void InstanceReplayer<I>::handle_stop_image_replayers(int r) {
+  dout(20) << "r=" << r << dendl;
+
+  assert(r == 0);
+
+  Context *on_finish = nullptr;
+  {
+    Mutex::Locker locker(m_lock);
+
+    for (auto &it : m_image_replayers) {
+      assert(it.second->is_stopped());
+      it.second->destroy();
+    }
+    m_image_replayers.clear();
+
+    assert(m_on_shut_down != nullptr);
+    std::swap(on_finish, m_on_shut_down);
+  }
+  on_finish->complete(r);
+}
+
+template <typename I>
+void InstanceReplayer<I>::cancel_image_state_check_task() {
+  Mutex::Locker timer_locker(m_threads->timer_lock);
+
+  if (m_image_state_check_task == nullptr) {
+    return;
+  }
+
+  dout(20) << m_image_state_check_task << dendl;
+  bool canceled = m_threads->timer->cancel_event(m_image_state_check_task);
+  assert(canceled);
+  m_image_state_check_task = nullptr;
+}
+
+template <typename I>
+void InstanceReplayer<I>::schedule_image_state_check_task() {
+  assert(m_threads->timer_lock.is_locked());
+  assert(m_image_state_check_task == nullptr);
+
+  m_image_state_check_task = new FunctionContext(
+    [this](int r) {
+      assert(m_threads->timer_lock.is_locked());
+      m_image_state_check_task = nullptr;
+      schedule_image_state_check_task();
+      start_image_replayers();
+    });
+
+  int after =
+    max(1, g_ceph_context->_conf->rbd_mirror_image_state_check_interval);
+
+  dout(20) << "scheduling image state check after " << after << " sec (task "
+           << m_image_state_check_task << ")" << dendl;
+  m_threads->timer->add_event_after(after, m_image_state_check_task);
+}
+
+} // namespace mirror
+} // namespace rbd
+
+template class rbd::mirror::InstanceReplayer<librbd::ImageCtx>;
diff --git a/src/tools/rbd_mirror/InstanceReplayer.h b/src/tools/rbd_mirror/InstanceReplayer.h
new file mode 100644 (file)
index 0000000..5d521ec
--- /dev/null
@@ -0,0 +1,120 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef RBD_MIRROR_INSTANCE_REPLAYER_H
+#define RBD_MIRROR_INSTANCE_REPLAYER_H
+
+#include <map>
+#include <sstream>
+
+#include "common/AsyncOpTracker.h"
+#include "common/Formatter.h"
+#include "common/Mutex.h"
+#include "tools/rbd_mirror/instance_watcher/Types.h"
+#include "types.h"
+
+namespace librbd { class ImageCtx; }
+
+namespace rbd {
+namespace mirror {
+
+class ImageDeleter;
+
+template <typename> class ImageReplayer;
+template <typename> struct Threads;
+
+template <typename ImageCtxT = librbd::ImageCtx>
+class InstanceReplayer {
+public:
+  static InstanceReplayer* create(
+      Threads<ImageCtxT> *threads, std::shared_ptr<ImageDeleter> image_deleter,
+      ImageSyncThrottlerRef<ImageCtxT> image_sync_throttler, RadosRef local_rados,
+      const std::string &local_mirror_uuid, int64_t local_pool_id) {
+      return new InstanceReplayer(threads, image_deleter, image_sync_throttler,
+                                 local_rados, local_mirror_uuid, local_pool_id);
+  }
+  void destroy() {
+    delete this;
+  }
+
+  InstanceReplayer(Threads<ImageCtxT> *threads,
+                  std::shared_ptr<ImageDeleter> image_deleter,
+                   ImageSyncThrottlerRef<ImageCtxT> image_sync_throttler,
+                  RadosRef local_rados, const std::string &local_mirror_uuid,
+                  int64_t local_pool_id);
+  ~InstanceReplayer();
+
+  int init();
+  void shut_down();
+
+  void init(Context *on_finish);
+  void shut_down(Context *on_finish);
+
+  void set_peers(const Peers &peers);
+
+  void acquire_image(const std::string &global_image_id,
+                     const instance_watcher::PeerImageIds &peers,
+                     Context *on_finish);
+  void release_image(const std::string &global_image_id,
+                     const instance_watcher::PeerImageIds &peers,
+                     bool schedule_delete, Context *on_finish);
+  void release_all(Context *on_finish);
+
+  void print_status(Formatter *f, stringstream *ss);
+  void start();
+  void stop();
+  void restart();
+  void flush();
+
+private:
+  /**
+   * @verbatim
+   *
+   * <uninitialized> <-------------------\
+   *    | (init)                         |                    (repeat for each
+   *    v                             STOP_IMAGE_REPLAYER ---\ image replayer)
+   * SCHEDULE_IMAGE_STATE_CHECK_TASK     ^         ^         |
+   *    |                                |         |         |
+   *    v          (shut_down)           |         \---------/
+   * <initialized> -----------------> WAIT_FOR_OPS
+   *
+   * @endverbatim
+   */
+
+  Threads<ImageCtxT> *m_threads;
+  std::shared_ptr<ImageDeleter> m_image_deleter;
+  ImageSyncThrottlerRef<ImageCtxT> m_image_sync_throttler;
+  RadosRef m_local_rados;
+  std::string m_local_mirror_uuid;
+  int64_t m_local_pool_id;
+
+  Mutex m_lock;
+  AsyncOpTracker m_async_op_tracker;
+  std::map<std::string, ImageReplayer<ImageCtxT> *> m_image_replayers;
+  Peers m_peers;
+  Context *m_image_state_check_task = nullptr;
+  Context *m_on_shut_down = nullptr;
+  bool m_manual_stop = false;
+
+  void wait_for_ops();
+  void handle_wait_for_ops(int r);
+
+  void start_image_replayer(ImageReplayer<ImageCtxT> *image_replayer);
+  void start_image_replayers();
+
+  void stop_image_replayer(ImageReplayer<ImageCtxT> *image_replayer,
+                           Context *on_finish);
+
+  void stop_image_replayers();
+  void handle_stop_image_replayers(int r);
+
+  void schedule_image_state_check_task();
+  void cancel_image_state_check_task();
+};
+
+} // namespace mirror
+} // namespace rbd
+
+extern template class rbd::mirror::InstanceReplayer<librbd::ImageCtx>;
+
+#endif // RBD_MIRROR_INSTANCE_REPLAYER_H
diff --git a/src/tools/rbd_mirror/instance_watcher/Types.cc b/src/tools/rbd_mirror/instance_watcher/Types.cc
new file mode 100644 (file)
index 0000000..1fe5316
--- /dev/null
@@ -0,0 +1,28 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "Types.h"
+#include "common/Formatter.h"
+
+namespace rbd {
+namespace mirror {
+namespace instance_watcher {
+
+void PeerImageId::encode(bufferlist &bl) const {
+  ::encode(mirror_uuid, bl);
+  ::encode(image_id, bl);
+}
+
+void PeerImageId::decode(bufferlist::iterator &iter) {
+  ::decode(mirror_uuid, iter);
+  ::decode(image_id, iter);
+}
+
+void PeerImageId::dump(Formatter *f) const {
+  f->dump_string("mirror_uuid", mirror_uuid);
+  f->dump_string("image_id", image_id);
+}
+
+} // namespace instance_watcher
+} // namespace mirror
+} // namespace rbd
diff --git a/src/tools/rbd_mirror/instance_watcher/Types.h b/src/tools/rbd_mirror/instance_watcher/Types.h
new file mode 100644 (file)
index 0000000..f4e08e3
--- /dev/null
@@ -0,0 +1,46 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef RBD_MIRROR_INSTANCE_WATCHER_TYPES_H
+#define RBD_MIRROR_INSTANCE_WATCHER_TYPES_H
+
+#include <string>
+#include <set>
+#include <boost/variant.hpp>
+
+#include "include/buffer_fwd.h"
+#include "include/encoding.h"
+#include "include/int_types.h"
+
+namespace ceph { class Formatter; }
+
+namespace rbd {
+namespace mirror {
+namespace instance_watcher {
+
+struct PeerImageId {
+  std::string mirror_uuid;
+  std::string image_id;
+
+  inline bool operator<(const PeerImageId &rhs) const {
+    return mirror_uuid < rhs.mirror_uuid;
+  }
+
+  inline bool operator==(const PeerImageId &rhs) const {
+    return (mirror_uuid == rhs.mirror_uuid && image_id == rhs.image_id);
+  }
+
+  void encode(bufferlist& bl) const;
+  void decode(bufferlist::iterator& it);
+  void dump(Formatter *f) const;
+};
+
+WRITE_CLASS_ENCODER(PeerImageId);
+
+typedef std::set<PeerImageId> PeerImageIds;
+
+} // namespace instance_watcher
+} // namespace mirror
+} // namespace librbd
+
+#endif // RBD_MIRROR_INSTANCE_WATCHER_TYPES_H