]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rbd-mirror: batch mirror image status updater helper class
authorJason Dillaman <dillaman@redhat.com>
Thu, 5 Sep 2019 02:26:29 +0000 (22:26 -0400)
committerJason Dillaman <dillaman@redhat.com>
Tue, 8 Oct 2019 15:16:46 +0000 (11:16 -0400)
Aggregate up to 100 mirror image status updates into a single
RADOS op.

Signed-off-by: Jason Dillaman <dillaman@redhat.com>
src/test/librados_test_stub/MockTestMemIoCtxImpl.h
src/test/rbd_mirror/CMakeLists.txt
src/test/rbd_mirror/test_mock_MirrorStatusUpdater.cc [new file with mode: 0644]
src/tools/rbd_mirror/CMakeLists.txt
src/tools/rbd_mirror/MirrorStatusUpdater.cc [new file with mode: 0644]
src/tools/rbd_mirror/MirrorStatusUpdater.h [new file with mode: 0644]

index e9d2bcabe499002ae1f21871412effc5b347ca16..fc86c287fe98cc6109ce2c254927caa1720bbebb 100644 (file)
@@ -44,6 +44,13 @@ public:
     return TestMemIoCtxImpl::aio_notify(o, c, bl, timeout_ms, pbl);
   }
 
+  MOCK_METHOD5(aio_operate, int(const std::string&, TestObjectOperationImpl&,
+                                AioCompletionImpl*, SnapContext*, int));
+  int do_aio_operate(const std::string& o, TestObjectOperationImpl& ops,
+                     AioCompletionImpl* c, SnapContext* snapc, int flags) {
+    return TestMemIoCtxImpl::aio_operate(o, ops, c, snapc, flags);
+  }
+
   MOCK_METHOD4(aio_watch, int(const std::string& o, AioCompletionImpl *c,
                               uint64_t *handle, librados::WatchCtx2 *ctx));
   int do_aio_watch(const std::string& o, AioCompletionImpl *c,
@@ -198,6 +205,7 @@ public:
 
     ON_CALL(*this, clone()).WillByDefault(Invoke(this, &MockTestMemIoCtxImpl::do_clone));
     ON_CALL(*this, aio_notify(_, _, _, _, _)).WillByDefault(Invoke(this, &MockTestMemIoCtxImpl::do_aio_notify));
+    ON_CALL(*this, aio_operate(_, _, _, _, _)).WillByDefault(Invoke(this, &MockTestMemIoCtxImpl::do_aio_operate));
     ON_CALL(*this, aio_watch(_, _, _, _)).WillByDefault(Invoke(this, &MockTestMemIoCtxImpl::do_aio_watch));
     ON_CALL(*this, aio_unwatch(_, _)).WillByDefault(Invoke(this, &MockTestMemIoCtxImpl::do_aio_unwatch));
     ON_CALL(*this, assert_exists(_)).WillByDefault(Invoke(this, &MockTestMemIoCtxImpl::do_assert_exists));
index 943566ac600da989a4371010ac6e8858bf9fa4b4..3c40aaa87988bf4055f1526f03ab903983a40a9f 100644 (file)
@@ -24,6 +24,7 @@ add_executable(unittest_rbd_mirror
   test_mock_InstanceReplayer.cc
   test_mock_InstanceWatcher.cc
   test_mock_LeaderWatcher.cc
+  test_mock_MirrorStatusUpdater.cc
   test_mock_NamespaceReplayer.cc
   test_mock_PoolReplayer.cc
   test_mock_PoolWatcher.cc
diff --git a/src/test/rbd_mirror/test_mock_MirrorStatusUpdater.cc b/src/test/rbd_mirror/test_mock_MirrorStatusUpdater.cc
new file mode 100644 (file)
index 0000000..1b2886e
--- /dev/null
@@ -0,0 +1,425 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "test/rbd_mirror/test_mock_fixture.h"
+#include "include/stringify.h"
+#include "tools/rbd_mirror/MirrorStatusUpdater.h"
+#include "tools/rbd_mirror/Threads.h"
+#include "test/librados_test_stub/MockTestMemIoCtxImpl.h"
+#include "test/librbd/mock/MockImageCtx.h"
+#include "test/rbd_mirror/mock/MockContextWQ.h"
+#include "test/rbd_mirror/mock/MockSafeTimer.h"
+#include <map>
+#include <string>
+#include <utility>
+
+namespace librbd {
+namespace {
+
+struct MockTestImageCtx : public MockImageCtx {
+  MockTestImageCtx(librbd::ImageCtx &image_ctx)
+    : librbd::MockImageCtx(image_ctx) {
+  }
+};
+
+} // anonymous namespace
+} // namespace librbd
+
+namespace rbd {
+namespace mirror {
+
+template <>
+struct Threads<librbd::MockTestImageCtx> {
+  MockSafeTimer *timer;
+  ceph::mutex &timer_lock;
+
+  MockContextWQ *work_queue;
+
+  Threads(Threads<librbd::ImageCtx> *threads)
+    : timer(new MockSafeTimer()),
+      timer_lock(threads->timer_lock),
+      work_queue(new MockContextWQ()) {
+  }
+  ~Threads() {
+    delete timer;
+    delete work_queue;
+  }
+};
+
+} // namespace mirror
+} // namespace rbd
+
+#include "tools/rbd_mirror/MirrorStatusUpdater.cc"
+
+namespace rbd {
+namespace mirror {
+
+using ::testing::_;
+using ::testing::DoDefault;
+using ::testing::InSequence;
+using ::testing::Invoke;
+using ::testing::StrEq;
+using ::testing::Return;
+using ::testing::WithArg;
+
+class TestMockMirrorStatusUpdater : public TestMockFixture {
+public:
+  typedef MirrorStatusUpdater<librbd::MockTestImageCtx> MockMirrorStatusUpdater;
+  typedef Threads<librbd::MockTestImageCtx> MockThreads;
+
+  typedef std::map<std::string, cls::rbd::MirrorImageSiteStatus>
+      MirrorImageSiteStatuses;
+
+  void SetUp() override {
+    TestMockFixture::SetUp();
+
+    m_mock_local_io_ctx = &get_mock_io_ctx(m_local_io_ctx);
+    m_mock_threads = new MockThreads(m_threads);
+  }
+
+  void TearDown() override {
+    delete m_mock_threads;
+    TestMockFixture::TearDown();
+  }
+
+  void expect_timer_add_event(Context** timer_event) {
+    EXPECT_CALL(*m_mock_threads->timer, add_event_after(_, _))
+      .WillOnce(WithArg<1>(Invoke([timer_event](Context *ctx) {
+          *timer_event = ctx;
+          return ctx;
+        })));
+  }
+
+  void expect_timer_cancel_event() {
+    EXPECT_CALL(*m_mock_threads->timer, cancel_event(_))
+      .WillOnce(Invoke([](Context* ctx) {
+          delete ctx;
+          return false;
+        }));
+  }
+
+  void expect_work_queue(bool async) {
+    EXPECT_CALL(*m_mock_threads->work_queue, queue(_, _))
+      .WillOnce(Invoke([this, async](Context *ctx, int r) {
+          if (async) {
+            m_threads->work_queue->queue(ctx, r);
+          } else {
+            ctx->complete(r);
+          }
+        }));
+  }
+
+  void expect_mirror_status_update(
+      const std::string& global_image_id,
+      const cls::rbd::MirrorImageSiteStatus& mirror_image_status, int r) {
+    EXPECT_CALL(*m_mock_local_io_ctx,
+                exec(RBD_MIRRORING, _, StrEq("rbd"),
+                     StrEq("mirror_image_status_set"), _, _, _))
+      .WillOnce(WithArg<4>(Invoke(
+        [r, global_image_id, mirror_image_status](bufferlist& in_bl) {
+          auto bl_it = in_bl.cbegin();
+          std::string decode_global_image_id;
+          decode(decode_global_image_id, bl_it);
+          EXPECT_EQ(global_image_id, decode_global_image_id);
+
+          cls::rbd::MirrorImageSiteStatus decode_mirror_image_status;
+          decode(decode_mirror_image_status, bl_it);
+          EXPECT_EQ(mirror_image_status, decode_mirror_image_status);
+          return r;
+        })));
+  }
+
+  void expect_mirror_status_update(
+      const MirrorImageSiteStatuses& mirror_image_site_statuses, int r) {
+    EXPECT_CALL(*m_mock_local_io_ctx, aio_operate(_, _, _, _, _))
+      .WillOnce(Invoke([this](auto&&... args) {
+          int r = m_mock_local_io_ctx->do_aio_operate(decltype(args)(args)...);
+          m_mock_local_io_ctx->aio_flush();
+          return r;
+        }));
+
+    for (auto& [global_image_id, mirror_image_status] :
+           mirror_image_site_statuses) {
+      expect_mirror_status_update(global_image_id, mirror_image_status, r);
+      if (r < 0) {
+        break;
+      }
+    }
+  }
+
+  void fire_timer_event(Context** timer_event,
+                        Context** update_task) {
+    expect_timer_add_event(timer_event);
+
+    // timer queues the update task
+    EXPECT_CALL(*m_mock_threads->work_queue, queue(_, _))
+      .WillOnce(WithArg<0>(Invoke([update_task](Context* ctx) mutable {
+          *update_task = ctx;
+        })));
+
+    // fire the timer task
+    {
+      std::lock_guard timer_locker{m_mock_threads->timer_lock};
+      ceph_assert(*timer_event != nullptr);
+      (*timer_event)->complete(0);
+    }
+  }
+
+  void init_mirror_status_updater(
+      MockMirrorStatusUpdater& mock_mirror_status_updater,
+      Context** timer_event) {
+    expect_timer_add_event(timer_event);
+    expect_work_queue(true);
+
+    C_SaferCond ctx;
+    mock_mirror_status_updater.init(&ctx);
+    ASSERT_EQ(0, ctx.wait());
+  }
+
+  void shut_down_mirror_status_updater(
+      MockMirrorStatusUpdater& mock_mirror_status_updater) {
+    expect_timer_cancel_event();
+    expect_work_queue(true);
+
+    C_SaferCond ctx;
+    mock_mirror_status_updater.shut_down(&ctx);
+    ASSERT_EQ(0, ctx.wait());
+  }
+
+  librados::MockTestMemIoCtxImpl* m_mock_local_io_ctx = nullptr;
+  MockThreads* m_mock_threads = nullptr;
+};
+
+TEST_F(TestMockMirrorStatusUpdater, InitShutDown) {
+  MockMirrorStatusUpdater mock_mirror_status_updater(m_local_io_ctx,
+                                                     m_mock_threads);
+
+  Context* timer_event = nullptr;
+  init_mirror_status_updater(mock_mirror_status_updater, &timer_event);
+
+  shut_down_mirror_status_updater(mock_mirror_status_updater);
+}
+
+TEST_F(TestMockMirrorStatusUpdater, SmallBatch) {
+  MockMirrorStatusUpdater mock_mirror_status_updater(m_local_io_ctx,
+                                                     m_mock_threads);
+
+  InSequence seq;
+
+  Context* timer_event = nullptr;
+  init_mirror_status_updater(mock_mirror_status_updater, &timer_event);
+
+  MirrorImageSiteStatuses mirror_image_site_statuses;
+  for (auto i = 0; i < 100; ++i) {
+    auto pair = mirror_image_site_statuses.emplace(
+      stringify(i), cls::rbd::MirrorImageSiteStatus{});
+    mock_mirror_status_updater.set_mirror_image_status(pair.first->first,
+                                                       pair.first->second,
+                                                       false);
+  }
+
+  Context* update_task = nullptr;
+  fire_timer_event(&timer_event, &update_task);
+
+  expect_mirror_status_update(mirror_image_site_statuses, 0);
+  update_task->complete(0);
+
+  shut_down_mirror_status_updater(mock_mirror_status_updater);
+}
+
+TEST_F(TestMockMirrorStatusUpdater, LargeBatch) {
+  MockMirrorStatusUpdater mock_mirror_status_updater(m_local_io_ctx,
+                                                     m_mock_threads);
+
+  InSequence seq;
+
+  Context* timer_event = nullptr;
+  init_mirror_status_updater(mock_mirror_status_updater, &timer_event);
+
+  MirrorImageSiteStatuses mirror_image_site_statuses;
+  for (auto i = 0; i < 200; ++i) {
+    auto pair = mirror_image_site_statuses.emplace(
+      stringify(i), cls::rbd::MirrorImageSiteStatus{});
+    mock_mirror_status_updater.set_mirror_image_status(pair.first->first,
+                                                       pair.first->second,
+                                                       false);
+  }
+
+  auto it_1 = mirror_image_site_statuses.begin();
+  auto it_2 = mirror_image_site_statuses.begin();
+  std::advance(it_2, 100);
+  MirrorImageSiteStatuses mirror_image_site_statuses_1{it_1, it_2};
+
+  it_1 = it_2;
+  std::advance(it_2, 100);
+  MirrorImageSiteStatuses mirror_image_site_statuses_2{it_1, it_2};
+
+  Context* update_task = nullptr;
+  fire_timer_event(&timer_event, &update_task);
+
+  expect_mirror_status_update(mirror_image_site_statuses_1, 0);
+  expect_mirror_status_update(mirror_image_site_statuses_2, 0);
+  update_task->complete(0);
+
+  shut_down_mirror_status_updater(mock_mirror_status_updater);
+}
+
+TEST_F(TestMockMirrorStatusUpdater, OverwriteStatus) {
+  MockMirrorStatusUpdater mock_mirror_status_updater(m_local_io_ctx,
+                                                     m_mock_threads);
+
+  InSequence seq;
+
+  Context* timer_event = nullptr;
+  init_mirror_status_updater(mock_mirror_status_updater, &timer_event);
+
+  mock_mirror_status_updater.set_mirror_image_status("1", {}, false);
+  mock_mirror_status_updater.set_mirror_image_status(
+    "1", {"", cls::rbd::MIRROR_IMAGE_STATUS_STATE_REPLAYING, "description"},
+    false);
+
+  Context* update_task = nullptr;
+  fire_timer_event(&timer_event, &update_task);
+
+  expect_mirror_status_update(
+    {{"1", cls::rbd::MirrorImageSiteStatus{
+        "", cls::rbd::MIRROR_IMAGE_STATUS_STATE_REPLAYING, "description"}}}, 0);
+  update_task->complete(0);
+
+  shut_down_mirror_status_updater(mock_mirror_status_updater);
+}
+
+TEST_F(TestMockMirrorStatusUpdater, OverwriteStatusInFlight) {
+  MockMirrorStatusUpdater mock_mirror_status_updater(m_local_io_ctx,
+                                                     m_mock_threads);
+
+  InSequence seq;
+
+  Context* timer_event = nullptr;
+  init_mirror_status_updater(mock_mirror_status_updater, &timer_event);
+
+  mock_mirror_status_updater.set_mirror_image_status("1", {}, false);
+
+  Context* update_task = nullptr;
+  fire_timer_event(&timer_event, &update_task);
+
+  EXPECT_CALL(*m_mock_local_io_ctx, aio_operate(_, _, _, _, _))
+    .WillOnce(Invoke([this, &mock_mirror_status_updater](auto&&... args) {
+        mock_mirror_status_updater.set_mirror_image_status(
+          "1", {"", cls::rbd::MIRROR_IMAGE_STATUS_STATE_REPLAYING,
+                "description"},
+          true);
+
+        int r = m_mock_local_io_ctx->do_aio_operate(decltype(args)(args)...);
+        m_mock_local_io_ctx->aio_flush();
+        return r;
+      }));
+  expect_mirror_status_update("1", cls::rbd::MirrorImageSiteStatus{}, 0);
+  expect_work_queue(false);
+  expect_mirror_status_update(
+    {{"1", cls::rbd::MirrorImageSiteStatus{
+        "", cls::rbd::MIRROR_IMAGE_STATUS_STATE_REPLAYING, "description"}}}, 0);
+
+  update_task->complete(0);
+
+  shut_down_mirror_status_updater(mock_mirror_status_updater);
+}
+
+TEST_F(TestMockMirrorStatusUpdater, ImmediateUpdate) {
+  MockMirrorStatusUpdater mock_mirror_status_updater(m_local_io_ctx,
+                                                     m_mock_threads);
+
+  InSequence seq;
+
+  Context* timer_event = nullptr;
+  init_mirror_status_updater(mock_mirror_status_updater, &timer_event);
+
+  expect_work_queue(false);
+  expect_mirror_status_update({{"1", cls::rbd::MirrorImageSiteStatus{}}}, 0);
+  mock_mirror_status_updater.set_mirror_image_status("1", {}, true);
+
+  shut_down_mirror_status_updater(mock_mirror_status_updater);
+}
+
+TEST_F(TestMockMirrorStatusUpdater, RemoveIdleStatus) {
+  MockMirrorStatusUpdater mock_mirror_status_updater(m_local_io_ctx,
+                                                     m_mock_threads);
+
+  InSequence seq;
+
+  Context* timer_event = nullptr;
+  init_mirror_status_updater(mock_mirror_status_updater, &timer_event);
+
+  mock_mirror_status_updater.set_mirror_image_status("1", {}, false);
+
+  C_SaferCond ctx;
+  expect_work_queue(true);
+  mock_mirror_status_updater.remove_mirror_image_status("1", &ctx);
+  ASSERT_EQ(0, ctx.wait());
+
+  shut_down_mirror_status_updater(mock_mirror_status_updater);
+}
+
+TEST_F(TestMockMirrorStatusUpdater, RemoveInFlightStatus) {
+  MockMirrorStatusUpdater mock_mirror_status_updater(m_local_io_ctx,
+                                                     m_mock_threads);
+
+  InSequence seq;
+
+  Context* timer_event = nullptr;
+  init_mirror_status_updater(mock_mirror_status_updater, &timer_event);
+
+  mock_mirror_status_updater.set_mirror_image_status("1", {}, false);
+
+  Context* update_task = nullptr;
+  fire_timer_event(&timer_event, &update_task);
+
+  C_SaferCond on_removed;
+  EXPECT_CALL(*m_mock_local_io_ctx, aio_operate(_, _, _, _, _))
+    .WillOnce(Invoke(
+      [this, &mock_mirror_status_updater, &on_removed](auto&&... args) {
+        mock_mirror_status_updater.remove_mirror_image_status("1", &on_removed);
+
+        int r = m_mock_local_io_ctx->do_aio_operate(decltype(args)(args)...);
+        m_mock_local_io_ctx->aio_flush();
+        return r;
+      }));
+  update_task->complete(0);
+  ASSERT_EQ(0, on_removed.wait());
+
+  shut_down_mirror_status_updater(mock_mirror_status_updater);
+}
+
+TEST_F(TestMockMirrorStatusUpdater, ShutDownWhileUpdating) {
+  MockMirrorStatusUpdater mock_mirror_status_updater(m_local_io_ctx,
+                                                     m_mock_threads);
+
+  InSequence seq;
+
+  Context* timer_event = nullptr;
+  init_mirror_status_updater(mock_mirror_status_updater, &timer_event);
+
+  mock_mirror_status_updater.set_mirror_image_status("1", {}, false);
+
+  Context* update_task = nullptr;
+  fire_timer_event(&timer_event, &update_task);
+
+  C_SaferCond on_shutdown;
+  EXPECT_CALL(*m_mock_local_io_ctx, aio_operate(_, _, _, _, _))
+    .WillOnce(Invoke(
+      [this, &mock_mirror_status_updater, &on_shutdown](auto&&... args) {
+        mock_mirror_status_updater.shut_down(&on_shutdown);
+
+        int r = m_mock_local_io_ctx->do_aio_operate(decltype(args)(args)...);
+        m_mock_local_io_ctx->aio_flush();
+        return r;
+      }));
+
+  expect_timer_cancel_event();
+
+  update_task->complete(0);
+  ASSERT_EQ(0, on_shutdown.wait());
+}
+
+} // namespace mirror
+} // namespace rbd
index c9f08ce814c078a71b4a9b80070940776cb68b36..e5b4fd681e210c30e82bba0fb0aaa88a859eb3ea 100644 (file)
@@ -14,6 +14,7 @@ set(rbd_mirror_internal
   Instances.cc
   LeaderWatcher.cc
   Mirror.cc
+  MirrorStatusUpdater.cc
   MirrorStatusWatcher.cc
   NamespaceReplayer.cc
   PoolReplayer.cc
diff --git a/src/tools/rbd_mirror/MirrorStatusUpdater.cc b/src/tools/rbd_mirror/MirrorStatusUpdater.cc
new file mode 100644 (file)
index 0000000..eda1732
--- /dev/null
@@ -0,0 +1,281 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "tools/rbd_mirror/MirrorStatusUpdater.h"
+#include "include/Context.h"
+#include "include/stringify.h"
+#include "common/debug.h"
+#include "common/errno.h"
+#include "common/Timer.h"
+#include "common/WorkQueue.h"
+#include "librbd/ImageCtx.h"
+#include "librbd/Utils.h"
+#include "tools/rbd_mirror/Threads.h"
+
+#define dout_context g_ceph_context
+#define dout_subsys ceph_subsys_rbd_mirror
+#undef dout_prefix
+#define dout_prefix *_dout << "rbd::mirror::MirrorStatusUpdater " << this \
+                           << " " << __func__ << ": "
+
+namespace rbd {
+namespace mirror {
+
+static const double UPDATE_INTERVAL_SECONDS = 30;
+static const uint32_t MAX_UPDATES_PER_OP = 100;
+
+using librbd::util::create_context_callback;
+using librbd::util::create_rados_callback;
+
+template <typename I>
+MirrorStatusUpdater<I>::MirrorStatusUpdater(
+    librados::IoCtx& io_ctx, Threads<I> *threads)
+  : m_io_ctx(io_ctx), m_threads(threads),
+    m_lock(ceph::make_mutex("rbd::mirror::MirrorStatusUpdater " +
+                            stringify(m_io_ctx.get_id()))) {
+  dout(10) << "pool_id=" << m_io_ctx.get_id() << dendl;
+}
+
+template <typename I>
+MirrorStatusUpdater<I>::~MirrorStatusUpdater() {
+  ceph_assert(!m_initialized);
+}
+
+template <typename I>
+void MirrorStatusUpdater<I>::init(Context* on_finish) {
+  dout(10) << dendl;
+
+  ceph_assert(!m_initialized);
+  m_initialized = true;
+
+  {
+    std::lock_guard timer_locker{m_threads->timer_lock};
+    schedule_timer_task();
+  }
+
+  m_threads->work_queue->queue(on_finish, 0);
+}
+
+template <typename I>
+void MirrorStatusUpdater<I>::shut_down(Context* on_finish) {
+  dout(10) << dendl;
+
+  {
+    std::lock_guard timer_locker{m_threads->timer_lock};
+    ceph_assert(m_timer_task != nullptr);
+    m_threads->timer->cancel_event(m_timer_task);
+  }
+
+  {
+    std::unique_lock locker(m_lock);
+    ceph_assert(m_initialized);
+    m_initialized = false;
+
+    if (m_update_in_progress) {
+      m_update_on_finish_ctxs.push_back(on_finish);
+      return;
+    }
+  }
+
+  m_threads->work_queue->queue(on_finish, 0);
+}
+
+template <typename I>
+bool MirrorStatusUpdater<I>::exists(const std::string& global_image_id) {
+  dout(15) << "global_image_id=" << global_image_id << dendl;
+
+  std::unique_lock locker(m_lock);
+  return (m_global_image_status.count(global_image_id) > 0);
+}
+
+template <typename I>
+void MirrorStatusUpdater<I>::set_mirror_image_status(
+    const std::string& global_image_id,
+    const cls::rbd::MirrorImageSiteStatus& mirror_image_site_status,
+    bool immediate_update) {
+  dout(15) << "global_image_id=" << global_image_id << ", "
+           << "mirror_image_site_status=" << mirror_image_site_status << dendl;
+
+  std::unique_lock locker(m_lock);
+
+  m_global_image_status[global_image_id] = mirror_image_site_status;
+  if (immediate_update) {
+    m_update_global_image_ids.insert(global_image_id);
+    queue_update_task(std::move(locker));
+  }
+}
+
+template <typename I>
+void MirrorStatusUpdater<I>::remove_mirror_image_status(
+    const std::string& global_image_id, Context* on_finish) {
+  if (try_remove_mirror_image_status(global_image_id, on_finish)) {
+    m_threads->work_queue->queue(on_finish, 0);
+  }
+}
+
+template <typename I>
+bool MirrorStatusUpdater<I>::try_remove_mirror_image_status(
+    const std::string& global_image_id, Context* on_finish) {
+  dout(15) << "global_image_id=" << global_image_id << dendl;
+
+  std::unique_lock locker(m_lock);
+  if ((m_update_in_flight &&
+       m_updating_global_image_ids.count(global_image_id) > 0) ||
+      ((m_update_in_progress || m_update_requested) &&
+       m_update_global_image_ids.count(global_image_id) > 0)) {
+    // if update is scheduled/in-progress, wait for it to complete
+    on_finish = new LambdaContext(
+      [this, global_image_id, on_finish](int r) {
+        if (try_remove_mirror_image_status(global_image_id, on_finish)) {
+          on_finish->complete(0);
+        }
+      });
+    m_update_on_finish_ctxs.push_back(on_finish);
+    return false;
+  }
+
+  m_global_image_status.erase(global_image_id);
+  return true;
+}
+
+template <typename I>
+void MirrorStatusUpdater<I>::schedule_timer_task() {
+  dout(10) << dendl;
+
+  ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock));
+  ceph_assert(m_timer_task == nullptr);
+  m_timer_task = create_context_callback<
+    MirrorStatusUpdater<I>,
+    &MirrorStatusUpdater<I>::handle_timer_task>(this);
+  m_threads->timer->add_event_after(UPDATE_INTERVAL_SECONDS, m_timer_task);
+}
+
+template <typename I>
+void MirrorStatusUpdater<I>::handle_timer_task(int r) {
+  dout(10) << dendl;
+
+  ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock));
+  ceph_assert(m_timer_task != nullptr);
+  m_timer_task = nullptr;
+  schedule_timer_task();
+
+  std::unique_lock locker(m_lock);
+  for (auto& pair : m_global_image_status) {
+    m_update_global_image_ids.insert(pair.first);
+  }
+
+  queue_update_task(std::move(locker));
+}
+
+template <typename I>
+void MirrorStatusUpdater<I>::queue_update_task(
+  std::unique_lock<ceph::mutex>&& locker) {
+  if (!m_initialized) {
+    return;
+  }
+
+  if (m_update_in_progress) {
+    if (m_update_in_flight) {
+      dout(10) << "deferring update due to in-flight ops" << dendl;
+      m_update_requested = true;
+    }
+    return;
+  }
+
+  m_update_in_progress = true;
+  ceph_assert(!m_update_in_flight);
+  ceph_assert(!m_update_requested);
+  locker.unlock();
+
+  dout(10) << dendl;
+  auto ctx = create_context_callback<
+    MirrorStatusUpdater<I>,
+    &MirrorStatusUpdater<I>::update_task>(this);
+  m_threads->work_queue->queue(ctx);
+}
+
+template <typename I>
+void MirrorStatusUpdater<I>::update_task(int r) {
+  dout(10) << dendl;
+
+  std::unique_lock locker(m_lock);
+  ceph_assert(m_update_in_progress);
+  ceph_assert(!m_update_in_flight);
+  m_update_in_flight = true;
+
+  std::swap(m_updating_global_image_ids, m_update_global_image_ids);
+  auto updating_global_image_ids = m_updating_global_image_ids;
+  auto global_image_status = m_global_image_status;
+  locker.unlock();
+
+  Context* ctx = create_context_callback<
+    MirrorStatusUpdater<I>,
+    &MirrorStatusUpdater<I>::handle_update_task>(this);
+  auto gather = new C_Gather(g_ceph_context, ctx);
+
+  auto it = updating_global_image_ids.begin();
+  while (it != updating_global_image_ids.end()) {
+    librados::ObjectWriteOperation op;
+    uint32_t op_count = 0;
+
+    while (it != updating_global_image_ids.end() &&
+           op_count < MAX_UPDATES_PER_OP) {
+      auto& global_image_id = *it;
+      ++it;
+
+      auto status_it = global_image_status.find(global_image_id);
+      if (status_it == global_image_status.end()) {
+        continue;
+      }
+
+      librbd::cls_client::mirror_image_status_set(&op, global_image_id,
+                                                  status_it->second);
+      ++op_count;
+    }
+
+    auto aio_comp = create_rados_callback(gather->new_sub());
+    int r = m_io_ctx.aio_operate(RBD_MIRRORING, aio_comp, &op);
+    ceph_assert(r == 0);
+    aio_comp->release();
+  }
+
+  gather->activate();
+}
+
+template <typename I>
+void MirrorStatusUpdater<I>::handle_update_task(int r) {
+  dout(10) << dendl;
+  if (r < 0) {
+    derr << "failed to update mirror image statuses: " << cpp_strerror(r)
+         << dendl;
+  }
+
+  std::unique_lock locker(m_lock);
+
+  Contexts on_finish_ctxs;
+  std::swap(on_finish_ctxs, m_update_on_finish_ctxs);
+
+  ceph_assert(m_update_in_progress);
+  m_update_in_progress = false;
+
+  ceph_assert(m_update_in_flight);
+  m_update_in_flight = false;
+
+  m_updating_global_image_ids.clear();
+
+  if (m_update_requested) {
+    m_update_requested = false;
+    queue_update_task(std::move(locker));
+  } else {
+    locker.unlock();
+  }
+
+  for (auto on_finish : on_finish_ctxs) {
+    on_finish->complete(0);
+  }
+}
+
+} // namespace mirror
+} // namespace rbd
+
+template class rbd::mirror::MirrorStatusUpdater<librbd::ImageCtx>;
diff --git a/src/tools/rbd_mirror/MirrorStatusUpdater.h b/src/tools/rbd_mirror/MirrorStatusUpdater.h
new file mode 100644 (file)
index 0000000..60f4be4
--- /dev/null
@@ -0,0 +1,87 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_RBD_MIRROR_MIRROR_STATUS_UPDATER_H
+#define CEPH_RBD_MIRROR_MIRROR_STATUS_UPDATER_H
+
+#include "include/rados/librados.hpp"
+#include "common/ceph_mutex.h"
+#include "cls/rbd/cls_rbd_types.h"
+#include <list>
+#include <map>
+#include <set>
+#include <string>
+
+struct Context;
+namespace librbd { class ImageCtx; }
+
+namespace rbd {
+namespace mirror {
+
+template <typename> struct Threads;
+
+template <typename ImageCtxT = librbd::ImageCtx>
+class MirrorStatusUpdater {
+public:
+
+  static MirrorStatusUpdater* create(librados::IoCtx& io_ctx,
+                                     Threads<ImageCtxT> *threads) {
+    return new MirrorStatusUpdater(io_ctx, threads);
+  }
+
+  MirrorStatusUpdater(librados::IoCtx& io_ctx, Threads<ImageCtxT> *threads);
+  ~MirrorStatusUpdater();
+
+  void init(Context* on_finish);
+  void shut_down(Context* on_finish);
+
+  bool exists(const std::string& global_image_id);
+  void set_mirror_image_status(
+      const std::string& global_image_id,
+      const cls::rbd::MirrorImageSiteStatus& mirror_image_site_status,
+      bool immediate_update);
+  void remove_mirror_image_status(const std::string& global_image_id,
+                                  Context* on_finish);
+
+private:
+  typedef std::list<Context*> Contexts;
+  typedef std::set<std::string> GlobalImageIds;
+  typedef std::map<std::string, cls::rbd::MirrorImageSiteStatus>
+      GlobalImageStatus;
+
+  librados::IoCtx m_io_ctx;
+  Threads<ImageCtxT>* m_threads;
+
+  Context* m_timer_task = nullptr;
+
+  ceph::mutex m_lock;
+
+  bool m_initialized = false;
+
+  GlobalImageIds m_update_global_image_ids;
+  GlobalImageStatus m_global_image_status;
+
+  bool m_update_in_progress = false;
+  bool m_update_in_flight = false;
+  bool m_update_requested = false;
+  Contexts m_update_on_finish_ctxs;
+  GlobalImageIds m_updating_global_image_ids;
+
+  bool try_remove_mirror_image_status(const std::string& global_image_id,
+                                      Context* on_finish);
+
+  void schedule_timer_task();
+  void handle_timer_task(int r);
+
+  void queue_update_task(std::unique_lock<ceph::mutex>&& locker);
+  void update_task(int r);
+  void handle_update_task(int r);
+
+};
+
+} // namespace mirror
+} // namespace rbd
+
+extern template class rbd::mirror::MirrorStatusUpdater<librbd::ImageCtx>;
+
+#endif // CEPH_RBD_MIRROR_MIRROR_STATUS_UPDATER_H