From: Jason Dillaman Date: Thu, 5 Sep 2019 02:26:29 +0000 (-0400) Subject: rbd-mirror: batch mirror image status updater helper class X-Git-Tag: v15.1.0~1245^2~12 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=3013979563a7461b7612419ec7bf74e516fcfeb5;p=ceph.git rbd-mirror: batch mirror image status updater helper class Aggregate up to 100 mirror image status updates into a single RADOS op. Signed-off-by: Jason Dillaman --- diff --git a/src/test/librados_test_stub/MockTestMemIoCtxImpl.h b/src/test/librados_test_stub/MockTestMemIoCtxImpl.h index e9d2bcabe499..fc86c287fe98 100644 --- a/src/test/librados_test_stub/MockTestMemIoCtxImpl.h +++ b/src/test/librados_test_stub/MockTestMemIoCtxImpl.h @@ -44,6 +44,13 @@ public: return TestMemIoCtxImpl::aio_notify(o, c, bl, timeout_ms, pbl); } + MOCK_METHOD5(aio_operate, int(const std::string&, TestObjectOperationImpl&, + AioCompletionImpl*, SnapContext*, int)); + int do_aio_operate(const std::string& o, TestObjectOperationImpl& ops, + AioCompletionImpl* c, SnapContext* snapc, int flags) { + return TestMemIoCtxImpl::aio_operate(o, ops, c, snapc, flags); + } + MOCK_METHOD4(aio_watch, int(const std::string& o, AioCompletionImpl *c, uint64_t *handle, librados::WatchCtx2 *ctx)); int do_aio_watch(const std::string& o, AioCompletionImpl *c, @@ -198,6 +205,7 @@ public: ON_CALL(*this, clone()).WillByDefault(Invoke(this, &MockTestMemIoCtxImpl::do_clone)); ON_CALL(*this, aio_notify(_, _, _, _, _)).WillByDefault(Invoke(this, &MockTestMemIoCtxImpl::do_aio_notify)); + ON_CALL(*this, aio_operate(_, _, _, _, _)).WillByDefault(Invoke(this, &MockTestMemIoCtxImpl::do_aio_operate)); ON_CALL(*this, aio_watch(_, _, _, _)).WillByDefault(Invoke(this, &MockTestMemIoCtxImpl::do_aio_watch)); ON_CALL(*this, aio_unwatch(_, _)).WillByDefault(Invoke(this, &MockTestMemIoCtxImpl::do_aio_unwatch)); ON_CALL(*this, assert_exists(_)).WillByDefault(Invoke(this, &MockTestMemIoCtxImpl::do_assert_exists)); diff --git a/src/test/rbd_mirror/CMakeLists.txt b/src/test/rbd_mirror/CMakeLists.txt index 943566ac600d..3c40aaa87988 100644 --- a/src/test/rbd_mirror/CMakeLists.txt +++ b/src/test/rbd_mirror/CMakeLists.txt @@ -24,6 +24,7 @@ add_executable(unittest_rbd_mirror test_mock_InstanceReplayer.cc test_mock_InstanceWatcher.cc test_mock_LeaderWatcher.cc + test_mock_MirrorStatusUpdater.cc test_mock_NamespaceReplayer.cc test_mock_PoolReplayer.cc test_mock_PoolWatcher.cc diff --git a/src/test/rbd_mirror/test_mock_MirrorStatusUpdater.cc b/src/test/rbd_mirror/test_mock_MirrorStatusUpdater.cc new file mode 100644 index 000000000000..1b2886e1e720 --- /dev/null +++ b/src/test/rbd_mirror/test_mock_MirrorStatusUpdater.cc @@ -0,0 +1,425 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "test/rbd_mirror/test_mock_fixture.h" +#include "include/stringify.h" +#include "tools/rbd_mirror/MirrorStatusUpdater.h" +#include "tools/rbd_mirror/Threads.h" +#include "test/librados_test_stub/MockTestMemIoCtxImpl.h" +#include "test/librbd/mock/MockImageCtx.h" +#include "test/rbd_mirror/mock/MockContextWQ.h" +#include "test/rbd_mirror/mock/MockSafeTimer.h" +#include +#include +#include + +namespace librbd { +namespace { + +struct MockTestImageCtx : public MockImageCtx { + MockTestImageCtx(librbd::ImageCtx &image_ctx) + : librbd::MockImageCtx(image_ctx) { + } +}; + +} // anonymous namespace +} // namespace librbd + +namespace rbd { +namespace mirror { + +template <> +struct Threads { + MockSafeTimer *timer; + ceph::mutex &timer_lock; + + MockContextWQ *work_queue; + + Threads(Threads *threads) + : timer(new MockSafeTimer()), + timer_lock(threads->timer_lock), + work_queue(new MockContextWQ()) { + } + ~Threads() { + delete timer; + delete work_queue; + } +}; + +} // namespace mirror +} // namespace rbd + +#include "tools/rbd_mirror/MirrorStatusUpdater.cc" + +namespace rbd { +namespace mirror { + +using ::testing::_; +using ::testing::DoDefault; +using ::testing::InSequence; +using ::testing::Invoke; +using ::testing::StrEq; +using ::testing::Return; +using ::testing::WithArg; + +class TestMockMirrorStatusUpdater : public TestMockFixture { +public: + typedef MirrorStatusUpdater MockMirrorStatusUpdater; + typedef Threads MockThreads; + + typedef std::map + MirrorImageSiteStatuses; + + void SetUp() override { + TestMockFixture::SetUp(); + + m_mock_local_io_ctx = &get_mock_io_ctx(m_local_io_ctx); + m_mock_threads = new MockThreads(m_threads); + } + + void TearDown() override { + delete m_mock_threads; + TestMockFixture::TearDown(); + } + + void expect_timer_add_event(Context** timer_event) { + EXPECT_CALL(*m_mock_threads->timer, add_event_after(_, _)) + .WillOnce(WithArg<1>(Invoke([timer_event](Context *ctx) { + *timer_event = ctx; + return ctx; + }))); + } + + void expect_timer_cancel_event() { + EXPECT_CALL(*m_mock_threads->timer, cancel_event(_)) + .WillOnce(Invoke([](Context* ctx) { + delete ctx; + return false; + })); + } + + void expect_work_queue(bool async) { + EXPECT_CALL(*m_mock_threads->work_queue, queue(_, _)) + .WillOnce(Invoke([this, async](Context *ctx, int r) { + if (async) { + m_threads->work_queue->queue(ctx, r); + } else { + ctx->complete(r); + } + })); + } + + void expect_mirror_status_update( + const std::string& global_image_id, + const cls::rbd::MirrorImageSiteStatus& mirror_image_status, int r) { + EXPECT_CALL(*m_mock_local_io_ctx, + exec(RBD_MIRRORING, _, StrEq("rbd"), + StrEq("mirror_image_status_set"), _, _, _)) + .WillOnce(WithArg<4>(Invoke( + [r, global_image_id, mirror_image_status](bufferlist& in_bl) { + auto bl_it = in_bl.cbegin(); + std::string decode_global_image_id; + decode(decode_global_image_id, bl_it); + EXPECT_EQ(global_image_id, decode_global_image_id); + + cls::rbd::MirrorImageSiteStatus decode_mirror_image_status; + decode(decode_mirror_image_status, bl_it); + EXPECT_EQ(mirror_image_status, decode_mirror_image_status); + return r; + }))); + } + + void expect_mirror_status_update( + const MirrorImageSiteStatuses& mirror_image_site_statuses, int r) { + EXPECT_CALL(*m_mock_local_io_ctx, aio_operate(_, _, _, _, _)) + .WillOnce(Invoke([this](auto&&... args) { + int r = m_mock_local_io_ctx->do_aio_operate(decltype(args)(args)...); + m_mock_local_io_ctx->aio_flush(); + return r; + })); + + for (auto& [global_image_id, mirror_image_status] : + mirror_image_site_statuses) { + expect_mirror_status_update(global_image_id, mirror_image_status, r); + if (r < 0) { + break; + } + } + } + + void fire_timer_event(Context** timer_event, + Context** update_task) { + expect_timer_add_event(timer_event); + + // timer queues the update task + EXPECT_CALL(*m_mock_threads->work_queue, queue(_, _)) + .WillOnce(WithArg<0>(Invoke([update_task](Context* ctx) mutable { + *update_task = ctx; + }))); + + // fire the timer task + { + std::lock_guard timer_locker{m_mock_threads->timer_lock}; + ceph_assert(*timer_event != nullptr); + (*timer_event)->complete(0); + } + } + + void init_mirror_status_updater( + MockMirrorStatusUpdater& mock_mirror_status_updater, + Context** timer_event) { + expect_timer_add_event(timer_event); + expect_work_queue(true); + + C_SaferCond ctx; + mock_mirror_status_updater.init(&ctx); + ASSERT_EQ(0, ctx.wait()); + } + + void shut_down_mirror_status_updater( + MockMirrorStatusUpdater& mock_mirror_status_updater) { + expect_timer_cancel_event(); + expect_work_queue(true); + + C_SaferCond ctx; + mock_mirror_status_updater.shut_down(&ctx); + ASSERT_EQ(0, ctx.wait()); + } + + librados::MockTestMemIoCtxImpl* m_mock_local_io_ctx = nullptr; + MockThreads* m_mock_threads = nullptr; +}; + +TEST_F(TestMockMirrorStatusUpdater, InitShutDown) { + MockMirrorStatusUpdater mock_mirror_status_updater(m_local_io_ctx, + m_mock_threads); + + Context* timer_event = nullptr; + init_mirror_status_updater(mock_mirror_status_updater, &timer_event); + + shut_down_mirror_status_updater(mock_mirror_status_updater); +} + +TEST_F(TestMockMirrorStatusUpdater, SmallBatch) { + MockMirrorStatusUpdater mock_mirror_status_updater(m_local_io_ctx, + m_mock_threads); + + InSequence seq; + + Context* timer_event = nullptr; + init_mirror_status_updater(mock_mirror_status_updater, &timer_event); + + MirrorImageSiteStatuses mirror_image_site_statuses; + for (auto i = 0; i < 100; ++i) { + auto pair = mirror_image_site_statuses.emplace( + stringify(i), cls::rbd::MirrorImageSiteStatus{}); + mock_mirror_status_updater.set_mirror_image_status(pair.first->first, + pair.first->second, + false); + } + + Context* update_task = nullptr; + fire_timer_event(&timer_event, &update_task); + + expect_mirror_status_update(mirror_image_site_statuses, 0); + update_task->complete(0); + + shut_down_mirror_status_updater(mock_mirror_status_updater); +} + +TEST_F(TestMockMirrorStatusUpdater, LargeBatch) { + MockMirrorStatusUpdater mock_mirror_status_updater(m_local_io_ctx, + m_mock_threads); + + InSequence seq; + + Context* timer_event = nullptr; + init_mirror_status_updater(mock_mirror_status_updater, &timer_event); + + MirrorImageSiteStatuses mirror_image_site_statuses; + for (auto i = 0; i < 200; ++i) { + auto pair = mirror_image_site_statuses.emplace( + stringify(i), cls::rbd::MirrorImageSiteStatus{}); + mock_mirror_status_updater.set_mirror_image_status(pair.first->first, + pair.first->second, + false); + } + + auto it_1 = mirror_image_site_statuses.begin(); + auto it_2 = mirror_image_site_statuses.begin(); + std::advance(it_2, 100); + MirrorImageSiteStatuses mirror_image_site_statuses_1{it_1, it_2}; + + it_1 = it_2; + std::advance(it_2, 100); + MirrorImageSiteStatuses mirror_image_site_statuses_2{it_1, it_2}; + + Context* update_task = nullptr; + fire_timer_event(&timer_event, &update_task); + + expect_mirror_status_update(mirror_image_site_statuses_1, 0); + expect_mirror_status_update(mirror_image_site_statuses_2, 0); + update_task->complete(0); + + shut_down_mirror_status_updater(mock_mirror_status_updater); +} + +TEST_F(TestMockMirrorStatusUpdater, OverwriteStatus) { + MockMirrorStatusUpdater mock_mirror_status_updater(m_local_io_ctx, + m_mock_threads); + + InSequence seq; + + Context* timer_event = nullptr; + init_mirror_status_updater(mock_mirror_status_updater, &timer_event); + + mock_mirror_status_updater.set_mirror_image_status("1", {}, false); + mock_mirror_status_updater.set_mirror_image_status( + "1", {"", cls::rbd::MIRROR_IMAGE_STATUS_STATE_REPLAYING, "description"}, + false); + + Context* update_task = nullptr; + fire_timer_event(&timer_event, &update_task); + + expect_mirror_status_update( + {{"1", cls::rbd::MirrorImageSiteStatus{ + "", cls::rbd::MIRROR_IMAGE_STATUS_STATE_REPLAYING, "description"}}}, 0); + update_task->complete(0); + + shut_down_mirror_status_updater(mock_mirror_status_updater); +} + +TEST_F(TestMockMirrorStatusUpdater, OverwriteStatusInFlight) { + MockMirrorStatusUpdater mock_mirror_status_updater(m_local_io_ctx, + m_mock_threads); + + InSequence seq; + + Context* timer_event = nullptr; + init_mirror_status_updater(mock_mirror_status_updater, &timer_event); + + mock_mirror_status_updater.set_mirror_image_status("1", {}, false); + + Context* update_task = nullptr; + fire_timer_event(&timer_event, &update_task); + + EXPECT_CALL(*m_mock_local_io_ctx, aio_operate(_, _, _, _, _)) + .WillOnce(Invoke([this, &mock_mirror_status_updater](auto&&... args) { + mock_mirror_status_updater.set_mirror_image_status( + "1", {"", cls::rbd::MIRROR_IMAGE_STATUS_STATE_REPLAYING, + "description"}, + true); + + int r = m_mock_local_io_ctx->do_aio_operate(decltype(args)(args)...); + m_mock_local_io_ctx->aio_flush(); + return r; + })); + expect_mirror_status_update("1", cls::rbd::MirrorImageSiteStatus{}, 0); + expect_work_queue(false); + expect_mirror_status_update( + {{"1", cls::rbd::MirrorImageSiteStatus{ + "", cls::rbd::MIRROR_IMAGE_STATUS_STATE_REPLAYING, "description"}}}, 0); + + update_task->complete(0); + + shut_down_mirror_status_updater(mock_mirror_status_updater); +} + +TEST_F(TestMockMirrorStatusUpdater, ImmediateUpdate) { + MockMirrorStatusUpdater mock_mirror_status_updater(m_local_io_ctx, + m_mock_threads); + + InSequence seq; + + Context* timer_event = nullptr; + init_mirror_status_updater(mock_mirror_status_updater, &timer_event); + + expect_work_queue(false); + expect_mirror_status_update({{"1", cls::rbd::MirrorImageSiteStatus{}}}, 0); + mock_mirror_status_updater.set_mirror_image_status("1", {}, true); + + shut_down_mirror_status_updater(mock_mirror_status_updater); +} + +TEST_F(TestMockMirrorStatusUpdater, RemoveIdleStatus) { + MockMirrorStatusUpdater mock_mirror_status_updater(m_local_io_ctx, + m_mock_threads); + + InSequence seq; + + Context* timer_event = nullptr; + init_mirror_status_updater(mock_mirror_status_updater, &timer_event); + + mock_mirror_status_updater.set_mirror_image_status("1", {}, false); + + C_SaferCond ctx; + expect_work_queue(true); + mock_mirror_status_updater.remove_mirror_image_status("1", &ctx); + ASSERT_EQ(0, ctx.wait()); + + shut_down_mirror_status_updater(mock_mirror_status_updater); +} + +TEST_F(TestMockMirrorStatusUpdater, RemoveInFlightStatus) { + MockMirrorStatusUpdater mock_mirror_status_updater(m_local_io_ctx, + m_mock_threads); + + InSequence seq; + + Context* timer_event = nullptr; + init_mirror_status_updater(mock_mirror_status_updater, &timer_event); + + mock_mirror_status_updater.set_mirror_image_status("1", {}, false); + + Context* update_task = nullptr; + fire_timer_event(&timer_event, &update_task); + + C_SaferCond on_removed; + EXPECT_CALL(*m_mock_local_io_ctx, aio_operate(_, _, _, _, _)) + .WillOnce(Invoke( + [this, &mock_mirror_status_updater, &on_removed](auto&&... args) { + mock_mirror_status_updater.remove_mirror_image_status("1", &on_removed); + + int r = m_mock_local_io_ctx->do_aio_operate(decltype(args)(args)...); + m_mock_local_io_ctx->aio_flush(); + return r; + })); + update_task->complete(0); + ASSERT_EQ(0, on_removed.wait()); + + shut_down_mirror_status_updater(mock_mirror_status_updater); +} + +TEST_F(TestMockMirrorStatusUpdater, ShutDownWhileUpdating) { + MockMirrorStatusUpdater mock_mirror_status_updater(m_local_io_ctx, + m_mock_threads); + + InSequence seq; + + Context* timer_event = nullptr; + init_mirror_status_updater(mock_mirror_status_updater, &timer_event); + + mock_mirror_status_updater.set_mirror_image_status("1", {}, false); + + Context* update_task = nullptr; + fire_timer_event(&timer_event, &update_task); + + C_SaferCond on_shutdown; + EXPECT_CALL(*m_mock_local_io_ctx, aio_operate(_, _, _, _, _)) + .WillOnce(Invoke( + [this, &mock_mirror_status_updater, &on_shutdown](auto&&... args) { + mock_mirror_status_updater.shut_down(&on_shutdown); + + int r = m_mock_local_io_ctx->do_aio_operate(decltype(args)(args)...); + m_mock_local_io_ctx->aio_flush(); + return r; + })); + + expect_timer_cancel_event(); + + update_task->complete(0); + ASSERT_EQ(0, on_shutdown.wait()); +} + +} // namespace mirror +} // namespace rbd diff --git a/src/tools/rbd_mirror/CMakeLists.txt b/src/tools/rbd_mirror/CMakeLists.txt index c9f08ce814c0..e5b4fd681e21 100644 --- a/src/tools/rbd_mirror/CMakeLists.txt +++ b/src/tools/rbd_mirror/CMakeLists.txt @@ -14,6 +14,7 @@ set(rbd_mirror_internal Instances.cc LeaderWatcher.cc Mirror.cc + MirrorStatusUpdater.cc MirrorStatusWatcher.cc NamespaceReplayer.cc PoolReplayer.cc diff --git a/src/tools/rbd_mirror/MirrorStatusUpdater.cc b/src/tools/rbd_mirror/MirrorStatusUpdater.cc new file mode 100644 index 000000000000..eda173299242 --- /dev/null +++ b/src/tools/rbd_mirror/MirrorStatusUpdater.cc @@ -0,0 +1,281 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "tools/rbd_mirror/MirrorStatusUpdater.h" +#include "include/Context.h" +#include "include/stringify.h" +#include "common/debug.h" +#include "common/errno.h" +#include "common/Timer.h" +#include "common/WorkQueue.h" +#include "librbd/ImageCtx.h" +#include "librbd/Utils.h" +#include "tools/rbd_mirror/Threads.h" + +#define dout_context g_ceph_context +#define dout_subsys ceph_subsys_rbd_mirror +#undef dout_prefix +#define dout_prefix *_dout << "rbd::mirror::MirrorStatusUpdater " << this \ + << " " << __func__ << ": " + +namespace rbd { +namespace mirror { + +static const double UPDATE_INTERVAL_SECONDS = 30; +static const uint32_t MAX_UPDATES_PER_OP = 100; + +using librbd::util::create_context_callback; +using librbd::util::create_rados_callback; + +template +MirrorStatusUpdater::MirrorStatusUpdater( + librados::IoCtx& io_ctx, Threads *threads) + : m_io_ctx(io_ctx), m_threads(threads), + m_lock(ceph::make_mutex("rbd::mirror::MirrorStatusUpdater " + + stringify(m_io_ctx.get_id()))) { + dout(10) << "pool_id=" << m_io_ctx.get_id() << dendl; +} + +template +MirrorStatusUpdater::~MirrorStatusUpdater() { + ceph_assert(!m_initialized); +} + +template +void MirrorStatusUpdater::init(Context* on_finish) { + dout(10) << dendl; + + ceph_assert(!m_initialized); + m_initialized = true; + + { + std::lock_guard timer_locker{m_threads->timer_lock}; + schedule_timer_task(); + } + + m_threads->work_queue->queue(on_finish, 0); +} + +template +void MirrorStatusUpdater::shut_down(Context* on_finish) { + dout(10) << dendl; + + { + std::lock_guard timer_locker{m_threads->timer_lock}; + ceph_assert(m_timer_task != nullptr); + m_threads->timer->cancel_event(m_timer_task); + } + + { + std::unique_lock locker(m_lock); + ceph_assert(m_initialized); + m_initialized = false; + + if (m_update_in_progress) { + m_update_on_finish_ctxs.push_back(on_finish); + return; + } + } + + m_threads->work_queue->queue(on_finish, 0); +} + +template +bool MirrorStatusUpdater::exists(const std::string& global_image_id) { + dout(15) << "global_image_id=" << global_image_id << dendl; + + std::unique_lock locker(m_lock); + return (m_global_image_status.count(global_image_id) > 0); +} + +template +void MirrorStatusUpdater::set_mirror_image_status( + const std::string& global_image_id, + const cls::rbd::MirrorImageSiteStatus& mirror_image_site_status, + bool immediate_update) { + dout(15) << "global_image_id=" << global_image_id << ", " + << "mirror_image_site_status=" << mirror_image_site_status << dendl; + + std::unique_lock locker(m_lock); + + m_global_image_status[global_image_id] = mirror_image_site_status; + if (immediate_update) { + m_update_global_image_ids.insert(global_image_id); + queue_update_task(std::move(locker)); + } +} + +template +void MirrorStatusUpdater::remove_mirror_image_status( + const std::string& global_image_id, Context* on_finish) { + if (try_remove_mirror_image_status(global_image_id, on_finish)) { + m_threads->work_queue->queue(on_finish, 0); + } +} + +template +bool MirrorStatusUpdater::try_remove_mirror_image_status( + const std::string& global_image_id, Context* on_finish) { + dout(15) << "global_image_id=" << global_image_id << dendl; + + std::unique_lock locker(m_lock); + if ((m_update_in_flight && + m_updating_global_image_ids.count(global_image_id) > 0) || + ((m_update_in_progress || m_update_requested) && + m_update_global_image_ids.count(global_image_id) > 0)) { + // if update is scheduled/in-progress, wait for it to complete + on_finish = new LambdaContext( + [this, global_image_id, on_finish](int r) { + if (try_remove_mirror_image_status(global_image_id, on_finish)) { + on_finish->complete(0); + } + }); + m_update_on_finish_ctxs.push_back(on_finish); + return false; + } + + m_global_image_status.erase(global_image_id); + return true; +} + +template +void MirrorStatusUpdater::schedule_timer_task() { + dout(10) << dendl; + + ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock)); + ceph_assert(m_timer_task == nullptr); + m_timer_task = create_context_callback< + MirrorStatusUpdater, + &MirrorStatusUpdater::handle_timer_task>(this); + m_threads->timer->add_event_after(UPDATE_INTERVAL_SECONDS, m_timer_task); +} + +template +void MirrorStatusUpdater::handle_timer_task(int r) { + dout(10) << dendl; + + ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock)); + ceph_assert(m_timer_task != nullptr); + m_timer_task = nullptr; + schedule_timer_task(); + + std::unique_lock locker(m_lock); + for (auto& pair : m_global_image_status) { + m_update_global_image_ids.insert(pair.first); + } + + queue_update_task(std::move(locker)); +} + +template +void MirrorStatusUpdater::queue_update_task( + std::unique_lock&& locker) { + if (!m_initialized) { + return; + } + + if (m_update_in_progress) { + if (m_update_in_flight) { + dout(10) << "deferring update due to in-flight ops" << dendl; + m_update_requested = true; + } + return; + } + + m_update_in_progress = true; + ceph_assert(!m_update_in_flight); + ceph_assert(!m_update_requested); + locker.unlock(); + + dout(10) << dendl; + auto ctx = create_context_callback< + MirrorStatusUpdater, + &MirrorStatusUpdater::update_task>(this); + m_threads->work_queue->queue(ctx); +} + +template +void MirrorStatusUpdater::update_task(int r) { + dout(10) << dendl; + + std::unique_lock locker(m_lock); + ceph_assert(m_update_in_progress); + ceph_assert(!m_update_in_flight); + m_update_in_flight = true; + + std::swap(m_updating_global_image_ids, m_update_global_image_ids); + auto updating_global_image_ids = m_updating_global_image_ids; + auto global_image_status = m_global_image_status; + locker.unlock(); + + Context* ctx = create_context_callback< + MirrorStatusUpdater, + &MirrorStatusUpdater::handle_update_task>(this); + auto gather = new C_Gather(g_ceph_context, ctx); + + auto it = updating_global_image_ids.begin(); + while (it != updating_global_image_ids.end()) { + librados::ObjectWriteOperation op; + uint32_t op_count = 0; + + while (it != updating_global_image_ids.end() && + op_count < MAX_UPDATES_PER_OP) { + auto& global_image_id = *it; + ++it; + + auto status_it = global_image_status.find(global_image_id); + if (status_it == global_image_status.end()) { + continue; + } + + librbd::cls_client::mirror_image_status_set(&op, global_image_id, + status_it->second); + ++op_count; + } + + auto aio_comp = create_rados_callback(gather->new_sub()); + int r = m_io_ctx.aio_operate(RBD_MIRRORING, aio_comp, &op); + ceph_assert(r == 0); + aio_comp->release(); + } + + gather->activate(); +} + +template +void MirrorStatusUpdater::handle_update_task(int r) { + dout(10) << dendl; + if (r < 0) { + derr << "failed to update mirror image statuses: " << cpp_strerror(r) + << dendl; + } + + std::unique_lock locker(m_lock); + + Contexts on_finish_ctxs; + std::swap(on_finish_ctxs, m_update_on_finish_ctxs); + + ceph_assert(m_update_in_progress); + m_update_in_progress = false; + + ceph_assert(m_update_in_flight); + m_update_in_flight = false; + + m_updating_global_image_ids.clear(); + + if (m_update_requested) { + m_update_requested = false; + queue_update_task(std::move(locker)); + } else { + locker.unlock(); + } + + for (auto on_finish : on_finish_ctxs) { + on_finish->complete(0); + } +} + +} // namespace mirror +} // namespace rbd + +template class rbd::mirror::MirrorStatusUpdater; diff --git a/src/tools/rbd_mirror/MirrorStatusUpdater.h b/src/tools/rbd_mirror/MirrorStatusUpdater.h new file mode 100644 index 000000000000..60f4be476339 --- /dev/null +++ b/src/tools/rbd_mirror/MirrorStatusUpdater.h @@ -0,0 +1,87 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_RBD_MIRROR_MIRROR_STATUS_UPDATER_H +#define CEPH_RBD_MIRROR_MIRROR_STATUS_UPDATER_H + +#include "include/rados/librados.hpp" +#include "common/ceph_mutex.h" +#include "cls/rbd/cls_rbd_types.h" +#include +#include +#include +#include + +struct Context; +namespace librbd { class ImageCtx; } + +namespace rbd { +namespace mirror { + +template struct Threads; + +template +class MirrorStatusUpdater { +public: + + static MirrorStatusUpdater* create(librados::IoCtx& io_ctx, + Threads *threads) { + return new MirrorStatusUpdater(io_ctx, threads); + } + + MirrorStatusUpdater(librados::IoCtx& io_ctx, Threads *threads); + ~MirrorStatusUpdater(); + + void init(Context* on_finish); + void shut_down(Context* on_finish); + + bool exists(const std::string& global_image_id); + void set_mirror_image_status( + const std::string& global_image_id, + const cls::rbd::MirrorImageSiteStatus& mirror_image_site_status, + bool immediate_update); + void remove_mirror_image_status(const std::string& global_image_id, + Context* on_finish); + +private: + typedef std::list Contexts; + typedef std::set GlobalImageIds; + typedef std::map + GlobalImageStatus; + + librados::IoCtx m_io_ctx; + Threads* m_threads; + + Context* m_timer_task = nullptr; + + ceph::mutex m_lock; + + bool m_initialized = false; + + GlobalImageIds m_update_global_image_ids; + GlobalImageStatus m_global_image_status; + + bool m_update_in_progress = false; + bool m_update_in_flight = false; + bool m_update_requested = false; + Contexts m_update_on_finish_ctxs; + GlobalImageIds m_updating_global_image_ids; + + bool try_remove_mirror_image_status(const std::string& global_image_id, + Context* on_finish); + + void schedule_timer_task(); + void handle_timer_task(int r); + + void queue_update_task(std::unique_lock&& locker); + void update_task(int r); + void handle_update_task(int r); + +}; + +} // namespace mirror +} // namespace rbd + +extern template class rbd::mirror::MirrorStatusUpdater; + +#endif // CEPH_RBD_MIRROR_MIRROR_STATUS_UPDATER_H