#include "test/librados/test.h"
#include "gtest/gtest.h"
+#include <vector>
using rbd::mirror::InstanceWatcher;
using rbd::mirror::Instances;
class TestInstances : public ::rbd::mirror::TestFixture {
public:
+ struct Listener : public rbd::mirror::instances::Listener {
+ std::mutex lock;
+
+ struct Instance {
+ uint32_t count = 0;
+ std::set<std::string> ids;
+ C_SaferCond ctx;
+ };
+
+ Instance add;
+ Instance remove;
+
+ void handle(const InstanceIds& instance_ids, Instance* instance) {
+ std::unique_lock<std::mutex> locker(lock);
+ for (auto& instance_id : instance_ids) {
+ assert(instance->count > 0);
+ --instance->count;
+
+ instance->ids.insert(instance_id);
+ if (instance->count == 0) {
+ instance->ctx.complete(0);
+ }
+ }
+ }
+
+ void handle_added(const InstanceIds& instance_ids) override {
+ handle(instance_ids, &add);
+ }
+
+ void handle_removed(const InstanceIds& instance_ids) override {
+ handle(instance_ids, &remove);
+ }
+ };
+
virtual void SetUp() {
TestFixture::SetUp();
m_local_io_ctx.remove(RBD_MIRROR_LEADER);
EXPECT_EQ(0, m_local_io_ctx.create(RBD_MIRROR_LEADER, true));
}
+
+ Listener m_listener;
};
TEST_F(TestInstances, InitShutdown)
{
- Instances<> instances(m_threads, m_local_io_ctx);
+ m_listener.add.count = 1;
+ Instances<> instances(m_threads, m_local_io_ctx, m_listener);
std::string instance_id = "instance_id";
ASSERT_EQ(0, librbd::cls_client::mirror_instances_add(&m_local_io_ctx,
instances.init(&on_init);
ASSERT_EQ(0, on_init.wait());
+ ASSERT_EQ(0, m_listener.add.ctx.wait());
+ ASSERT_EQ(std::set<std::string>({instance_id}), m_listener.add.ids);
+
C_SaferCond on_shut_down;
instances.shut_down(&on_shut_down);
ASSERT_EQ(0, on_shut_down.wait());
TEST_F(TestInstances, InitEnoent)
{
- Instances<> instances(m_threads, m_local_io_ctx);
+ Instances<> instances(m_threads, m_local_io_ctx, m_listener);
m_local_io_ctx.remove(RBD_MIRROR_LEADER);
EXPECT_EQ(0, _rados->conf_set("rbd_mirror_leader_heartbeat_interval", "1"));
EXPECT_EQ(0, _rados->conf_set("rbd_mirror_leader_max_missed_heartbeats",
"2"));
+ EXPECT_EQ(0, _rados->conf_set("rbd_mirror_leader_max_acquire_attempts_before_break",
+ "0"));
- Instances<> instances(m_threads, m_local_io_ctx);
+ m_listener.add.count = 2;
+ m_listener.remove.count = 1;
+ Instances<> instances(m_threads, m_local_io_ctx, m_listener);
std::string instance_id1 = "instance_id1";
std::string instance_id2 = "instance_id2";
ASSERT_EQ(0, librbd::cls_client::mirror_instances_add(&m_local_io_ctx,
instance_id1));
- ASSERT_EQ(0, librbd::cls_client::mirror_instances_add(&m_local_io_ctx,
- instance_id2));
+
C_SaferCond on_init;
instances.init(&on_init);
ASSERT_EQ(0, on_init.wait());
- std::vector<std::string> instance_ids;
+ instances.acked({instance_id1, instance_id2});
- for (int i = 0; i < 10; i++) {
+ ASSERT_EQ(0, m_listener.add.ctx.wait());
+ ASSERT_EQ(std::set<std::string>({instance_id1, instance_id2}),
+ m_listener.add.ids);
+
+ std::vector<std::string> instance_ids;
+ for (int i = 0; i < 100; i++) {
instances.acked({instance_id1});
- sleep(1);
- C_SaferCond on_get;
- InstanceWatcher<>::get_instances(m_local_io_ctx, &instance_ids, &on_get);
- EXPECT_EQ(0, on_get.wait());
- if (instance_ids.size() <= 1U) {
- break;
+ if (m_listener.remove.count > 0) {
+ usleep(250000);
}
}
- ASSERT_EQ(1U, instance_ids.size());
+ instances.acked({instance_id1});
+ ASSERT_EQ(0, m_listener.remove.ctx.wait());
+ ASSERT_EQ(std::set<std::string>({instance_id2}),
+ m_listener.remove.ids);
+
+ C_SaferCond on_get;
+ instances.acked({instance_id1});
+ InstanceWatcher<>::get_instances(m_local_io_ctx, &instance_ids, &on_get);
+ EXPECT_EQ(0, on_get.wait());
+ EXPECT_EQ(1U, instance_ids.size());
ASSERT_EQ(instance_ids[0], instance_id1);
C_SaferCond on_shut_down;
static Instances* s_instance;
static Instances *create(Threads<librbd::MockTestImageCtx> *threads,
- librados::IoCtx &ioctx) {
+ librados::IoCtx &ioctx, instances::Listener&) {
assert(s_instance != nullptr);
return s_instance;
}
using librbd::util::create_rados_callback;
template <typename I>
-Instances<I>::Instances(Threads<I> *threads, librados::IoCtx &ioctx) :
- m_threads(threads), m_ioctx(ioctx),
+Instances<I>::Instances(Threads<I> *threads, librados::IoCtx &ioctx,
+ instances::Listener& listener) :
+ m_threads(threads), m_ioctx(ioctx), m_listener(listener),
m_cct(reinterpret_cast<CephContext *>(ioctx.cct())),
m_lock("rbd::mirror::Instances " + ioctx.get_pool_name()) {
}
return;
}
+ InstanceIds added_instance_ids;
auto time = ceph_clock_now();
for (auto& instance_id : instance_ids) {
auto &instance = m_instances.insert(
std::make_pair(instance_id, Instance{})).first->second;
instance.acked_time = time;
+ if (instance.state == INSTANCE_STATE_ADDING) {
+ added_instance_ids.push_back(instance_id);
+ }
}
schedule_remove_task(time);
+ if (!added_instance_ids.empty()) {
+ m_threads->work_queue->queue(
+ new C_NotifyInstancesAdded(this, added_instance_ids), 0);
+ }
+}
+
+template <typename I>
+void Instances<I>::notify_instances_added(const InstanceIds& instance_ids) {
+ Mutex::Locker locker(m_lock);
+ InstanceIds added_instance_ids;
+ for (auto& instance_id : instance_ids) {
+ auto it = m_instances.find(instance_id);
+ if (it != m_instances.end() && it->second.state == INSTANCE_STATE_ADDING) {
+ added_instance_ids.push_back(instance_id);
+ }
+ }
+
+ if (added_instance_ids.empty()) {
+ return;
+ }
+
+ dout(5) << "instance_ids=" << added_instance_ids << dendl;
+ m_lock.Unlock();
+ m_listener.handle_added(added_instance_ids);
+ m_lock.Lock();
+
+ for (auto& instance_id : added_instance_ids) {
+ auto it = m_instances.find(instance_id);
+ if (it != m_instances.end() && it->second.state == INSTANCE_STATE_ADDING) {
+ it->second.state = INSTANCE_STATE_IDLE;
+ }
+ }
+}
+
+template <typename I>
+void Instances<I>::notify_instances_removed(const InstanceIds& instance_ids) {
+ dout(5) << "instance_ids=" << instance_ids << dendl;
+ m_listener.handle_removed(instance_ids);
+
+ Mutex::Locker locker(m_lock);
+ for (auto& instance_id : instance_ids) {
+ m_instances.erase(instance_id);
+ }
}
template <typename I>
instance_ids.push_back(instance_pair.first);
}
}
+ assert(!instance_ids.empty());
dout(20) << "instance_ids=" << instance_ids << dendl;
Context* ctx = new FunctionContext([this, instance_ids](int r) {
dout(20) << "r=" << r << ", instance_ids=" << instance_ids << dendl;
assert(r == 0);
- for (auto& instance_id : instance_ids) {
- m_instances.erase(instance_id);
- }
+ // fire removed notification now that instaces have been blacklisted
+ m_threads->work_queue->queue(
+ new C_NotifyInstancesRemoved(this, instance_ids), 0);
// reschedule the timer for the next batch
schedule_remove_task(ceph_clock_now());
return;
}
- dout(20) << dendl;
int after = m_cct->_conf->get_val<int64_t>("rbd_mirror_leader_heartbeat_interval") *
(1 + m_cct->_conf->get_val<int64_t>("rbd_mirror_leader_max_missed_heartbeats") +
m_cct->_conf->get_val<int64_t>("rbd_mirror_leader_max_acquire_attempts_before_break"));
utime_t oldest_time = time;
for (auto& instance : m_instances) {
if (instance.second.state == INSTANCE_STATE_REMOVING) {
+ // removal is already in-flight
continue;
}
return;
}
+ dout(20) << dendl;
+
// schedule a time to fire when the oldest instance should be removed
m_timer_task = new FunctionContext(
[this, oldest_time](int r) {
#include "common/AsyncOpTracker.h"
#include "common/Mutex.h"
#include "librbd/Watcher.h"
+#include "tools/rbd_mirror/instances/Types.h"
namespace librados { class IoCtx; }
namespace librbd { class ImageCtx; }
typedef std::vector<std::string> InstanceIds;
static Instances *create(Threads<ImageCtxT> *threads,
- librados::IoCtx &ioctx) {
- return new Instances(threads, ioctx);
+ librados::IoCtx &ioctx,
+ instances::Listener& listener) {
+ return new Instances(threads, ioctx, listener);
}
void destroy() {
delete this;
}
- Instances(Threads<ImageCtxT> *threads, librados::IoCtx &ioctx);
+ Instances(Threads<ImageCtxT> *threads, librados::IoCtx &ioctx,
+ instances::Listener& listener);
virtual ~Instances();
void init(Context *on_finish);
*/
enum InstanceState {
+ INSTANCE_STATE_ADDING,
INSTANCE_STATE_IDLE,
INSTANCE_STATE_REMOVING
};
struct Instance {
utime_t acked_time{};
- InstanceState state = INSTANCE_STATE_IDLE;
+ InstanceState state = INSTANCE_STATE_ADDING;
};
struct C_NotifyBase : public Context {
}
};
+ struct C_NotifyInstancesAdded : public C_NotifyBase {
+ C_NotifyInstancesAdded(Instances *instances,
+ const InstanceIds& instance_ids)
+ : C_NotifyBase(instances, instance_ids) {
+ }
+
+ void execute() override {
+ this->instances->notify_instances_added(this->instance_ids);
+ }
+ };
+
+ struct C_NotifyInstancesRemoved : public C_NotifyBase {
+ C_NotifyInstancesRemoved(Instances *instances,
+ const InstanceIds& instance_ids)
+ : C_NotifyBase(instances, instance_ids) {
+ }
+
+ void execute() override {
+ this->instances->notify_instances_removed(this->instance_ids);
+ }
+ };
+
Threads<ImageCtxT> *m_threads;
librados::IoCtx &m_ioctx;
+ instances::Listener& m_listener;
CephContext *m_cct;
Mutex m_lock;
Context *m_timer_task = nullptr;
void handle_acked(const InstanceIds& instance_ids);
+ void notify_instances_added(const InstanceIds& instance_ids);
+ void notify_instances_removed(const InstanceIds& instance_ids);
void get_instances();
void handle_get_instances(int r);
LeaderWatcher<I>::LeaderWatcher(Threads<I> *threads, librados::IoCtx &io_ctx,
leader_watcher::Listener *listener)
: Watcher(io_ctx, threads->work_queue, RBD_MIRROR_LEADER),
- m_threads(threads), m_listener(listener),
+ m_threads(threads), m_listener(listener), m_instances_listener(this),
m_lock("rbd::mirror::LeaderWatcher " + io_ctx.get_pool_name()),
m_notifier_id(librados::Rados(io_ctx).get_instance_id()),
m_leader_lock(new LeaderLock(m_ioctx, m_work_queue, m_oid, this, true,
assert(m_lock.is_locked());
assert(m_instances == nullptr);
- m_instances = Instances<I>::create(m_threads, m_ioctx);
+ m_instances = Instances<I>::create(m_threads, m_ioctx, m_instances_listener);
Context *ctx = create_context_callback<
LeaderWatcher<I>, &LeaderWatcher<I>::handle_init_instances>(this);
#include "librbd/watcher/Types.h"
#include "Instances.h"
#include "MirrorStatusWatcher.h"
+#include "tools/rbd_mirror/instances/Types.h"
#include "tools/rbd_mirror/leader_watcher/Types.h"
namespace librbd { class ImageCtx; }
* @endverbatim
*/
+ struct InstancesListener : public instances::Listener {
+ LeaderWatcher* leader_watcher;
+
+ InstancesListener(LeaderWatcher* leader_watcher)
+ : leader_watcher(leader_watcher) {
+ }
+
+ void handle_added(const InstanceIds& instance_ids) override {
+ // TODO
+ }
+
+ void handle_removed(const InstanceIds& instance_ids) override {
+ // TODO
+ }
+ };
+
class LeaderLock : public librbd::ManagedLock<ImageCtxT> {
public:
typedef librbd::ManagedLock<ImageCtxT> Parent;
Threads<ImageCtxT> *m_threads;
leader_watcher::Listener *m_listener;
+ InstancesListener m_instances_listener;
mutable Mutex m_lock;
uint64_t m_notifier_id;
LeaderLock *m_leader_lock;
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_RBD_MIRROR_INSTANCES_TYPES_H
+#define CEPH_RBD_MIRROR_INSTANCES_TYPES_H
+
+#include <string>
+#include <vector>
+
+namespace rbd {
+namespace mirror {
+namespace instances {
+
+struct Listener {
+ typedef std::vector<std::string> InstanceIds;
+
+ virtual ~Listener() {
+ }
+
+ virtual void handle_added(const InstanceIds& instance_ids) = 0;
+ virtual void handle_removed(const InstanceIds& instance_ids) = 0;
+};
+
+} // namespace instances
+} // namespace mirror
+} // namespace rbd
+
+#endif // CEPH_RBD_MIRROR_INSTANCES_TYPES_H