From: Mykola Golub Date: Sat, 11 Feb 2017 16:05:23 +0000 (+0100) Subject: rbd-mirror: class for tracking instances state X-Git-Tag: v12.0.1~198^2~2 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=4c10945002c9af6699a28a646f0bfff2c2853cb7;p=ceph.git rbd-mirror: class for tracking instances state Signed-off-by: Mykola Golub --- diff --git a/src/test/rbd_mirror/CMakeLists.txt b/src/test/rbd_mirror/CMakeLists.txt index 369508eaf32b..08ca7fa7d5cf 100644 --- a/src/test/rbd_mirror/CMakeLists.txt +++ b/src/test/rbd_mirror/CMakeLists.txt @@ -5,6 +5,7 @@ set(rbd_mirror_test_srcs test_ImageDeleter.cc test_ImageSync.cc test_InstanceWatcher.cc + test_Instances.cc test_LeaderWatcher.cc test_fixture.cc ) diff --git a/src/test/rbd_mirror/test_Instances.cc b/src/test/rbd_mirror/test_Instances.cc new file mode 100644 index 000000000000..e2f8008e00ee --- /dev/null +++ b/src/test/rbd_mirror/test_Instances.cc @@ -0,0 +1,101 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "include/rados/librados.hpp" +#include "cls/rbd/cls_rbd_client.h" +#include "test/rbd_mirror/test_fixture.h" +#include "tools/rbd_mirror/InstanceWatcher.h" +#include "tools/rbd_mirror/Instances.h" +#include "tools/rbd_mirror/Threads.h" + +#include "test/librados/test.h" +#include "gtest/gtest.h" + +using rbd::mirror::InstanceWatcher; +using rbd::mirror::Instances; + +void register_test_instances() { +} + +class TestInstances : public ::rbd::mirror::TestFixture { +public: + virtual void SetUp() { + TestFixture::SetUp(); + m_local_io_ctx.remove(RBD_MIRROR_LEADER); + EXPECT_EQ(0, m_local_io_ctx.create(RBD_MIRROR_LEADER, true)); + } +}; + +TEST_F(TestInstances, InitShutdown) +{ + Instances<> instances(m_threads, m_local_io_ctx); + + std::string instance_id = "instance_id"; + ASSERT_EQ(0, librbd::cls_client::mirror_instances_add(&m_local_io_ctx, + instance_id)); + + C_SaferCond on_init; + instances.init(&on_init); + ASSERT_EQ(0, on_init.wait()); + + C_SaferCond on_shut_down; + instances.shut_down(&on_shut_down); + ASSERT_EQ(0, on_shut_down.wait()); +} + +TEST_F(TestInstances, InitEnoent) +{ + Instances<> instances(m_threads, m_local_io_ctx); + + m_local_io_ctx.remove(RBD_MIRROR_LEADER); + + C_SaferCond on_init; + instances.init(&on_init); + ASSERT_EQ(0, on_init.wait()); + + C_SaferCond on_shut_down; + instances.shut_down(&on_shut_down); + ASSERT_EQ(0, on_shut_down.wait()); +} + +TEST_F(TestInstances, NotifyRemove) +{ + // speed testing up a little + EXPECT_EQ(0, _rados->conf_set("rbd_mirror_leader_heartbeat_interval", "1")); + EXPECT_EQ(0, _rados->conf_set("rbd_mirror_leader_max_missed_heartbeats", + "2")); + + Instances<> instances(m_threads, m_local_io_ctx); + + std::string instance_id1 = "instance_id1"; + std::string instance_id2 = "instance_id2"; + + ASSERT_EQ(0, librbd::cls_client::mirror_instances_add(&m_local_io_ctx, + instance_id1)); + ASSERT_EQ(0, librbd::cls_client::mirror_instances_add(&m_local_io_ctx, + instance_id2)); + + C_SaferCond on_init; + instances.init(&on_init); + ASSERT_EQ(0, on_init.wait()); + + std::vector instance_ids; + + for (int i = 0; i < 10; i++) { + instances.notify(instance_id1); + sleep(1); + C_SaferCond on_get; + InstanceWatcher<>::get_instances(m_local_io_ctx, &instance_ids, &on_get); + EXPECT_EQ(0, on_get.wait()); + if (instance_ids.size() <= 1U) { + break; + } + } + + ASSERT_EQ(1U, instance_ids.size()); + ASSERT_EQ(instance_ids[0], instance_id1); + + C_SaferCond on_shut_down; + instances.shut_down(&on_shut_down); + ASSERT_EQ(0, on_shut_down.wait()); +} diff --git a/src/test/rbd_mirror/test_main.cc b/src/test/rbd_mirror/test_main.cc index aae4b143787a..9e100b89a68b 100644 --- a/src/test/rbd_mirror/test_main.cc +++ b/src/test/rbd_mirror/test_main.cc @@ -11,6 +11,7 @@ extern void register_test_cluster_watcher(); extern void register_test_image_sync(); extern void register_test_instance_watcher(); +extern void register_test_instances(); extern void register_test_leader_watcher(); extern void register_test_pool_watcher(); extern void register_test_rbd_mirror(); @@ -21,6 +22,7 @@ int main(int argc, char **argv) register_test_cluster_watcher(); register_test_image_sync(); register_test_instance_watcher(); + register_test_instances(); register_test_leader_watcher(); register_test_pool_watcher(); register_test_rbd_mirror(); diff --git a/src/tools/rbd_mirror/CMakeLists.txt b/src/tools/rbd_mirror/CMakeLists.txt index 9224199e5331..200fdee63e82 100644 --- a/src/tools/rbd_mirror/CMakeLists.txt +++ b/src/tools/rbd_mirror/CMakeLists.txt @@ -8,6 +8,7 @@ set(rbd_mirror_internal ImageSync.cc ImageSyncThrottler.cc InstanceWatcher.cc + Instances.cc LeaderWatcher.cc Mirror.cc MirrorStatusWatcher.cc diff --git a/src/tools/rbd_mirror/Instances.cc b/src/tools/rbd_mirror/Instances.cc new file mode 100644 index 000000000000..7e902303ad7c --- /dev/null +++ b/src/tools/rbd_mirror/Instances.cc @@ -0,0 +1,252 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "include/stringify.h" +#include "common/Timer.h" +#include "common/WorkQueue.h" +#include "common/debug.h" +#include "common/errno.h" +#include "librbd/Utils.h" +#include "InstanceWatcher.h" +#include "Instances.h" +#include "Threads.h" + +#define dout_context g_ceph_context +#define dout_subsys ceph_subsys_rbd_mirror +#undef dout_prefix +#define dout_prefix *_dout << "rbd::mirror::Instances: " \ + << this << " " << __func__ << ": " + +namespace rbd { +namespace mirror { + +using librbd::util::create_async_context_callback; +using librbd::util::create_context_callback; +using librbd::util::create_rados_ack_callback; + +template +Instances::Instances(Threads *threads, librados::IoCtx &ioctx) : + m_threads(threads), m_ioctx(ioctx), + m_cct(reinterpret_cast(ioctx.cct())), + m_lock("rbd::mirror::Instances " + ioctx.get_pool_name()) { +} + +template +Instances::~Instances() { +} + +template +void Instances::init(Context *on_finish) { + dout(20) << dendl; + + Mutex::Locker locker(m_lock); + assert(m_on_finish == nullptr); + m_on_finish = on_finish; + get_instances(); +} + +template +void Instances::shut_down(Context *on_finish) { + dout(20) << dendl; + + Mutex::Locker locker(m_lock); + assert(m_on_finish == nullptr); + m_on_finish = on_finish; + + Context *ctx = new FunctionContext( + [this](int r) { + Mutex::Locker timer_locker(m_threads->timer_lock); + Mutex::Locker locker(m_lock); + + for (auto it : m_instances) { + cancel_remove_task(it.second); + } + wait_for_ops(); + }); + + m_threads->work_queue->queue(ctx, 0); +} + +template +void Instances::notify(const std::string &instance_id) { + dout(20) << instance_id << dendl; + + Mutex::Locker locker(m_lock); + + if (m_on_finish != nullptr) { + dout(20) << "received on shut down, ignoring" << dendl; + return; + } + + Context *ctx = new C_Notify(this, instance_id); + + m_threads->work_queue->queue(ctx, 0); +} + +template +void Instances::handle_notify(const std::string &instance_id) { + dout(20) << instance_id << dendl; + + Mutex::Locker timer_locker(m_threads->timer_lock); + Mutex::Locker locker(m_lock); + + if (m_on_finish != nullptr) { + dout(20) << "handled on shut down, ignoring" << dendl; + return; + } + + auto &instance = m_instances.insert( + std::make_pair(instance_id, Instance(instance_id))).first->second; + + schedule_remove_task(instance); +} + +template +void Instances::list(std::vector *instance_ids) { + dout(20) << dendl; + + Mutex::Locker locker(m_lock); + + for (auto it : m_instances) { + instance_ids->push_back(it.first); + } +} + + +template +void Instances::get_instances() { + dout(20) << dendl; + + assert(m_lock.is_locked()); + + Context *ctx = create_context_callback< + Instances, &Instances::handle_get_instances>(this); + + InstanceWatcher::get_instances(m_ioctx, &m_instance_ids, ctx); +} + +template +void Instances::handle_get_instances(int r) { + dout(20) << "r=" << r << dendl; + + Context *on_finish = nullptr; + { + Mutex::Locker timer_locker(m_threads->timer_lock); + Mutex::Locker locker(m_lock); + + if (r < 0) { + derr << "error retrieving instances: " << cpp_strerror(r) << dendl; + } else { + auto my_instance_id = stringify(m_ioctx.get_instance_id()); + for (auto &instance_id : m_instance_ids) { + if (instance_id == my_instance_id) { + continue; + } + auto &instance = m_instances.insert( + std::make_pair(instance_id, Instance(instance_id))).first->second; + schedule_remove_task(instance); + } + } + std::swap(on_finish, m_on_finish); + } + on_finish->complete(r); +} + +template +void Instances::wait_for_ops() { + dout(20) << dendl; + + assert(m_lock.is_locked()); + + Context *ctx = create_async_context_callback( + m_threads->work_queue, create_context_callback< + Instances, &Instances::handle_wait_for_ops>(this)); + + m_async_op_tracker.wait_for_ops(ctx); +} + +template +void Instances::handle_wait_for_ops(int r) { + dout(20) << "r=" << r << dendl; + + assert(r == 0); + + Context *on_finish = nullptr; + { + Mutex::Locker locker(m_lock); + std::swap(on_finish, m_on_finish); + } + on_finish->complete(r); +} + +template +void Instances::remove_instance(Instance &instance) { + assert(m_lock.is_locked()); + + dout(20) << instance.id << dendl; + + Context *ctx = create_async_context_callback( + m_threads->work_queue, create_context_callback< + Instances, &Instances::handle_remove_instance>(this)); + + m_async_op_tracker.start_op(); + InstanceWatcher::remove_instance(m_ioctx, m_threads->work_queue, + instance.id, ctx); + m_instances.erase(instance.id); +} + +template +void Instances::handle_remove_instance(int r) { + Mutex::Locker locker(m_lock); + + dout(20) << " r=" << r << dendl; + + assert(r == 0); + + m_async_op_tracker.finish_op(); +} + +template +void Instances::cancel_remove_task(Instance &instance) { + assert(m_threads->timer_lock.is_locked()); + assert(m_lock.is_locked()); + + if (instance.timer_task == nullptr) { + return; + } + + dout(20) << instance.timer_task << dendl; + + bool canceled = m_threads->timer->cancel_event(instance.timer_task); + assert(canceled); + instance.timer_task = nullptr; +} + +template +void Instances::schedule_remove_task(Instance &instance) { + dout(20) << dendl; + + cancel_remove_task(instance); + + int after = max(1, m_cct->_conf->rbd_mirror_leader_heartbeat_interval) * + (1 + m_cct->_conf->rbd_mirror_leader_max_missed_heartbeats + + m_cct->_conf->rbd_mirror_leader_max_acquire_attempts_before_break); + + instance.timer_task = new FunctionContext( + [this, &instance](int r) { + assert(m_threads->timer_lock.is_locked()); + Mutex::Locker locker(m_lock); + instance.timer_task = nullptr; + remove_instance(instance); + }); + + dout(20) << "scheduling instance " << instance.id << " remove after " << after + << " sec (task " << instance.timer_task << ")" << dendl; + + m_threads->timer->add_event_after(after, instance.timer_task); +} + +} // namespace mirror +} // namespace rbd + +template class rbd::mirror::Instances; diff --git a/src/tools/rbd_mirror/Instances.h b/src/tools/rbd_mirror/Instances.h new file mode 100644 index 000000000000..e8a4c25200d0 --- /dev/null +++ b/src/tools/rbd_mirror/Instances.h @@ -0,0 +1,112 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_RBD_MIRROR_INSTANCES_H +#define CEPH_RBD_MIRROR_INSTANCES_H + +#include +#include + +#include "include/buffer.h" +#include "common/AsyncOpTracker.h" +#include "common/Mutex.h" +#include "librbd/Watcher.h" + +namespace librados { class IoCtx; } +namespace librbd { class ImageCtx; } + +namespace rbd { +namespace mirror { + +struct Threads; + +template +class Instances { +public: + static Instances *create(Threads *threads, librados::IoCtx &ioctx) { + return new Instances(threads, ioctx); + } + void destroy() { + delete this; + } + + Instances(Threads *threads, librados::IoCtx &ioctx); + virtual ~Instances(); + + void init(Context *on_finish); + void shut_down(Context *on_finish); + + void notify(const std::string &instance_id); + void list(std::vector *instance_ids); + +private: + /** + * @verbatim + * + * <---------------------\ + * | (init) ^ | + * v (error) * | + * GET_INSTANCES * * * * * WAIT_FOR_OPS + * | ^ + * v (shut_down) | + * ------------------------/ + * . + * . (remove_instance) + * v + * REMOVE_INSTANCE + * + * @endverbatim + */ + + struct Instance { + std::string id; + Context *timer_task = nullptr; + + Instance(const std::string &instance_id) : id(instance_id) { + } + }; + + struct C_Notify : Context { + Instances *instances; + std::string instance_id; + + C_Notify(Instances *instances, const std::string &instance_id) + : instances(instances), instance_id(instance_id) { + instances->m_async_op_tracker.start_op(); + } + + void finish(int r) override { + instances->handle_notify(instance_id); + instances->m_async_op_tracker.finish_op(); + } + }; + + Threads *m_threads; + librados::IoCtx &m_ioctx; + CephContext *m_cct; + + Mutex m_lock; + std::vector m_instance_ids; + std::map m_instances; + Context *m_on_finish = nullptr; + AsyncOpTracker m_async_op_tracker; + + void handle_notify(const std::string &instance_id); + + void get_instances(); + void handle_get_instances(int r); + + void wait_for_ops(); + void handle_wait_for_ops(int r); + + void remove_instance(Instance &instance); + void handle_remove_instance(int r); + + void cancel_remove_task(Instance &instance); + void schedule_remove_task(Instance &instance); +}; + +} // namespace mirror +} // namespace rbd + +#endif // CEPH_RBD_MIRROR_INSTANCES_H