OPTION(rbd_mirror_concurrent_image_syncs, OPT_U32, 5) // maximum number of image syncs in parallel
OPTION(rbd_mirror_pool_replayers_refresh_interval, OPT_INT, 30) // interval to refresh peers in rbd-mirror daemon
OPTION(rbd_mirror_delete_retry_interval, OPT_DOUBLE, 30) // interval to check and retry the failed requests in deleter
-OPTION(rbd_mirror_image_directory_refresh_interval, OPT_INT, 30) // interval to refresh images in pool watcher
OPTION(rbd_mirror_image_state_check_interval, OPT_INT, 30) // interval to get images from pool watcher and set sources in replayer
OPTION(rbd_mirror_leader_heartbeat_interval, OPT_INT, 5) // interval (in seconds) between mirror leader heartbeats
OPTION(rbd_mirror_leader_max_missed_heartbeats, OPT_INT, 2) // number of missed heartbeats for non-lock owner to attempt to acquire lock
Context *on_notify_ack) {
CephContext *cct = this->m_cct;
ldout(cct, 20) << ": mode updated: " << payload.mirror_mode << dendl;
- handle_mode_updated(payload.mirror_mode, on_notify_ack);
+ handle_mode_updated(payload.mirror_mode);
return true;
}
CephContext *cct = this->m_cct;
ldout(cct, 20) << ": image state updated" << dendl;
handle_image_updated(payload.mirror_image_state, payload.image_id,
- payload.global_image_id, on_notify_ack);
+ payload.global_image_id);
return true;
}
const std::string &global_image_id,
Context *on_finish);
- virtual void handle_mode_updated(cls::rbd::MirrorMode mirror_mode,
- Context *on_ack) = 0;
+ virtual void handle_mode_updated(cls::rbd::MirrorMode mirror_mode) = 0;
virtual void handle_image_updated(cls::rbd::MirrorImageState state,
const std::string &image_id,
- const std::string &global_image_id,
- Context *on_ack) = 0;
+ const std::string &global_image_id) = 0;
private:
bool handle_payload(const mirroring_watcher::ModeUpdatedPayload &payload,
m_notifier.flush(on_finish);
}
+std::string Watcher::get_oid() const {
+ RWLock::RLocker locker(m_watch_lock);
+ return m_oid;
+}
+
void Watcher::set_oid(const string& oid) {
RWLock::WLocker l(m_watch_lock);
assert(m_watch_state == WATCH_STATE_UNREGISTERED);
void unregister_watch(Context *on_finish);
void flush(Context *on_finish);
+ std::string get_oid() const;
void set_oid(const string& oid);
uint64_t get_watch_handle() const {
: MirroringWatcher<>(image_ctx.md_ctx, image_ctx.op_work_queue) {
}
- MOCK_METHOD2(handle_mode_updated, void(cls::rbd::MirrorMode, Context*));
- MOCK_METHOD4(handle_image_updated, void(cls::rbd::MirrorImageState,
+ MOCK_METHOD1(handle_mode_updated, void(cls::rbd::MirrorMode));
+ MOCK_METHOD3(handle_image_updated, void(cls::rbd::MirrorImageState,
const std::string &,
- const std::string &,
- Context*));
+ const std::string &));
};
} // anonymous namespace
};
TEST_F(TestMirroringWatcher, ModeUpdated) {
- EXPECT_CALL(*m_image_watcher, handle_mode_updated(cls::rbd::MIRROR_MODE_DISABLED, _));
+ EXPECT_CALL(*m_image_watcher, handle_mode_updated(cls::rbd::MIRROR_MODE_DISABLED));
C_SaferCond ctx;
MockMirroringWatcher::notify_mode_updated(m_ioctx, cls::rbd::MIRROR_MODE_DISABLED, &ctx);
TEST_F(TestMirroringWatcher, ImageStatusUpdated) {
EXPECT_CALL(*m_image_watcher,
handle_image_updated(cls::rbd::MIRROR_IMAGE_STATE_ENABLED,
- StrEq("image id"), StrEq("global image id"),
- _));
+ StrEq("image id"),
+ StrEq("global image id")));
C_SaferCond ctx;
MockMirroringWatcher::notify_image_updated(m_ioctx,
test_LeaderWatcher.cc
test_fixture.cc
)
-add_library(rbd_mirror STATIC ${rbd_mirror_test_srcs})
-set_target_properties(rbd_mirror PROPERTIES COMPILE_FLAGS
+add_library(rbd_mirror_test STATIC ${rbd_mirror_test_srcs})
+set_target_properties(rbd_mirror_test PROPERTIES COMPILE_FLAGS
${UNITTEST_CXX_FLAGS})
add_executable(unittest_rbd_mirror
test_mock_ImageSyncThrottler.cc
test_mock_InstanceWatcher.cc
test_mock_LeaderWatcher.cc
+ test_mock_PoolWatcher.cc
image_replayer/test_mock_BootstrapRequest.cc
image_replayer/test_mock_CreateImageRequest.cc
image_replayer/test_mock_EventPreprocessor.cc
cls_lock
cls_rbd)
target_link_libraries(unittest_rbd_mirror
- rbd_mirror
+ rbd_mirror_test
rados_test_stub
rbd_mirror_internal
rbd_mirror_types
set_target_properties(ceph_test_rbd_mirror PROPERTIES COMPILE_FLAGS
${UNITTEST_CXX_FLAGS})
target_link_libraries(ceph_test_rbd_mirror
- rbd_mirror
+ rbd_mirror_test
rbd_mirror_internal
rbd_mirror_types
rbd_api
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_MOCK_CONTEXT_WQ_H
+#define CEPH_MOCK_CONTEXT_WQ_H
+
+#include <gmock/gmock.h>
+
+struct Context;
+
+struct MockContextWQ {
+ void queue(Context *ctx) {
+ queue(ctx, 0);
+ }
+ MOCK_METHOD2(queue, void(Context *, int));
+};
+
+#endif // CEPH_MOCK_CONTEXT_WQ_H
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_MOCK_SAFE_TIMER_H
+#define CEPH_MOCK_SAFE_TIMER_H
+
+#include <gmock/gmock.h>
+
+struct Context;
+
+struct MockSafeTimer {
+ MOCK_METHOD2(add_event_after, void(double, Context*));
+ MOCK_METHOD1(cancel_event, void(Context *));
+};
+
+#endif // CEPH_MOCK_SAFE_TIMER_H
req->send();
ASSERT_EQ(0, ctx.wait());
- ImageIds expected_image_ids = {{"global id", "local id",
- boost::optional<std::string>{"image name"}}};
+ ImageIds expected_image_ids = {{"global id", "local id", "image name"}};
ASSERT_EQ(expected_image_ids, image_ids);
}
req->send();
ASSERT_EQ(0, ctx.wait());
- expected_image_ids.insert({"global id", "local id",
- boost::optional<std::string>{"image name"}});
+ expected_image_ids.insert({"global id", "local id", "image name"});
ASSERT_EQ(expected_image_ids, image_ids);
}
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
+
#include "include/rados/librados.hpp"
#include "include/rbd/librbd.hpp"
#include "include/stringify.h"
#include "common/errno.h"
#include "common/Mutex.h"
#include "tools/rbd_mirror/PoolWatcher.h"
+#include "tools/rbd_mirror/Threads.h"
#include "tools/rbd_mirror/types.h"
#include "test/librados/test.h"
#include "gtest/gtest.h"
class TestPoolWatcher : public ::rbd::mirror::TestFixture {
public:
-TestPoolWatcher() : m_lock("TestPoolWatcherLock"),
- m_image_number(0), m_snap_number(0)
+ TestPoolWatcher()
+ : m_lock("TestPoolWatcherLock"), m_pool_watcher_listener(this),
+ m_image_number(0), m_snap_number(0)
{
m_cluster = std::make_shared<librados::Rados>();
EXPECT_EQ("", connect_cluster_pp(*m_cluster));
}
- ~TestPoolWatcher() override {
+ void TearDown() override {
+ if (m_pool_watcher) {
+ C_SaferCond ctx;
+ m_pool_watcher->shut_down(&ctx);
+ EXPECT_EQ(0, ctx.wait());
+ }
+
m_cluster->wait_for_latest_osdmap();
for (auto& pool : m_pools) {
EXPECT_EQ(0, m_cluster->pool_delete(pool.c_str()));
}
+
+ TestFixture::TearDown();
}
+ struct PoolWatcherListener : public PoolWatcher<>::Listener {
+ TestPoolWatcher *test;
+ Cond cond;
+ ImageIds image_ids;
+
+ PoolWatcherListener(TestPoolWatcher *test) : test(test) {
+ }
+
+ void handle_update(const ImageIds &added_image_ids,
+ const ImageIds &removed_image_ids) override {
+ Mutex::Locker locker(test->m_lock);
+ for (auto &image_id : removed_image_ids) {
+ image_ids.erase(image_id);
+ }
+ image_ids.insert(added_image_ids.begin(), added_image_ids.end());
+ cond.Signal();
+ }
+ };
+
void create_pool(bool enable_mirroring, const peer_t &peer, string *name=nullptr) {
string pool_name = get_temp_pool_name("test-rbd-mirror-");
ASSERT_EQ(0, m_cluster->pool_create(pool_name.c_str()));
librados::IoCtx ioctx;
ASSERT_EQ(0, m_cluster->ioctx_create2(pool_id, ioctx));
- m_pool_watcher.reset(new PoolWatcher(ioctx, 30, m_lock, m_cond));
+ m_pool_watcher.reset(new PoolWatcher<>(m_threads, ioctx,
+ m_pool_watcher_listener));
+
if (enable_mirroring) {
ASSERT_EQ(0, librbd::api::Mirror<>::mode_set(ioctx,
RBD_MIRROR_MODE_POOL));
if (name != nullptr) {
*name = pool_name;
}
+
+ m_pool_watcher->init();
}
string get_image_id(librados::IoCtx *ioctx, const string &image_name) {
}
void check_images() {
- m_pool_watcher->refresh_images(false);
Mutex::Locker l(m_lock);
- ASSERT_EQ(m_mirrored_images, m_pool_watcher->get_images());
+ while (m_mirrored_images != m_pool_watcher_listener.image_ids) {
+ if (m_pool_watcher_listener.cond.WaitInterval(
+ m_lock, utime_t(10, 0)) != 0) {
+ break;
+ }
+ }
+
+ ASSERT_EQ(m_mirrored_images, m_pool_watcher_listener.image_ids);
}
Mutex m_lock;
- Cond m_cond;
RadosRef m_cluster;
- unique_ptr<PoolWatcher> m_pool_watcher;
+ PoolWatcherListener m_pool_watcher_listener;
+ unique_ptr<PoolWatcher<> > m_pool_watcher;
set<string> m_pools;
ImageIds m_mirrored_images;
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "test/rbd_mirror/test_mock_fixture.h"
+#include "test/librados_test_stub/MockTestMemIoCtxImpl.h"
+#include "test/librados_test_stub/MockTestMemRadosClient.h"
+#include "test/librbd/mock/MockImageCtx.h"
+#include "test/rbd_mirror/mock/MockContextWQ.h"
+#include "test/rbd_mirror/mock/MockSafeTimer.h"
+#include "librbd/MirroringWatcher.h"
+#include "tools/rbd_mirror/Threads.h"
+#include "tools/rbd_mirror/PoolWatcher.h"
+#include "tools/rbd_mirror/pool_watcher/RefreshImagesRequest.h"
+#include "include/stringify.h"
+
+namespace librbd {
+namespace {
+
+struct MockTestImageCtx : public librbd::MockImageCtx {
+ MockTestImageCtx(librbd::ImageCtx &image_ctx)
+ : librbd::MockImageCtx(image_ctx) {
+ }
+};
+
+} // anonymous namespace
+
+struct MockMirroringWatcher {
+ static MockMirroringWatcher *s_instance;
+ static MockMirroringWatcher &get_instance() {
+ assert(s_instance != nullptr);
+ return *s_instance;
+ }
+
+ MockMirroringWatcher() {
+ s_instance = this;
+ }
+
+ MOCK_CONST_METHOD0(is_unregistered, bool());
+ MOCK_METHOD1(register_watch, void(Context*));
+ MOCK_METHOD1(unregister_watch, void(Context*));
+
+ MOCK_CONST_METHOD0(get_oid, std::string());
+};
+
+template <>
+struct MirroringWatcher<MockTestImageCtx> {
+ static MirroringWatcher *s_instance;
+
+ MirroringWatcher(librados::IoCtx &io_ctx, ::MockContextWQ *work_queue) {
+ s_instance = this;
+ }
+ virtual ~MirroringWatcher() {
+ }
+
+ static MirroringWatcher<MockTestImageCtx> &get_instance() {
+ assert(s_instance != nullptr);
+ return *s_instance;
+ }
+
+ virtual void handle_rewatch_complete(int r) = 0;
+
+ virtual void handle_mode_updated(cls::rbd::MirrorMode mirror_mode) = 0;
+ virtual void handle_image_updated(cls::rbd::MirrorImageState state,
+ const std::string &remote_image_id,
+ const std::string &global_image_id) = 0;
+
+ bool is_unregistered() const {
+ return MockMirroringWatcher::get_instance().is_unregistered();
+ }
+ void register_watch(Context *ctx) {
+ MockMirroringWatcher::get_instance().register_watch(ctx);
+ }
+ void unregister_watch(Context *ctx) {
+ MockMirroringWatcher::get_instance().unregister_watch(ctx);
+ }
+ std::string get_oid() const {
+ return MockMirroringWatcher::get_instance().get_oid();
+ }
+};
+
+MockMirroringWatcher *MockMirroringWatcher::s_instance = nullptr;
+MirroringWatcher<MockTestImageCtx> *MirroringWatcher<MockTestImageCtx>::s_instance = nullptr;
+
+} // namespace librbd
+
+namespace rbd {
+namespace mirror {
+
+template <>
+struct Threads<librbd::MockTestImageCtx> {
+ MockSafeTimer *timer;
+ Mutex &timer_lock;
+
+ MockContextWQ *work_queue;
+
+ Threads(Threads<librbd::ImageCtx> *threads)
+ : timer(new MockSafeTimer()),
+ timer_lock(threads->timer_lock),
+ work_queue(new MockContextWQ()) {
+ }
+ ~Threads() {
+ delete timer;
+ delete work_queue;
+ }
+};
+
+namespace pool_watcher {
+
+template <>
+struct RefreshImagesRequest<librbd::MockTestImageCtx> {
+ ImageIds *image_ids = nullptr;
+ Context *on_finish = nullptr;
+ static RefreshImagesRequest *s_instance;
+ static RefreshImagesRequest *create(librados::IoCtx &io_ctx,
+ ImageIds *image_ids,
+ Context *on_finish) {
+ assert(s_instance != nullptr);
+ s_instance->image_ids = image_ids;
+ s_instance->on_finish = on_finish;
+ return s_instance;
+ }
+
+ MOCK_METHOD0(send, void());
+
+ RefreshImagesRequest() {
+ s_instance = this;
+ }
+};
+
+RefreshImagesRequest<librbd::MockTestImageCtx> *RefreshImagesRequest<librbd::MockTestImageCtx>::s_instance = nullptr;
+
+} // namespace pool_watcher
+
+} // namespace mirror
+} // namespace rbd
+
+// template definitions
+#include "tools/rbd_mirror/PoolWatcher.cc"
+
+namespace rbd {
+namespace mirror {
+
+using ::testing::_;
+using ::testing::DoAll;
+using ::testing::InSequence;
+using ::testing::Invoke;
+using ::testing::Return;
+using ::testing::StrEq;
+using ::testing::WithArg;
+using ::testing::WithoutArgs;
+
+class TestMockPoolWatcher : public TestMockFixture {
+public:
+ typedef PoolWatcher<librbd::MockTestImageCtx> MockPoolWatcher;
+ typedef Threads<librbd::MockTestImageCtx> MockThreads;
+ typedef pool_watcher::RefreshImagesRequest<librbd::MockTestImageCtx> MockRefreshImagesRequest;
+ typedef librbd::MockMirroringWatcher MockMirroringWatcher;
+ typedef librbd::MirroringWatcher<librbd::MockTestImageCtx> MirroringWatcher;
+
+ struct MockListener : MockPoolWatcher::Listener {
+ TestMockPoolWatcher *test;
+
+ MockListener(TestMockPoolWatcher *test) : test(test) {
+ }
+
+ MOCK_METHOD2(handle_update, void(const ImageIds &, const ImageIds &));
+ };
+
+ TestMockPoolWatcher() : m_lock("TestMockPoolWatcher::m_lock") {
+ }
+
+ void expect_work_queue(MockThreads &mock_threads) {
+ EXPECT_CALL(*mock_threads.work_queue, queue(_, _))
+ .WillRepeatedly(Invoke([this](Context *ctx, int r) {
+ m_threads->work_queue->queue(ctx, r);
+ }));
+ }
+
+ void expect_mirroring_watcher_is_unregistered(MockMirroringWatcher &mock_mirroring_watcher,
+ bool unregistered) {
+ EXPECT_CALL(mock_mirroring_watcher, is_unregistered())
+ .WillOnce(Return(unregistered));
+ }
+
+ void expect_mirroring_watcher_register(MockMirroringWatcher &mock_mirroring_watcher,
+ int r) {
+ EXPECT_CALL(mock_mirroring_watcher, register_watch(_))
+ .WillOnce(CompleteContext(r));
+ }
+
+ void expect_mirroring_watcher_unregister(MockMirroringWatcher &mock_mirroring_watcher,
+ int r) {
+ EXPECT_CALL(mock_mirroring_watcher, unregister_watch(_))
+ .WillOnce(CompleteContext(r));
+ }
+
+ void expect_refresh_images(MockRefreshImagesRequest &request,
+ const ImageIds &image_ids, int r) {
+ EXPECT_CALL(request, send())
+ .WillOnce(Invoke([&request, image_ids, r]() {
+ *request.image_ids = image_ids;
+ request.on_finish->complete(r);
+ }));
+ }
+
+ void expect_listener_handle_update(MockListener &mock_listener,
+ const ImageIds &added_image_ids,
+ const ImageIds &removed_image_ids) {
+ EXPECT_CALL(mock_listener, handle_update(added_image_ids, removed_image_ids))
+ .WillOnce(WithoutArgs(Invoke([this]() {
+ Mutex::Locker locker(m_lock);
+ ++m_update_count;
+ m_cond.Signal();
+ })));
+ }
+
+ void expect_dir_list(librados::IoCtx &io_ctx,
+ const std::string &id, const std::string &name, int r) {
+ bufferlist in_bl;
+ ::encode(id, in_bl);
+
+ bufferlist out_bl;
+ ::encode(name, out_bl);
+
+ EXPECT_CALL(get_mock_io_ctx(io_ctx),
+ exec(RBD_DIRECTORY, _, StrEq("rbd"), StrEq("dir_get_name"),
+ ContentsEqual(in_bl), _, _))
+ .WillOnce(DoAll(WithArg<5>(Invoke([this, out_bl](bufferlist *bl) {
+ *bl = out_bl;
+ Mutex::Locker locker(m_lock);
+ ++m_get_name_count;
+ m_cond.Signal();
+ })),
+ Return(r)));
+ }
+
+ void expect_timer_add_event(MockThreads &mock_threads) {
+ EXPECT_CALL(*mock_threads.timer, add_event_after(_, _))
+ .WillOnce(WithArg<1>(Invoke([](Context *ctx) {
+ ctx->complete(0);
+ })));
+ }
+
+ int when_shut_down(MockPoolWatcher &mock_pool_watcher) {
+ C_SaferCond ctx;
+ mock_pool_watcher.shut_down(&ctx);
+ return ctx.wait();
+ }
+
+ bool wait_for_update(uint32_t count) {
+ Mutex::Locker locker(m_lock);
+ while (m_update_count < count) {
+ if (m_cond.WaitInterval(m_lock, utime_t(10, 0)) != 0) {
+ break;
+ }
+ }
+ if (m_update_count < count) {
+ return false;
+ }
+
+ m_update_count -= count;
+ return true;
+ }
+
+ bool wait_for_get_name(uint32_t count) {
+ Mutex::Locker locker(m_lock);
+ while (m_get_name_count < count) {
+ if (m_cond.WaitInterval(m_lock, utime_t(10, 0)) != 0) {
+ break;
+ }
+ }
+ if (m_get_name_count < count) {
+ return false;
+ }
+
+ m_get_name_count -= count;
+ return true;
+ }
+
+ Mutex m_lock;
+ Cond m_cond;
+ uint32_t m_update_count = 0;
+ uint32_t m_get_name_count = 0;
+};
+
+TEST_F(TestMockPoolWatcher, EmptyPool) {
+ MockThreads mock_threads(m_threads);
+ expect_work_queue(mock_threads);
+
+ InSequence seq;
+ MockMirroringWatcher mock_mirroring_watcher;
+ expect_mirroring_watcher_is_unregistered(mock_mirroring_watcher, true);
+ expect_mirroring_watcher_register(mock_mirroring_watcher, 0);
+
+ MockRefreshImagesRequest mock_refresh_images_request;
+ expect_refresh_images(mock_refresh_images_request, {}, 0);
+
+ MockListener mock_listener(this);
+ expect_listener_handle_update(mock_listener, {}, {});
+
+ MockPoolWatcher mock_pool_watcher(&mock_threads, m_remote_io_ctx,
+ mock_listener);
+ C_SaferCond ctx;
+ mock_pool_watcher.init(&ctx);
+ ASSERT_EQ(0, ctx.wait());
+
+ ASSERT_TRUE(wait_for_update(1));
+ expect_mirroring_watcher_unregister(mock_mirroring_watcher, 0);
+ ASSERT_EQ(0, when_shut_down(mock_pool_watcher));
+}
+
+TEST_F(TestMockPoolWatcher, NonEmptyPool) {
+ MockThreads mock_threads(m_threads);
+ expect_work_queue(mock_threads);
+
+ InSequence seq;
+ MockMirroringWatcher mock_mirroring_watcher;
+ expect_mirroring_watcher_is_unregistered(mock_mirroring_watcher, true);
+ expect_mirroring_watcher_register(mock_mirroring_watcher, 0);
+
+ ImageIds image_ids{
+ {"global id 1", "remote id 1", "image name 1"},
+ {"global id 2", "remote id 2", "image name 2"}};
+ MockRefreshImagesRequest mock_refresh_images_request;
+ expect_refresh_images(mock_refresh_images_request, image_ids, 0);
+
+ MockListener mock_listener(this);
+ expect_listener_handle_update(mock_listener, image_ids, {});
+
+ MockPoolWatcher mock_pool_watcher(&mock_threads, m_remote_io_ctx,
+ mock_listener);
+ C_SaferCond ctx;
+ mock_pool_watcher.init(&ctx);
+ ASSERT_EQ(0, ctx.wait());
+
+ ASSERT_TRUE(wait_for_update(1));
+ expect_mirroring_watcher_unregister(mock_mirroring_watcher, 0);
+ ASSERT_EQ(0, when_shut_down(mock_pool_watcher));
+}
+
+TEST_F(TestMockPoolWatcher, NotifyDuringRefresh) {
+ MockThreads mock_threads(m_threads);
+ expect_work_queue(mock_threads);
+
+ InSequence seq;
+ MockMirroringWatcher mock_mirroring_watcher;
+ expect_mirroring_watcher_is_unregistered(mock_mirroring_watcher, true);
+ expect_mirroring_watcher_register(mock_mirroring_watcher, 0);
+
+ ImageIds image_ids{
+ {"global id 1", "remote id 1", "image name 1"},
+ {"global id 2", "remote id 2", "image name 2"}};
+ MockRefreshImagesRequest mock_refresh_images_request;
+ bool refresh_sent = false;
+ EXPECT_CALL(mock_refresh_images_request, send())
+ .WillOnce(Invoke([this, &mock_refresh_images_request, &image_ids,
+ &refresh_sent]() {
+ *mock_refresh_images_request.image_ids = image_ids;
+
+ Mutex::Locker locker(m_lock);
+ refresh_sent = true;
+ m_cond.Signal();
+ }));
+
+ expect_dir_list(m_remote_io_ctx, "remote id 1a", "image name 1a", 0);
+ expect_dir_list(m_remote_io_ctx, "remote id 3", "image name 3", 0);
+ expect_dir_list(m_remote_io_ctx, "dummy", "", -ENOENT);
+
+ MockListener mock_listener(this);
+ image_ids = {
+ {"global id 1", "remote id 1a", "image name 1a"},
+ {"global id 3", "remote id 3", "image name 3"}};
+ expect_listener_handle_update(mock_listener, image_ids, {});
+
+ MockPoolWatcher mock_pool_watcher(&mock_threads, m_remote_io_ctx,
+ mock_listener);
+ mock_pool_watcher.init(nullptr);
+
+ {
+ Mutex::Locker locker(m_lock);
+ while (!refresh_sent) {
+ m_cond.Wait(m_lock);
+ }
+ }
+
+ MirroringWatcher::get_instance().handle_image_updated(
+ cls::rbd::MIRROR_IMAGE_STATE_DISABLING, "remote id 2", "global id 2");
+ MirroringWatcher::get_instance().handle_image_updated(
+ cls::rbd::MIRROR_IMAGE_STATE_ENABLED, "remote id 1a", "global id 1");
+ MirroringWatcher::get_instance().handle_image_updated(
+ cls::rbd::MIRROR_IMAGE_STATE_ENABLED, "remote id 3", "global id 3");
+ MirroringWatcher::get_instance().handle_image_updated(
+ cls::rbd::MIRROR_IMAGE_STATE_ENABLED, "dummy", "dummy");
+ wait_for_get_name(3);
+
+ mock_refresh_images_request.on_finish->complete(0);
+ ASSERT_TRUE(wait_for_update(1));
+
+ expect_mirroring_watcher_unregister(mock_mirroring_watcher, 0);
+ ASSERT_EQ(0, when_shut_down(mock_pool_watcher));
+}
+
+TEST_F(TestMockPoolWatcher, Notify) {
+ MockThreads mock_threads(m_threads);
+
+ InSequence seq;
+ MockMirroringWatcher mock_mirroring_watcher;
+ expect_mirroring_watcher_is_unregistered(mock_mirroring_watcher, true);
+ expect_mirroring_watcher_register(mock_mirroring_watcher, 0);
+
+ ImageIds image_ids{
+ {"global id 1", "remote id 1", "image name 1"},
+ {"global id 2", "remote id 2", "image name 2"}};
+ MockRefreshImagesRequest mock_refresh_images_request;
+ expect_refresh_images(mock_refresh_images_request, image_ids, 0);
+ EXPECT_CALL(*mock_threads.work_queue, queue(_, _))
+ .WillOnce(Invoke([this](Context *ctx, int r) {
+ m_threads->work_queue->queue(ctx, r);
+ }));
+
+ MockListener mock_listener(this);
+ expect_listener_handle_update(mock_listener, image_ids, {});
+
+ Context *notify_ctx = nullptr;
+ EXPECT_CALL(*mock_threads.work_queue, queue(_, _))
+ .WillOnce(Invoke([this, ¬ify_ctx](Context *ctx, int r) {
+ Mutex::Locker locker(m_lock);
+ ASSERT_EQ(nullptr, notify_ctx);
+ notify_ctx = ctx;
+ m_cond.Signal();
+ }));
+ expect_dir_list(m_remote_io_ctx, "remote id 1a", "image name 1a", 0);
+ expect_dir_list(m_remote_io_ctx, "remote id 3", "image name 3", 0);
+ expect_dir_list(m_remote_io_ctx, "dummy", "", -ENOENT);
+ expect_listener_handle_update(
+ mock_listener,
+ {{"global id 1", "remote id 1a", "image name 1a"},
+ {"global id 3", "remote id 3", "image name 3"}},
+ {{"global id 1", "remote id 1", "image name 1"},
+ {"global id 2", "remote id 2", "image name 2"}});
+
+ MockPoolWatcher mock_pool_watcher(&mock_threads, m_remote_io_ctx,
+ mock_listener);
+ C_SaferCond ctx;
+ mock_pool_watcher.init(&ctx);
+ ASSERT_EQ(0, ctx.wait());
+ ASSERT_TRUE(wait_for_update(1));
+
+ C_SaferCond flush_ctx;
+ m_threads->work_queue->queue(&flush_ctx, 0);
+ ASSERT_EQ(0, flush_ctx.wait());
+
+ MirroringWatcher::get_instance().handle_image_updated(
+ cls::rbd::MIRROR_IMAGE_STATE_DISABLING, "remote id 2", "global id 2");
+ MirroringWatcher::get_instance().handle_image_updated(
+ cls::rbd::MIRROR_IMAGE_STATE_DISABLED, "remote id 2", "global id 2");
+ MirroringWatcher::get_instance().handle_image_updated(
+ cls::rbd::MIRROR_IMAGE_STATE_ENABLED, "remote id 1a", "global id 1");
+ MirroringWatcher::get_instance().handle_image_updated(
+ cls::rbd::MIRROR_IMAGE_STATE_ENABLED, "remote id 3", "global id 3");
+ MirroringWatcher::get_instance().handle_image_updated(
+ cls::rbd::MIRROR_IMAGE_STATE_ENABLED, "dummy", "dummy");
+ ASSERT_TRUE(wait_for_get_name(3));
+ notify_ctx->complete(0);
+
+ ASSERT_TRUE(wait_for_update(1));
+
+ expect_mirroring_watcher_unregister(mock_mirroring_watcher, 0);
+ ASSERT_EQ(0, when_shut_down(mock_pool_watcher));
+}
+
+TEST_F(TestMockPoolWatcher, RegisterWatcherBlacklist) {
+ MockThreads mock_threads(m_threads);
+ expect_work_queue(mock_threads);
+
+ InSequence seq;
+ MockMirroringWatcher mock_mirroring_watcher;
+ expect_mirroring_watcher_is_unregistered(mock_mirroring_watcher, true);
+ expect_mirroring_watcher_register(mock_mirroring_watcher, -EBLACKLISTED);
+
+ MockListener mock_listener(this);
+ MockPoolWatcher mock_pool_watcher(&mock_threads, m_remote_io_ctx,
+ mock_listener);
+ C_SaferCond ctx;
+ mock_pool_watcher.init(&ctx);
+ ASSERT_EQ(-EBLACKLISTED, ctx.wait());
+ ASSERT_TRUE(mock_pool_watcher.is_blacklisted());
+
+ expect_mirroring_watcher_unregister(mock_mirroring_watcher, 0);
+ ASSERT_EQ(0, when_shut_down(mock_pool_watcher));
+}
+
+TEST_F(TestMockPoolWatcher, RegisterWatcherMissing) {
+ MockThreads mock_threads(m_threads);
+ expect_work_queue(mock_threads);
+
+ InSequence seq;
+ MockMirroringWatcher mock_mirroring_watcher;
+ expect_mirroring_watcher_is_unregistered(mock_mirroring_watcher, true);
+ expect_mirroring_watcher_register(mock_mirroring_watcher, -ENOENT);
+ expect_timer_add_event(mock_threads);
+
+ expect_mirroring_watcher_is_unregistered(mock_mirroring_watcher, true);
+ expect_mirroring_watcher_register(mock_mirroring_watcher, 0);
+
+ MockRefreshImagesRequest mock_refresh_images_request;
+ expect_refresh_images(mock_refresh_images_request, {}, 0);
+
+ MockListener mock_listener(this);
+ expect_listener_handle_update(mock_listener, {}, {});
+
+ MockPoolWatcher mock_pool_watcher(&mock_threads, m_remote_io_ctx,
+ mock_listener);
+ C_SaferCond ctx;
+ mock_pool_watcher.init(&ctx);
+ ASSERT_EQ(0, ctx.wait());
+
+ ASSERT_TRUE(wait_for_update(1));
+ expect_mirroring_watcher_unregister(mock_mirroring_watcher, 0);
+ ASSERT_EQ(0, when_shut_down(mock_pool_watcher));
+}
+
+TEST_F(TestMockPoolWatcher, RegisterWatcherError) {
+ MockThreads mock_threads(m_threads);
+ expect_work_queue(mock_threads);
+
+ InSequence seq;
+ MockMirroringWatcher mock_mirroring_watcher;
+ expect_mirroring_watcher_is_unregistered(mock_mirroring_watcher, true);
+ expect_mirroring_watcher_register(mock_mirroring_watcher, -EINVAL);
+ expect_timer_add_event(mock_threads);
+
+ expect_mirroring_watcher_is_unregistered(mock_mirroring_watcher, true);
+ expect_mirroring_watcher_register(mock_mirroring_watcher, 0);
+
+ MockRefreshImagesRequest mock_refresh_images_request;
+ expect_refresh_images(mock_refresh_images_request, {}, 0);
+
+ MockListener mock_listener(this);
+ expect_listener_handle_update(mock_listener, {}, {});
+
+ MockPoolWatcher mock_pool_watcher(&mock_threads, m_remote_io_ctx,
+ mock_listener);
+ C_SaferCond ctx;
+ mock_pool_watcher.init(&ctx);
+ ASSERT_EQ(0, ctx.wait());
+
+ ASSERT_TRUE(wait_for_update(1));
+ expect_mirroring_watcher_unregister(mock_mirroring_watcher, 0);
+ ASSERT_EQ(0, when_shut_down(mock_pool_watcher));
+}
+
+TEST_F(TestMockPoolWatcher, RefreshBlacklist) {
+ MockThreads mock_threads(m_threads);
+ expect_work_queue(mock_threads);
+
+ InSequence seq;
+ MockMirroringWatcher mock_mirroring_watcher;
+ expect_mirroring_watcher_is_unregistered(mock_mirroring_watcher, true);
+ expect_mirroring_watcher_register(mock_mirroring_watcher, 0);
+
+ MockRefreshImagesRequest mock_refresh_images_request;
+ expect_refresh_images(mock_refresh_images_request, {}, -EBLACKLISTED);
+
+ MockListener mock_listener(this);
+ MockPoolWatcher mock_pool_watcher(&mock_threads, m_remote_io_ctx,
+ mock_listener);
+ C_SaferCond ctx;
+ mock_pool_watcher.init(&ctx);
+ ASSERT_EQ(-EBLACKLISTED, ctx.wait());
+ ASSERT_TRUE(mock_pool_watcher.is_blacklisted());
+
+ expect_mirroring_watcher_unregister(mock_mirroring_watcher, 0);
+ ASSERT_EQ(0, when_shut_down(mock_pool_watcher));
+}
+
+TEST_F(TestMockPoolWatcher, RefreshMissing) {
+ MockThreads mock_threads(m_threads);
+ expect_work_queue(mock_threads);
+
+ InSequence seq;
+ MockMirroringWatcher mock_mirroring_watcher;
+ expect_mirroring_watcher_is_unregistered(mock_mirroring_watcher, true);
+ expect_mirroring_watcher_register(mock_mirroring_watcher, 0);
+
+ MockRefreshImagesRequest mock_refresh_images_request;
+ expect_refresh_images(mock_refresh_images_request, {}, -ENOENT);
+
+ MockListener mock_listener(this);
+ expect_listener_handle_update(mock_listener, {}, {});
+
+ MockPoolWatcher mock_pool_watcher(&mock_threads, m_remote_io_ctx,
+ mock_listener);
+ C_SaferCond ctx;
+ mock_pool_watcher.init(&ctx);
+ ASSERT_EQ(0, ctx.wait());
+
+ ASSERT_TRUE(wait_for_update(1));
+ expect_mirroring_watcher_unregister(mock_mirroring_watcher, 0);
+ ASSERT_EQ(0, when_shut_down(mock_pool_watcher));
+}
+
+TEST_F(TestMockPoolWatcher, RefreshError) {
+ MockThreads mock_threads(m_threads);
+ expect_work_queue(mock_threads);
+
+ InSequence seq;
+ MockMirroringWatcher mock_mirroring_watcher;
+ expect_mirroring_watcher_is_unregistered(mock_mirroring_watcher, true);
+ expect_mirroring_watcher_register(mock_mirroring_watcher, 0);
+
+ MockRefreshImagesRequest mock_refresh_images_request;
+ expect_refresh_images(mock_refresh_images_request, {}, -EINVAL);
+ expect_timer_add_event(mock_threads);
+
+ expect_mirroring_watcher_is_unregistered(mock_mirroring_watcher, false);
+ expect_refresh_images(mock_refresh_images_request, {}, 0);
+
+ MockListener mock_listener(this);
+ expect_listener_handle_update(mock_listener, {}, {});
+
+ MockPoolWatcher mock_pool_watcher(&mock_threads, m_remote_io_ctx,
+ mock_listener);
+ C_SaferCond ctx;
+ mock_pool_watcher.init(&ctx);
+ ASSERT_EQ(0, ctx.wait());
+
+ ASSERT_TRUE(wait_for_update(1));
+ expect_mirroring_watcher_unregister(mock_mirroring_watcher, 0);
+ ASSERT_EQ(0, when_shut_down(mock_pool_watcher));
+}
+
+TEST_F(TestMockPoolWatcher, Rewatch) {
+ MockThreads mock_threads(m_threads);
+ expect_work_queue(mock_threads);
+
+ InSequence seq;
+ MockMirroringWatcher mock_mirroring_watcher;
+ expect_mirroring_watcher_is_unregistered(mock_mirroring_watcher, true);
+ expect_mirroring_watcher_register(mock_mirroring_watcher, 0);
+
+ MockRefreshImagesRequest mock_refresh_images_request;
+ expect_refresh_images(mock_refresh_images_request, {}, 0);
+
+ MockListener mock_listener(this);
+ expect_listener_handle_update(mock_listener, {}, {});
+
+ expect_timer_add_event(mock_threads);
+ expect_mirroring_watcher_is_unregistered(mock_mirroring_watcher, false);
+ expect_refresh_images(mock_refresh_images_request, {{"global id", "image id", "name"}}, 0);
+ expect_listener_handle_update(mock_listener, {{"global id", "image id", "name"}}, {});
+
+ MockPoolWatcher mock_pool_watcher(&mock_threads, m_remote_io_ctx,
+ mock_listener);
+ C_SaferCond ctx;
+ mock_pool_watcher.init(&ctx);
+ ASSERT_EQ(0, ctx.wait());
+ ASSERT_TRUE(wait_for_update(1));
+
+ MirroringWatcher::get_instance().handle_rewatch_complete(0);
+ ASSERT_TRUE(wait_for_update(1));
+
+ expect_mirroring_watcher_unregister(mock_mirroring_watcher, 0);
+ ASSERT_EQ(0, when_shut_down(mock_pool_watcher));
+}
+
+TEST_F(TestMockPoolWatcher, RewatchBlacklist) {
+ MockThreads mock_threads(m_threads);
+ expect_work_queue(mock_threads);
+
+ InSequence seq;
+ MockMirroringWatcher mock_mirroring_watcher;
+ expect_mirroring_watcher_is_unregistered(mock_mirroring_watcher, true);
+ expect_mirroring_watcher_register(mock_mirroring_watcher, 0);
+
+ MockRefreshImagesRequest mock_refresh_images_request;
+ expect_refresh_images(mock_refresh_images_request, {}, 0);
+
+ MockListener mock_listener(this);
+ expect_listener_handle_update(mock_listener, {}, {});
+
+ MockPoolWatcher mock_pool_watcher(&mock_threads, m_remote_io_ctx,
+ mock_listener);
+ C_SaferCond ctx;
+ mock_pool_watcher.init(&ctx);
+ ASSERT_EQ(0, ctx.wait());
+ ASSERT_TRUE(wait_for_update(1));
+
+ MirroringWatcher::get_instance().handle_rewatch_complete(-EBLACKLISTED);
+ ASSERT_TRUE(mock_pool_watcher.is_blacklisted());
+
+ expect_mirroring_watcher_unregister(mock_mirroring_watcher, 0);
+ ASSERT_EQ(0, when_shut_down(mock_pool_watcher));
+}
+
+TEST_F(TestMockPoolWatcher, RewatchError) {
+ MockThreads mock_threads(m_threads);
+ expect_work_queue(mock_threads);
+
+ InSequence seq;
+ MockMirroringWatcher mock_mirroring_watcher;
+ expect_mirroring_watcher_is_unregistered(mock_mirroring_watcher, true);
+ expect_mirroring_watcher_register(mock_mirroring_watcher, 0);
+
+ MockRefreshImagesRequest mock_refresh_images_request;
+ expect_refresh_images(mock_refresh_images_request, {}, 0);
+
+ MockListener mock_listener(this);
+ expect_listener_handle_update(mock_listener, {}, {});
+
+ expect_timer_add_event(mock_threads);
+ expect_mirroring_watcher_is_unregistered(mock_mirroring_watcher, false);
+ expect_refresh_images(mock_refresh_images_request, {{"global id", "image id", "name"}}, 0);
+ expect_listener_handle_update(mock_listener, {{"global id", "image id", "name"}}, {});
+
+ MockPoolWatcher mock_pool_watcher(&mock_threads, m_remote_io_ctx,
+ mock_listener);
+ C_SaferCond ctx;
+ mock_pool_watcher.init(&ctx);
+ ASSERT_EQ(0, ctx.wait());
+ ASSERT_TRUE(wait_for_update(1));
+
+ MirroringWatcher::get_instance().handle_rewatch_complete(-EINVAL);
+ ASSERT_TRUE(wait_for_update(1));
+
+ expect_mirroring_watcher_unregister(mock_mirroring_watcher, 0);
+ ASSERT_EQ(0, when_shut_down(mock_pool_watcher));
+}
+
+TEST_F(TestMockPoolWatcher, GetImageNameBlacklist) {
+ MockThreads mock_threads(m_threads);
+ expect_work_queue(mock_threads);
+
+ InSequence seq;
+ MockMirroringWatcher mock_mirroring_watcher;
+ expect_mirroring_watcher_is_unregistered(mock_mirroring_watcher, true);
+ expect_mirroring_watcher_register(mock_mirroring_watcher, 0);
+
+ MockRefreshImagesRequest mock_refresh_images_request;
+ expect_refresh_images(mock_refresh_images_request, {}, 0);
+
+ MockListener mock_listener(this);
+ expect_listener_handle_update(mock_listener, {}, {});
+
+ expect_dir_list(m_remote_io_ctx, "remote id", "image name", -EBLACKLISTED);
+
+ MockPoolWatcher mock_pool_watcher(&mock_threads, m_remote_io_ctx,
+ mock_listener);
+ C_SaferCond ctx;
+ mock_pool_watcher.init(&ctx);
+ ASSERT_EQ(0, ctx.wait());
+ ASSERT_TRUE(wait_for_update(1));
+
+ MirroringWatcher::get_instance().handle_image_updated(
+ cls::rbd::MIRROR_IMAGE_STATE_ENABLED, "remote id", "global id");
+ ASSERT_TRUE(wait_for_get_name(1));
+ while (true) {
+ if (mock_pool_watcher.is_blacklisted()) {
+ break;
+ }
+ usleep(1000);
+ }
+
+ expect_mirroring_watcher_unregister(mock_mirroring_watcher, 0);
+ ASSERT_EQ(0, when_shut_down(mock_pool_watcher));
+}
+
+TEST_F(TestMockPoolWatcher, GetImageNameError) {
+ MockThreads mock_threads(m_threads);
+ expect_work_queue(mock_threads);
+
+ InSequence seq;
+ MockMirroringWatcher mock_mirroring_watcher;
+ expect_mirroring_watcher_is_unregistered(mock_mirroring_watcher, true);
+ expect_mirroring_watcher_register(mock_mirroring_watcher, 0);
+
+ MockRefreshImagesRequest mock_refresh_images_request;
+ expect_refresh_images(mock_refresh_images_request, {}, 0);
+
+ MockListener mock_listener(this);
+ expect_listener_handle_update(mock_listener, {}, {});
+
+ expect_dir_list(m_remote_io_ctx, "remote id", "image name", -EINVAL);
+ expect_timer_add_event(mock_threads);
+
+ expect_mirroring_watcher_is_unregistered(mock_mirroring_watcher, false);
+ expect_refresh_images(mock_refresh_images_request, {{"global id", "remote id", "name"}}, 0);
+ expect_listener_handle_update(mock_listener, {{"global id", "remote id", "name"}}, {});
+
+ MockPoolWatcher mock_pool_watcher(&mock_threads, m_remote_io_ctx,
+ mock_listener);
+ C_SaferCond ctx;
+ mock_pool_watcher.init(&ctx);
+ ASSERT_EQ(0, ctx.wait());
+ ASSERT_TRUE(wait_for_update(1));
+
+ MirroringWatcher::get_instance().handle_image_updated(
+ cls::rbd::MIRROR_IMAGE_STATE_ENABLED, "remote id", "global id");
+ ASSERT_TRUE(wait_for_get_name(1));
+ ASSERT_TRUE(wait_for_update(1));
+
+ expect_mirroring_watcher_unregister(mock_mirroring_watcher, 0);
+ ASSERT_EQ(0, when_shut_down(mock_pool_watcher));
+}
+
+} // namespace mirror
+} // namespace rbd
<< ": " << cpp_strerror(r) << dendl;
pool_name = stringify(m_local_pool_id);
}
- m_name = pool_name + "/" + m_global_image_id;
+ m_name = pool_name + "/" + m_global_image_id;
+ dout(20) << "registered asok hook: " << m_name << dendl;
m_asok_hook = new ImageReplayerAdminSocketHook<I>(g_ceph_context, m_name,
this);
}
}
}
if (!m_asok_hook) {
+ dout(20) << "registered asok hook: " << m_name << dendl;
m_asok_hook = new ImageReplayerAdminSocketHook<I>(g_ceph_context, m_name,
this);
}
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
-#include <boost/bind.hpp>
-
+#include "tools/rbd_mirror/PoolWatcher.h"
+#include "include/rbd_types.h"
+#include "cls/rbd/cls_rbd_client.h"
#include "common/debug.h"
#include "common/errno.h"
-
-#include "cls/rbd/cls_rbd_client.h"
-#include "include/rbd_types.h"
+#include "common/Timer.h"
+#include "librbd/ImageCtx.h"
#include "librbd/internal.h"
+#include "librbd/MirroringWatcher.h"
+#include "librbd/Utils.h"
#include "librbd/api/Image.h"
#include "librbd/api/Mirror.h"
-#include "PoolWatcher.h"
+#include "tools/rbd_mirror/Threads.h"
+#include "tools/rbd_mirror/pool_watcher/RefreshImagesRequest.h"
+#include <boost/bind.hpp>
#define dout_context g_ceph_context
#define dout_subsys ceph_subsys_rbd_mirror
using std::string;
using std::unique_ptr;
using std::vector;
-
-using librados::Rados;
-using librados::IoCtx;
-using librbd::cls_client::mirror_image_list;
+using librbd::util::create_context_callback;
+using librbd::util::create_rados_callback;
namespace rbd {
namespace mirror {
-PoolWatcher::PoolWatcher(librados::IoCtx &remote_io_ctx,
- double interval_seconds,
- Mutex &lock, Cond &cond) :
- m_lock(lock),
- m_refresh_cond(cond),
- m_timer(g_ceph_context, m_lock, false),
- m_interval(interval_seconds)
-{
- m_remote_io_ctx.dup(remote_io_ctx);
- m_timer.init();
+template <typename I>
+class PoolWatcher<I>::MirroringWatcher : public librbd::MirroringWatcher<I> {
+public:
+ using ContextWQ = typename std::decay<
+ typename std::remove_pointer<
+ decltype(Threads<I>::work_queue)>::type>::type;
+
+ MirroringWatcher(librados::IoCtx &io_ctx, ContextWQ *work_queue,
+ PoolWatcher *pool_watcher)
+ : librbd::MirroringWatcher<I>(io_ctx, work_queue),
+ m_pool_watcher(pool_watcher) {
+ }
+
+ void handle_rewatch_complete(int r) override {
+ m_pool_watcher->handle_rewatch_complete(r);
+ }
+
+ void handle_mode_updated(cls::rbd::MirrorMode mirror_mode) override {
+ // do nothing
+ }
+
+ void handle_image_updated(cls::rbd::MirrorImageState state,
+ const std::string &remote_image_id,
+ const std::string &global_image_id) override {
+ bool enabled = (state == cls::rbd::MIRROR_IMAGE_STATE_ENABLED);
+ m_pool_watcher->handle_image_updated(remote_image_id, global_image_id,
+ enabled);
+ }
+
+private:
+ PoolWatcher *m_pool_watcher;
+};
+
+template <typename I>
+PoolWatcher<I>::PoolWatcher(Threads<I> *threads, librados::IoCtx &remote_io_ctx,
+ Listener &listener)
+ : m_threads(threads), m_remote_io_ctx(remote_io_ctx), m_listener(listener),
+ m_lock(librbd::util::unique_lock_name("rbd::mirror::PoolWatcher", this)) {
+ m_mirroring_watcher = new MirroringWatcher(m_remote_io_ctx,
+ m_threads->work_queue, this);
}
-PoolWatcher::~PoolWatcher()
-{
- Mutex::Locker l(m_lock);
- m_stopping = true;
- m_timer.shutdown();
+template <typename I>
+PoolWatcher<I>::~PoolWatcher() {
+ delete m_mirroring_watcher;
}
-bool PoolWatcher::is_blacklisted() const {
- assert(m_lock.is_locked());
+template <typename I>
+bool PoolWatcher<I>::is_blacklisted() const {
+ Mutex::Locker locker(m_lock);
return m_blacklisted;
}
-const ImageIds& PoolWatcher::get_images() const
-{
- assert(m_lock.is_locked());
- return m_images;
+template <typename I>
+void PoolWatcher<I>::init(Context *on_finish) {
+ dout(5) << dendl;
+
+ {
+ Mutex::Locker locker(m_lock);
+ m_on_init_finish = on_finish;
+ }
+
+ // start async updates for mirror image directory
+ register_watcher();
+}
+
+template <typename I>
+void PoolWatcher<I>::shut_down(Context *on_finish) {
+ dout(5) << dendl;
+
+ {
+ Mutex::Locker timer_locker(m_threads->timer_lock);
+ Mutex::Locker locker(m_lock);
+
+ assert(!m_shutting_down);
+ m_shutting_down = true;
+ if (m_timer_ctx != nullptr) {
+ m_threads->timer->cancel_event(m_timer_ctx);
+ m_timer_ctx = nullptr;
+ }
+ }
+
+ // in-progress unregister tracked as async op
+ unregister_watcher();
+
+ m_async_op_tracker.wait_for_ops(on_finish);
}
-void PoolWatcher::refresh_images(bool reschedule)
-{
- ImageIds image_ids;
- int r = refresh(&image_ids);
+template <typename I>
+void PoolWatcher<I>::register_watcher() {
+ {
+ Mutex::Locker locker(m_lock);
+ assert(m_image_ids_invalid);
+ assert(!m_refresh_in_progress);
+ m_refresh_in_progress = true;
+ }
+
+ // if the watch registration is in-flight, let the watcher
+ // handle the transition -- only (re-)register if it's not registered
+ if (!m_mirroring_watcher->is_unregistered()) {
+ refresh_images();
+ return;
+ }
+
+ // first time registering or the watch failed
+ dout(5) << dendl;
+ m_async_op_tracker.start_op();
+
+ Context *ctx = create_context_callback<
+ PoolWatcher, &PoolWatcher<I>::handle_register_watcher>(this);
+ m_mirroring_watcher->register_watch(ctx);
+}
+
+template <typename I>
+void PoolWatcher<I>::handle_register_watcher(int r) {
+ dout(5) << "r=" << r << dendl;
+
+ {
+ Mutex::Locker locker(m_lock);
+ assert(m_image_ids_invalid);
+ assert(m_refresh_in_progress);
+ if (r < 0) {
+ m_refresh_in_progress = false;
+ }
+ }
- Mutex::Locker l(m_lock);
+ Context *on_init_finish = nullptr;
if (r >= 0) {
- m_images = std::move(image_ids);
+ refresh_images();
} else if (r == -EBLACKLISTED) {
- derr << "blacklisted during image refresh" << dendl;
+ dout(0) << "detected client is blacklisted" << dendl;
+
+ Mutex::Locker locker(m_lock);
m_blacklisted = true;
+ std::swap(on_init_finish, m_on_init_finish);
+ } else if (r == -ENOENT) {
+ dout(5) << "mirroring directory does not exist" << dendl;
+ schedule_refresh_images(30);
+ } else {
+ derr << "unexpected error registering mirroring directory watch: "
+ << cpp_strerror(r) << dendl;
+ schedule_refresh_images(10);
+ }
+
+ m_async_op_tracker.finish_op();
+ if (on_init_finish != nullptr) {
+ on_init_finish->complete(r);
}
+}
- if (!m_stopping && reschedule) {
- FunctionContext *ctx = new FunctionContext(
- boost::bind(&PoolWatcher::refresh_images, this, true));
- m_timer.add_event_after(m_interval, ctx);
+template <typename I>
+void PoolWatcher<I>::unregister_watcher() {
+ dout(5) << dendl;
+
+ m_async_op_tracker.start_op();
+ Context *ctx = new FunctionContext([this](int r) {
+ dout(5) << "unregister_watcher: r=" << r << dendl;
+ if (r < 0) {
+ derr << "error unregistering watcher for "
+ << m_mirroring_watcher->get_oid() << " object: " << cpp_strerror(r)
+ << dendl;
+ }
+ m_async_op_tracker.finish_op();
+ });
+
+ m_mirroring_watcher->unregister_watch(ctx);
+}
+
+template <typename I>
+void PoolWatcher<I>::refresh_images() {
+ dout(5) << dendl;
+
+ {
+ Mutex::Locker locker(m_lock);
+ assert(m_image_ids_invalid);
+ assert(m_refresh_in_progress);
+
+ // clear all pending notification events since we need to perform
+ // a full image list refresh
+ m_pending_added_image_ids.clear();
+ m_pending_removed_image_ids.clear();
+ if (!m_updated_images.empty()) {
+ auto it = m_updated_images.begin();
+ it->invalid = true;
+
+ // only have a single in-flight request -- remove the rest
+ ++it;
+ while (it != m_updated_images.end()) {
+ m_id_to_updated_images.erase({it->global_image_id,
+ it->remote_image_id});
+ it = m_updated_images.erase(it);
+ }
+ }
}
- m_refresh_cond.Signal();
- // TODO: perhaps use a workqueue instead, once we get notifications
- // about new/removed mirrored images
+
+ m_async_op_tracker.start_op();
+ m_refresh_image_ids.clear();
+ Context *ctx = create_context_callback<
+ PoolWatcher, &PoolWatcher<I>::handle_refresh_images>(this);
+ auto req = pool_watcher::RefreshImagesRequest<I>::create(m_remote_io_ctx,
+ &m_refresh_image_ids,
+ ctx);
+ req->send();
}
-int PoolWatcher::refresh(ImageIds *image_ids) {
- dout(20) << "enter" << dendl;
+template <typename I>
+void PoolWatcher<I>::handle_refresh_images(int r) {
+ dout(5) << "r=" << r << dendl;
+
+ bool retry_refresh = false;
+ Context *on_init_finish = nullptr;
+ {
+ Mutex::Locker locker(m_lock);
+ assert(m_image_ids_invalid);
+ assert(m_refresh_in_progress);
+ m_refresh_in_progress = false;
+
+ if (r >= 0) {
+ m_image_ids_invalid = false;
+ m_pending_image_ids = m_refresh_image_ids;
+ std::swap(on_init_finish, m_on_init_finish);
+ schedule_listener();
+ } else if (r == -EBLACKLISTED) {
+ dout(0) << "detected client is blacklisted during image refresh" << dendl;
+
+ m_blacklisted = true;
+ std::swap(on_init_finish, m_on_init_finish);
+ } else if (r == -ENOENT) {
+ dout(5) << "mirroring directory not found" << dendl;
+ m_image_ids_invalid = false;
+ m_pending_image_ids.clear();
+ std::swap(on_init_finish, m_on_init_finish);
+ r = 0;
+ schedule_listener();
+ } else {
+ retry_refresh = true;
+ }
+ }
+
+ if (retry_refresh) {
+ derr << "failed to retrieve mirroring directory: " << cpp_strerror(r)
+ << dendl;
+ schedule_refresh_images(10);
+ }
- std::string pool_name = m_remote_io_ctx.get_pool_name();
- rbd_mirror_mode_t mirror_mode;
- int r = librbd::api::Mirror<>::mode_get(m_remote_io_ctx, &mirror_mode);
- if (r < 0) {
- derr << "could not tell whether mirroring was enabled for "
- << pool_name << ": " << cpp_strerror(r) << dendl;
- return r;
+ m_async_op_tracker.finish_op();
+ if (on_init_finish != nullptr) {
+ on_init_finish->complete(r);
}
- if (mirror_mode == RBD_MIRROR_MODE_DISABLED) {
- dout(20) << "pool " << pool_name << " has mirroring disabled" << dendl;
- return 0;
+}
+
+template <typename I>
+void PoolWatcher<I>::schedule_refresh_images(double interval) {
+ Mutex::Locker timer_locker(m_threads->timer_lock);
+ Mutex::Locker locker(m_lock);
+ if (m_shutting_down || m_refresh_in_progress || m_timer_ctx != nullptr) {
+ return;
}
- std::map<std::string, std::string> images_map;
- r = librbd::api::Image<>::list_images(m_remote_io_ctx, &images_map);
- if (r < 0) {
- derr << "error retrieving image names from pool " << pool_name << ": "
+ m_image_ids_invalid = true;
+ m_timer_ctx = new FunctionContext([this](int r) {
+ processs_refresh_images();
+ });
+ m_threads->timer->add_event_after(interval, m_timer_ctx);
+}
+
+template <typename I>
+void PoolWatcher<I>::handle_rewatch_complete(int r) {
+ dout(5) << "r=" << r << dendl;
+
+ if (r == -EBLACKLISTED) {
+ dout(0) << "detected client is blacklisted" << dendl;
+
+ Mutex::Locker locker(m_lock);
+ m_blacklisted = true;
+ return;
+ } else if (r == -ENOENT) {
+ dout(5) << "mirroring directory deleted" << dendl;
+ } else if (r < 0) {
+ derr << "unexpected error re-registering mirroring directory watch: "
<< cpp_strerror(r) << dendl;
- return r;
}
- std::map<std::string, std::string> image_id_to_name;
- for (const auto& img_pair : images_map) {
- image_id_to_name.insert(std::make_pair(img_pair.second, img_pair.first));
+ schedule_refresh_images(5);
+}
+
+template <typename I>
+void PoolWatcher<I>::handle_image_updated(const std::string &remote_image_id,
+ const std::string &global_image_id,
+ bool enabled) {
+ dout(10) << "remote_image_id=" << remote_image_id << ", "
+ << "global_image_id=" << global_image_id << ", "
+ << "enabled=" << enabled << dendl;
+
+ Mutex::Locker locker(m_lock);
+ ImageId image_id(global_image_id, remote_image_id);
+ m_pending_added_image_ids.erase(image_id);
+ m_pending_removed_image_ids.erase(image_id);
+
+ auto id = std::make_pair(global_image_id, remote_image_id);
+ auto id_it = m_id_to_updated_images.find(id);
+ if (id_it != m_id_to_updated_images.end()) {
+ id_it->second->enabled = enabled;
+ id_it->second->invalid = false;
+ } else if (enabled) {
+ // need to resolve the image name before notifying listener
+ auto it = m_updated_images.emplace(m_updated_images.end(),
+ global_image_id, remote_image_id);
+ m_id_to_updated_images[id] = it;
+ schedule_get_image_name();
+ } else {
+ // delete image w/ if no resolve name in-flight
+ m_pending_removed_image_ids.insert(image_id);
+ schedule_listener();
}
+}
- std::string last_read = "";
- int max_read = 1024;
- do {
- std::map<std::string, std::string> mirror_images;
- r = mirror_image_list(&m_remote_io_ctx, last_read, max_read,
- &mirror_images);
- if (r < 0) {
- derr << "error listing mirrored image directory: "
- << cpp_strerror(r) << dendl;
- return r;
+template <typename I>
+void PoolWatcher<I>::schedule_get_image_name() {
+ assert(m_lock.is_locked());
+ if (m_shutting_down || m_blacklisted || m_updated_images.empty() ||
+ m_get_name_in_progress) {
+ return;
+ }
+ m_get_name_in_progress = true;
+
+ auto &updated_image = m_updated_images.front();
+ dout(10) << "global_image_id=" << updated_image.global_image_id << ", "
+ << "remote_image_id=" << updated_image.remote_image_id << dendl;
+
+ librados::ObjectReadOperation op;
+ librbd::cls_client::dir_get_name_start(&op, updated_image.remote_image_id);
+
+ m_async_op_tracker.start_op();
+
+ m_out_bl.clear();
+ librados::AioCompletion *aio_comp = create_rados_callback<
+ PoolWatcher, &PoolWatcher<I>::handle_get_image_name>(this);
+ int r = m_remote_io_ctx.aio_operate(RBD_DIRECTORY, aio_comp, &op, &m_out_bl);
+ assert(r == 0);
+ aio_comp->release();
+}
+
+template <typename I>
+void PoolWatcher<I>::handle_get_image_name(int r) {
+ dout(10) << "r=" << r << dendl;
+
+ std::string name;
+ if (r == 0) {
+ bufferlist::iterator it = m_out_bl.begin();
+ r = librbd::cls_client::dir_get_name_finish(&it, &name);
+ }
+
+ bool image_ids_invalid = false;
+ {
+ Mutex::Locker locker(m_lock);
+ assert(!m_updated_images.empty());
+ m_get_name_in_progress = false;
+
+ auto updated_image = m_updated_images.front();
+ m_updated_images.pop_front();
+ m_id_to_updated_images.erase(std::make_pair(updated_image.global_image_id,
+ updated_image.remote_image_id));
+
+ if (r == 0) {
+ // since names are resolved in event order -- the current update is
+ // the latest state
+ ImageId image_id(updated_image.global_image_id,
+ updated_image.remote_image_id, name);
+ m_pending_added_image_ids.erase(image_id);
+ m_pending_removed_image_ids.erase(image_id);
+ if (!updated_image.invalid) {
+ if (updated_image.enabled) {
+ m_pending_added_image_ids.insert(image_id);
+ } else {
+ m_pending_removed_image_ids.insert(image_id);
+ }
+ schedule_listener();
+ }
+ } else if (r == -EBLACKLISTED) {
+ dout(0) << "detected client is blacklisted" << dendl;
+
+ m_blacklisted = true;
+ } else if (r == -ENOENT) {
+ dout(10) << "image removed after add notification" << dendl;
+ } else {
+ derr << "error resolving image name " << updated_image.remote_image_id
+ << " (" << updated_image.global_image_id << "): " << cpp_strerror(r)
+ << dendl;
+ image_ids_invalid = true;
+ }
+
+ if (!image_ids_invalid) {
+ schedule_get_image_name();
}
- for (auto it = mirror_images.begin(); it != mirror_images.end(); ++it) {
- std::string image_name;
- auto it2 = image_id_to_name.find(it->first);
- if (it2 != image_id_to_name.end()) {
- image_name = it2->second;
+ }
+
+ if (image_ids_invalid) {
+ schedule_refresh_images(5);
+ }
+ m_async_op_tracker.finish_op();
+}
+
+template <typename I>
+void PoolWatcher<I>::processs_refresh_images() {
+ assert(m_threads->timer_lock.is_locked());
+ assert(m_timer_ctx != nullptr);
+ m_timer_ctx = nullptr;
+
+ // execute outside of the timer's lock
+ m_async_op_tracker.start_op();
+ Context *ctx = new FunctionContext([this](int r) {
+ register_watcher();
+ m_async_op_tracker.finish_op();
+ });
+ m_threads->work_queue->queue(ctx, 0);
+}
+
+template <typename I>
+void PoolWatcher<I>::schedule_listener() {
+ assert(m_lock.is_locked());
+ m_pending_updates = true;
+ if (m_shutting_down || m_image_ids_invalid || m_notify_listener_in_progress) {
+ return;
+ }
+
+ dout(20) << dendl;
+
+ m_async_op_tracker.start_op();
+ Context *ctx = new FunctionContext([this](int r) {
+ notify_listener();
+ m_async_op_tracker.finish_op();
+ });
+
+ m_notify_listener_in_progress = true;
+ m_threads->work_queue->queue(ctx, 0);
+}
+
+template <typename I>
+void PoolWatcher<I>::notify_listener() {
+ dout(10) << dendl;
+
+ ImageIds added_image_ids;
+ ImageIds removed_image_ids;
+ {
+ Mutex::Locker locker(m_lock);
+ assert(m_notify_listener_in_progress);
+
+ // if the watch failed while we didn't own the lock, we are going
+ // to need to perform a full refresh
+ if (m_image_ids_invalid) {
+ m_notify_listener_in_progress = false;
+ return;
+ }
+
+ // merge add/remove notifications into pending set (a given image
+ // can only be in one set or another)
+ for (auto it = m_pending_removed_image_ids.begin();
+ it != m_pending_removed_image_ids.end(); ) {
+ if (std::find_if(m_updated_images.begin(), m_updated_images.end(),
+ [&it](const UpdatedImage &updated_image) {
+ return (it->id == updated_image.remote_image_id);
+ }) != m_updated_images.end()) {
+ // still resolving the name -- so keep it in the pending set
+ auto image_id_it = m_image_ids.find(*it);
+ if (image_id_it != m_image_ids.end()) {
+ m_pending_image_ids.insert(*image_id_it);
+ }
+ ++it;
+ continue;
+ }
+
+ // merge the remove event into the pending set
+ m_pending_image_ids.erase(*it);
+ it = m_pending_removed_image_ids.erase(it);
+ }
+
+ for (auto &image_id : m_pending_added_image_ids) {
+ dout(20) << "image_id=" << image_id << dendl;
+ m_pending_image_ids.erase(image_id);
+ m_pending_image_ids.insert(image_id);
+ }
+ m_pending_added_image_ids.clear();
+
+ // compute added/removed images
+ for (auto &image_id : m_image_ids) {
+ auto it = m_pending_image_ids.find(image_id);
+ if (it == m_pending_image_ids.end() || it->id != image_id.id) {
+ removed_image_ids.insert(image_id);
}
- image_ids->insert(ImageId(it->second, it->first, image_name));
}
- if (!mirror_images.empty()) {
- last_read = mirror_images.rbegin()->first;
+ for (auto &image_id : m_pending_image_ids) {
+ auto it = m_image_ids.find(image_id);
+ if (it == m_image_ids.end() || it->id != image_id.id) {
+ added_image_ids.insert(image_id);
+ }
}
- r = mirror_images.size();
- } while (r == max_read);
- return 0;
+ m_pending_updates = false;
+ m_image_ids = m_pending_image_ids;
+ }
+
+ m_listener.handle_update(added_image_ids, removed_image_ids);
+
+ {
+ Mutex::Locker locker(m_lock);
+ m_notify_listener_in_progress = false;
+ if (m_pending_updates) {
+ schedule_listener();
+ }
+ }
}
} // namespace mirror
} // namespace rbd
+
+template class rbd::mirror::PoolWatcher<librbd::ImageCtx>;
#include <set>
#include <string>
+#include "common/AsyncOpTracker.h"
#include "common/ceph_context.h"
#include "common/Mutex.h"
-#include "common/Timer.h"
#include "include/rados/librados.hpp"
#include "types.h"
+#include <list>
+#include <unordered_map>
+#include <boost/functional/hash.hpp>
#include <boost/optional.hpp>
+#include "include/assert.h"
+
+namespace librbd { struct ImageCtx; }
namespace rbd {
namespace mirror {
+template <typename> struct Threads;
+
/**
* Keeps track of images that have mirroring enabled within all
* pools.
*/
+template <typename ImageCtxT = librbd::ImageCtx>
class PoolWatcher {
public:
- PoolWatcher(librados::IoCtx &remote_io_ctx, double interval_seconds,
- Mutex &lock, Cond &cond);
+ struct Listener {
+ virtual ~Listener() {
+ }
+
+ virtual void handle_update(const ImageIds &added_image_ids,
+ const ImageIds &removed_image_ids) = 0;
+ };
+
+ PoolWatcher(Threads<ImageCtxT> *threads, librados::IoCtx &remote_io_ctx,
+ Listener &listener);
~PoolWatcher();
PoolWatcher(const PoolWatcher&) = delete;
PoolWatcher& operator=(const PoolWatcher&) = delete;
bool is_blacklisted() const;
- const ImageIds& get_images() const;
- void refresh_images(bool reschedule=true);
+ void init(Context *on_finish = nullptr);
+ void shut_down(Context *on_finish);
private:
+ /**
+ * @verbatim
+ *
+ * <start>
+ * |
+ * v
+ * INIT
+ * |
+ * v
+ * REGISTER_WATCHER
+ * |
+ * |/--------------------------------\
+ * | |
+ * v |
+ * REFRESH_IMAGES |
+ * | |
+ * |/----------------------------\ |
+ * | | |
+ * v | |
+ * NOTIFY_LISTENER | |
+ * | | |
+ * v | |
+ * IDLE ---\ | |
+ * | | | |
+ * | |\---> IMAGE_UPDATED | |
+ * | | | | |
+ * | | v | |
+ * | | GET_IMAGE_NAME --/ |
+ * | | |
+ * | \----> WATCH_ERROR ---------/
+ * v
+ * SHUT_DOWN
+ * |
+ * v
+ * UNREGISTER_WATCHER
+ * |
+ * v
+ * <finish>
+ *
+ * @endverbatim
+ */
+ class MirroringWatcher;
+
+ struct UpdatedImage {
+ std::string global_image_id;
+ std::string remote_image_id;
+ bool enabled = true;
+ bool invalid = false;
+
+ UpdatedImage(const std::string &global_image_id,
+ const std::string &remote_image_id)
+ : global_image_id(global_image_id), remote_image_id(remote_image_id) {
+ }
+ };
+
+ typedef std::pair<std::string, std::string> GlobalRemoteIds;
+ typedef std::list<UpdatedImage> UpdatedImages;
+ typedef std::unordered_map<GlobalRemoteIds, typename UpdatedImages::iterator,
+ boost::hash<GlobalRemoteIds> > IdToUpdatedImages;
+
+ struct StrictImageIdCompare {
+ bool operator()(const ImageId &lhs, const ImageId &rhs) const {
+ if (lhs.global_id != rhs.global_id) {
+ return lhs.global_id < rhs.global_id;
+ }
+ return lhs.id < rhs.id;
+ }
+ };
+ Threads<ImageCtxT> *m_threads;
librados::IoCtx m_remote_io_ctx;
- Mutex &m_lock;
- Cond &m_refresh_cond;
+ Listener &m_listener;
+
+ ImageIds m_refresh_image_ids;
+ bufferlist m_out_bl;
+
+ mutable Mutex m_lock;
+
+ Context *m_on_init_finish = nullptr;
- bool m_stopping = false;
+ ImageIds m_image_ids;
+
+ bool m_pending_updates = false;
+ bool m_notify_listener_in_progress = false;
+ ImageIds m_pending_image_ids;
+ ImageIds m_pending_added_image_ids;
+ ImageIds m_pending_removed_image_ids;
+
+ MirroringWatcher *m_mirroring_watcher;
+
+ Context *m_timer_ctx = nullptr;
+
+ AsyncOpTracker m_async_op_tracker;
bool m_blacklisted = false;
- SafeTimer m_timer;
- double m_interval;
+ bool m_shutting_down = false;
+ bool m_image_ids_invalid = true;
+ bool m_refresh_in_progress = false;
+
+ UpdatedImages m_updated_images;
+ IdToUpdatedImages m_id_to_updated_images;
+ bool m_get_name_in_progress = false;
+
+ void register_watcher();
+ void handle_register_watcher(int r);
+ void unregister_watcher();
- ImageIds m_images;
+ void refresh_images();
+ void handle_refresh_images(int r);
+
+ void schedule_refresh_images(double interval);
+ void processs_refresh_images();
+
+ void handle_rewatch_complete(int r);
+ void handle_image_updated(const std::string &remote_image_id,
+ const std::string &global_image_id,
+ bool enabled);
+
+ void schedule_get_image_name();
+ void handle_get_image_name(int r);
+
+ void schedule_listener();
+ void notify_listener();
- int refresh(ImageIds *image_ids);
};
} // namespace mirror
} // namespace rbd
+extern template class rbd::mirror::PoolWatcher<librbd::ImageCtx>;
+
#endif // CEPH_RBD_MIRROR_POOL_WATCHER_H
m_peer(peer),
m_args(args),
m_local_pool_id(local_pool_id),
+ m_pool_watcher_listener(this),
m_asok_hook(nullptr),
m_replayer_thread(this),
m_leader_listener(this)
if (m_instance_watcher) {
m_instance_watcher->shut_down();
}
+
+ assert(!m_pool_watcher);
}
bool Replayer::is_blacklisted() const {
dout(20) << "connected to " << m_peer << dendl;
+ r = init_local_mirroring_images();
+ if (r < 0) {
+ return r;
+ }
+
m_leader_watcher.reset(new LeaderWatcher<>(m_threads, m_local_io_ctx,
&m_leader_listener));
r = m_leader_watcher->init();
return r;
}
- // Bootstrap existing mirroring images
- init_local_mirroring_images();
-
- m_pool_watcher.reset(new PoolWatcher(m_remote_io_ctx,
- g_ceph_context->_conf->rbd_mirror_image_directory_refresh_interval,
- m_lock, m_cond));
- m_pool_watcher->refresh_images();
-
m_replayer_thread.create("replayer");
return 0;
return 0;
}
-void Replayer::init_local_mirroring_images() {
+int Replayer::init_local_mirroring_images() {
+ dout(20) << dendl;
+
rbd_mirror_mode_t mirror_mode;
int r = librbd::api::Mirror<>::mode_get(m_local_io_ctx, &mirror_mode);
if (r < 0) {
derr << "could not tell whether mirroring was enabled for "
<< m_local_io_ctx.get_pool_name() << ": " << cpp_strerror(r) << dendl;
- return;
+ return r;
}
if (mirror_mode == RBD_MIRROR_MODE_DISABLED) {
dout(20) << "pool " << m_local_io_ctx.get_pool_name() << " "
<< "has mirroring disabled" << dendl;
- return;
+ return -ENOENT;
}
ImageIds image_ids;
if (r < 0) {
derr << "error listing mirrored image directory: "
<< cpp_strerror(r) << dendl;
- continue;
+ return r;
}
for (auto it = mirror_images.begin(); it != mirror_images.end(); ++it) {
std::string image_name;
r = dir_get_name(&m_local_io_ctx, RBD_DIRECTORY, it->first, &image_name);
- if (r < 0) {
+ if (r == -ENOENT) {
+ dout(20) << "orphaned mirror image: " << it->first << dendl;
+ continue;
+ } else if (r < 0) {
derr << "error retrieving local image name: " << cpp_strerror(r)
<< dendl;
- continue;
+ return r;
}
+
+ dout(20) << "local image: " << it->second << " (" << it->first << ")"
+ << dendl;
image_ids.insert(ImageId(it->second, it->first, image_name));
}
if (!mirror_images.empty()) {
} while (r == max_read);
m_init_image_ids = std::move(image_ids);
+ return 0;
}
void Replayer::run()
dout(20) << "enter" << dendl;
while (!m_stopping.read()) {
-
std::string asok_hook_name = m_local_io_ctx.get_pool_name() + " " +
m_peer.cluster_name;
if (m_asok_hook_name != asok_hook_name || m_asok_hook == nullptr) {
}
Mutex::Locker locker(m_lock);
- if (m_pool_watcher->is_blacklisted()) {
+ if (m_pool_watcher && m_pool_watcher->is_blacklisted()) {
m_blacklisted = true;
m_stopping.set(1);
- } else if (!m_manual_stop && m_leader_watcher->is_leader()) {
- set_sources(m_pool_watcher->get_images());
+ break;
}
- if (m_blacklisted) {
- break;
+ for (auto image_it = m_image_replayers.begin();
+ image_it != m_image_replayers.end(); ) {
+ if (image_it->second->remote_images_empty()) {
+ if (stop_image_replayer(image_it->second)) {
+ image_it = m_image_replayers.erase(image_it);
+ continue;
+ }
+ } else {
+ start_image_replayer(image_it->second);
+ }
+ ++image_it;
}
+
m_cond.WaitInterval(m_lock,
- utime_t(g_ceph_context->_conf
- ->rbd_mirror_image_state_check_interval, 0));
+ utime_t(g_ceph_context->_conf->
+ rbd_mirror_image_state_check_interval, 0));
}
- ImageIds empty_sources;
- while (true) {
- Mutex::Locker locker(m_lock);
- set_sources(empty_sources);
- if (m_image_replayers.empty()) {
- break;
- }
- m_cond.WaitInterval(m_lock, seconds(1));
+ Mutex::Locker locker(m_lock);
+ while (!m_image_replayers.empty()) {
+ stop_image_replayers();
}
}
m_leader_watcher->release_leader();
}
-void Replayer::set_sources(const ImageIds &image_ids)
-{
- dout(20) << "enter" << dendl;
+void Replayer::handle_update(const ImageIds &added_image_ids,
+ const ImageIds &removed_image_ids) {
+ if (m_stopping.read()) {
+ return;
+ }
- assert(m_lock.is_locked());
+ dout(10) << dendl;
+ Mutex::Locker locker(m_lock);
+ if (!m_leader_watcher->is_leader()) {
+ return;
+ }
- if (!m_init_image_ids.empty() && !m_stopping.read() &&
- m_leader_watcher->is_leader()) {
+ // first callback will be a full directory -- so see if we need to remove
+ // any local images that no longer exist on the remote side
+ if (!m_init_image_ids.empty()) {
dout(20) << "scanning initial local image set" << dendl;
- for (auto &remote_image : image_ids) {
- auto it = m_init_image_ids.find(ImageId(remote_image.global_id));
+ for (auto &image_id : added_image_ids) {
+ auto it = m_init_image_ids.find(image_id);
if (it != m_init_image_ids.end()) {
m_init_image_ids.erase(it);
}
}
// shut down replayers for non-mirrored images
- for (auto image_it = m_image_replayers.begin();
- image_it != m_image_replayers.end();) {
- auto image_id_it = image_ids.find(ImageId(image_it->first));
- if (image_id_it == image_ids.end()) {
+ for (auto &image_id : removed_image_ids) {
+ auto image_it = m_image_replayers.find(image_id.global_id);
+ if (image_it != m_image_replayers.end()) {
+ assert(!m_remote_mirror_uuid.empty());
+ image_it->second->remove_remote_image(m_remote_mirror_uuid,
+ image_id.id);
+
if (image_it->second->is_running()) {
dout(20) << "stop image replayer for remote image "
- << image_it->second->get_global_image_id() << dendl;
+ << image_id.id << " (" << image_id.global_id << ")"
+ << dendl;
}
- if (stop_image_replayer(image_it->second)) {
- image_it = m_image_replayers.erase(image_it);
- continue;
+
+ if (image_it->second->remote_images_empty() &&
+ stop_image_replayer(image_it->second)) {
+ // no additional remotes registered for this image
+ m_image_replayers.erase(image_it);
}
}
- ++image_it;
}
- if (image_ids.empty()) {
+ // prune previously stopped image replayers
+ for (auto image_it = m_image_replayers.begin();
+ image_it != m_image_replayers.end(); ) {
+ if (image_it->second->remote_images_empty() &&
+ stop_image_replayer(image_it->second)) {
+ image_it = m_image_replayers.erase(image_it);
+ } else {
+ ++image_it;
+ }
+ }
+
+ if (added_image_ids.empty()) {
return;
}
std::string local_mirror_uuid;
int r = librbd::cls_client::mirror_uuid_get(&m_local_io_ctx,
&local_mirror_uuid);
- if (r < 0) {
+ if (r < 0 || local_mirror_uuid.empty()) {
derr << "failed to retrieve local mirror uuid from pool "
<< m_local_io_ctx.get_pool_name() << ": " << cpp_strerror(r) << dendl;
return;
std::string remote_mirror_uuid;
r = librbd::cls_client::mirror_uuid_get(&m_remote_io_ctx,
&remote_mirror_uuid);
- if (r < 0) {
+ if (r < 0 || remote_mirror_uuid.empty()) {
derr << "failed to retrieve remote mirror uuid from pool "
<< m_remote_io_ctx.get_pool_name() << ": " << cpp_strerror(r) << dendl;
return;
}
+ m_remote_mirror_uuid = remote_mirror_uuid;
- for (auto &image_id : image_ids) {
+ // start replayers for newly added remote image sources
+ for (auto &image_id : added_image_ids) {
auto it = m_image_replayers.find(image_id.global_id);
if (it == m_image_replayers.end()) {
unique_ptr<ImageReplayer<> > image_replayer(new ImageReplayer<>(
m_threads, m_image_deleter, m_image_sync_throttler, m_local_rados,
local_mirror_uuid, m_local_pool_id, image_id.global_id));
+ if (m_manual_stop) {
+ image_replayer->stop(nullptr, true);
+ }
+
it = m_image_replayers.insert(
std::make_pair(image_id.global_id, std::move(image_replayer))).first;
}
void Replayer::start_image_replayer(unique_ptr<ImageReplayer<> > &image_replayer)
{
assert(m_lock.is_locked());
-
- std::string global_image_id = image_replayer->get_global_image_id();
- dout(20) << "global_image_id=" << global_image_id << dendl;
-
- if (!image_replayer->is_stopped()) {
+ if (!image_replayer->is_stopped() || image_replayer->remote_images_empty()) {
return;
} else if (image_replayer->is_blacklisted()) {
derr << "blacklisted detected during image replay" << dendl;
return;
}
+ std::string global_image_id = image_replayer->get_global_image_id();
+ dout(20) << "global_image_id=" << global_image_id << dendl;
+
FunctionContext *ctx = new FunctionContext(
[this, global_image_id] (int r) {
dout(20) << "image deleter result: r=" << r << ", "
return false;
}
-void Replayer::handle_post_acquire_leader(Context *on_finish) {
+void Replayer::stop_image_replayers() {
dout(20) << dendl;
- {
- Mutex::Locker locker(m_lock);
- m_cond.Signal();
+ assert(m_lock.is_locked());
+ for (auto image_it = m_image_replayers.begin();
+ image_it != m_image_replayers.end();) {
+ if (stop_image_replayer(image_it->second)) {
+ image_it = m_image_replayers.erase(image_it);
+ continue;
+ }
+ ++image_it;
}
-
- on_finish->complete(0);
}
-void Replayer::handle_pre_release_leader(Context *on_finish) {
+void Replayer::stop_image_replayers(Context *on_finish) {
dout(20) << dendl;
{
Mutex::Locker locker(m_lock);
- set_sources(ImageIds());
+ stop_image_replayers();
+
if (!m_image_replayers.empty()) {
+ Context *ctx = new FunctionContext([this, on_finish](int r) {
+ assert(r == 0);
+ stop_image_replayers(on_finish);
+ });
+ ctx = create_async_context_callback(m_threads->work_queue, ctx);
+
Mutex::Locker timer_locker(m_threads->timer_lock);
- Context *task = create_async_context_callback(
- m_threads->work_queue, new FunctionContext(
- [this, on_finish](int r) {
- handle_pre_release_leader(on_finish);
- }));
- m_threads->timer->add_event_after(1, task);
+ m_threads->timer->add_event_after(1, ctx);
return;
}
}
on_finish->complete(0);
}
+void Replayer::handle_post_acquire_leader(Context *on_finish) {
+ dout(20) << dendl;
+
+ Mutex::Locker locker(m_lock);
+ assert(!m_pool_watcher);
+ m_pool_watcher.reset(new PoolWatcher<>(
+ m_threads, m_remote_io_ctx, m_pool_watcher_listener));
+ m_pool_watcher->init(create_async_context_callback(
+ m_threads->work_queue, on_finish));
+
+ m_cond.Signal();
+}
+
+void Replayer::handle_pre_release_leader(Context *on_finish) {
+ dout(20) << dendl;
+ shut_down_pool_watcher(on_finish);
+}
+
+void Replayer::shut_down_pool_watcher(Context *on_finish) {
+ dout(20) << dendl;
+
+ Context *ctx = new FunctionContext([this, on_finish](int r) {
+ handle_shut_down_pool_watcher(r, on_finish);
+ });
+ ctx = create_async_context_callback(m_threads->work_queue, ctx);
+
+ Mutex::Locker locker(m_lock);
+ assert(m_pool_watcher);
+ m_pool_watcher->shut_down(ctx);
+}
+
+void Replayer::handle_shut_down_pool_watcher(int r, Context *on_finish) {
+ dout(20) << "r=" << r << dendl;
+
+ {
+ Mutex::Locker locker(m_lock);
+ assert(m_pool_watcher);
+ m_pool_watcher.reset();
+ }
+
+ stop_image_replayers(on_finish);
+}
+
} // namespace mirror
} // namespace rbd
void release_leader();
private:
- void init_local_mirroring_images();
- void set_sources(const ImageIds &image_ids);
+ struct PoolWatcherListener : public PoolWatcher<>::Listener {
+ Replayer *replayer;
+
+ PoolWatcherListener(Replayer *replayer) : replayer(replayer) {
+ }
+
+ void handle_update(const ImageIds &added_image_ids,
+ const ImageIds &removed_image_ids) override {
+ replayer->handle_update(added_image_ids, removed_image_ids);
+ }
+ };
+
+ int init_local_mirroring_images();
+
+ void handle_update(const ImageIds &added_image_ids,
+ const ImageIds &removed_image_ids);
void start_image_replayer(unique_ptr<ImageReplayer<> > &image_replayer);
bool stop_image_replayer(unique_ptr<ImageReplayer<> > &image_replayer);
+ void stop_image_replayers();
+ void stop_image_replayers(Context *on_finish);
- int init_rados(const std::string &cluster_name, const std::string &client_name,
+ int init_rados(const std::string &cluster_name,
+ const std::string &client_name,
const std::string &description, RadosRef *rados_ref);
void handle_post_acquire_leader(Context *on_finish);
void handle_pre_release_leader(Context *on_finish);
+ void shut_down_pool_watcher(Context *on_finish);
+ void handle_shut_down_pool_watcher(int r, Context *on_finish);
+
Threads<librbd::ImageCtx> *m_threads;
std::shared_ptr<ImageDeleter> m_image_deleter;
ImageSyncThrottlerRef<> m_image_sync_throttler;
int64_t m_local_pool_id = -1;
int64_t m_remote_pool_id = -1;
- std::unique_ptr<PoolWatcher> m_pool_watcher;
+ std::string m_remote_mirror_uuid;
+
+ PoolWatcherListener m_pool_watcher_listener;
+ std::unique_ptr<PoolWatcher<> > m_pool_watcher;
+
std::map<std::string, std::unique_ptr<ImageReplayer<> > > m_image_replayers;
std::string m_asok_hook_name;
r = librbd::cls_client::mirror_image_list_finish(&it, &ids);
}
- if (r < 0) {
+ if (r < 0 && r != -ENOENT) {
derr << "failed to list mirrored images: " << cpp_strerror(r) << dendl;
finish(r);
return;
r = librbd::cls_client::dir_list_finish(&it, &name_to_ids);
}
- if (r < 0) {
+ if (r < 0 && r != -ENOENT) {
derr << "failed to list images: " << cpp_strerror(r) << dendl;
finish(r);
return;
template <typename ImageCtxT = librbd::ImageCtx>
class RefreshImagesRequest {
public:
- RefreshImagesRequest *create(librados::IoCtx &remote_io_ctx,
- ImageIds *image_ids, Context *on_finish) {
+ static RefreshImagesRequest *create(librados::IoCtx &remote_io_ctx,
+ ImageIds *image_ids, Context *on_finish) {
return new RefreshImagesRequest(remote_io_ctx, image_ids, on_finish);
}
namespace rbd {
namespace mirror {
-std::ostream& operator<<(std::ostream& lhs, const peer_t &peer)
-{
+std::ostream &operator<<(std::ostream &os, const ImageId &image_id) {
+ return os << "global id=" << image_id.global_id << ", "
+ << "id=" << image_id.id << ", "
+ << "name=" << image_id.name;
+}
+
+std::ostream& operator<<(std::ostream& lhs, const peer_t &peer) {
return lhs << "uuid: " << peer.uuid
<< " cluster: " << peer.cluster_name
<< " client: " << peer.client_name;
explicit ImageId(const std::string &global_id) : global_id(global_id) {
}
+ ImageId(const std::string &global_id, const std::string &id)
+ : global_id(global_id), id(id) {
+ }
ImageId(const std::string &global_id, const std::string &id,
- const boost::optional<std::string> &name = boost::none)
+ const std::string &name)
: global_id(global_id), id(id), name(name) {
}
}
};
+std::ostream &operator<<(std::ostream &, const ImageId &image_id);
+
typedef std::set<ImageId> ImageIds;
struct peer_t {