TestFixture::SetUp();
m_local_io_ctx.remove(RBD_MIRROR_LEADER);
EXPECT_EQ(0, m_local_io_ctx.create(RBD_MIRROR_LEADER, true));
+
+ m_instance_id = stringify(m_local_io_ctx.get_instance_id());
}
Listener m_listener;
+ std::string m_instance_id;
};
TEST_F(TestInstances, InitShutdown)
{
m_listener.add.count = 1;
- Instances<> instances(m_threads, m_local_io_ctx, m_listener);
+ Instances<> instances(m_threads, m_local_io_ctx, m_instance_id, m_listener);
std::string instance_id = "instance_id";
ASSERT_EQ(0, librbd::cls_client::mirror_instances_add(&m_local_io_ctx,
TEST_F(TestInstances, InitEnoent)
{
- Instances<> instances(m_threads, m_local_io_ctx, m_listener);
+ Instances<> instances(m_threads, m_local_io_ctx, m_instance_id, m_listener);
m_local_io_ctx.remove(RBD_MIRROR_LEADER);
m_listener.add.count = 2;
m_listener.remove.count = 1;
- Instances<> instances(m_threads, m_local_io_ctx, m_listener);
+ Instances<> instances(m_threads, m_local_io_ctx, m_instance_id, m_listener);
std::string instance_id1 = "instance_id1";
std::string instance_id2 = "instance_id2";
static Instances* s_instance;
static Instances *create(Threads<librbd::MockTestImageCtx> *threads,
- librados::IoCtx &ioctx, instances::Listener&) {
+ librados::IoCtx &ioctx,
+ const std::string& instance_id,
+ instances::Listener&) {
assert(s_instance != nullptr);
return s_instance;
}
EXPECT_CALL(mock_instances, unblock_listener());
}
+ void expect_instances_acked(MockInstances& mock_instances) {
+ EXPECT_CALL(mock_instances, acked(_));
+ }
+
MockThreads *m_mock_threads;
};
expect_acquire_notify(mock_managed_lock, listener, 0);
expect_unblock_listener(mock_instances);
expect_notify_heartbeat(mock_managed_lock, &on_heartbeat_finish);
+ expect_instances_acked(mock_instances);
ASSERT_EQ(0, leader_watcher.init());
ASSERT_EQ(0, on_heartbeat_finish.wait());
expect_acquire_notify(mock_managed_lock, listener, 0);
expect_unblock_listener(mock_instances);
expect_notify_heartbeat(mock_managed_lock, &on_heartbeat_finish);
+ expect_instances_acked(mock_instances);
ASSERT_EQ(0, leader_watcher.init());
ASSERT_EQ(0, on_heartbeat_finish.wait());
expect_acquire_notify(mock_managed_lock, listener, 0);
expect_unblock_listener(mock_instances);
expect_notify_heartbeat(mock_managed_lock, &on_heartbeat_finish);
+ expect_instances_acked(mock_instances);
ASSERT_EQ(0, leader_watcher.init());
ASSERT_EQ(0, on_heartbeat_finish.wait());
expect_acquire_notify(mock_managed_lock, listener, 0);
expect_unblock_listener(mock_instances);
expect_notify_heartbeat(mock_managed_lock, &on_heartbeat_finish);
+ expect_instances_acked(mock_instances);
ASSERT_EQ(0, leader_watcher.init());
ASSERT_EQ(0, on_heartbeat_finish.wait());
template <typename I>
Instances<I>::Instances(Threads<I> *threads, librados::IoCtx &ioctx,
+ const std::string& instance_id,
instances::Listener& listener) :
- m_threads(threads), m_ioctx(ioctx), m_listener(listener),
- m_cct(reinterpret_cast<CephContext *>(ioctx.cct())),
+ m_threads(threads), m_ioctx(ioctx), m_instance_id(instance_id),
+ m_listener(listener), m_cct(reinterpret_cast<CephContext *>(ioctx.cct())),
m_lock("rbd::mirror::Instances " + ioctx.get_pool_name()) {
}
InstanceIds instance_ids;
for (auto& instance_pair : m_instances) {
+ if (instance_pair.first == m_instance_id) {
+ continue;
+ }
auto& instance = instance_pair.second;
if (instance.state != INSTANCE_STATE_REMOVING &&
instance.acked_time <= time) {
bool schedule = false;
utime_t oldest_time = time;
for (auto& instance : m_instances) {
+ if (instance.first == m_instance_id) {
+ continue;
+ }
if (instance.second.state == INSTANCE_STATE_REMOVING) {
// removal is already in-flight
continue;
static Instances *create(Threads<ImageCtxT> *threads,
librados::IoCtx &ioctx,
+ const std::string& instance_id,
instances::Listener& listener) {
- return new Instances(threads, ioctx, listener);
+ return new Instances(threads, ioctx, instance_id, listener);
}
void destroy() {
delete this;
}
Instances(Threads<ImageCtxT> *threads, librados::IoCtx &ioctx,
- instances::Listener& listener);
+ const std::string& instance_id, instances::Listener& listener);
virtual ~Instances();
void init(Context *on_finish);
Threads<ImageCtxT> *m_threads;
librados::IoCtx &m_ioctx;
+ std::string m_instance_id;
instances::Listener& m_listener;
CephContext *m_cct;
m_threads(threads), m_listener(listener), m_instances_listener(this),
m_lock("rbd::mirror::LeaderWatcher " + io_ctx.get_pool_name()),
m_notifier_id(librados::Rados(io_ctx).get_instance_id()),
+ m_instance_id(stringify(m_notifier_id)),
m_leader_lock(new LeaderLock(m_ioctx, m_work_queue, m_oid, this, true,
m_cct->_conf->get_val<int64_t>(
"rbd_blacklist_expire_seconds"))) {
template <typename I>
std::string LeaderWatcher<I>::get_instance_id() {
- return stringify(m_notifier_id);
+ return m_instance_id;
}
template <typename I>
Mutex::Locker locker(m_lock);
if (is_leader(m_lock) || is_releasing_leader(m_lock)) {
- *instance_id = stringify(m_notifier_id);
+ *instance_id = m_instance_id;
return true;
}
assert(m_lock.is_locked());
assert(m_instances == nullptr);
- m_instances = Instances<I>::create(m_threads, m_ioctx, m_instances_listener);
+ m_instances = Instances<I>::create(m_threads, m_ioctx, m_instance_id,
+ m_instances_listener);
Context *ctx = create_context_callback<
LeaderWatcher<I>, &LeaderWatcher<I>::handle_init_instances>(this);
std::vector<std::string> instance_ids;
for (auto &it: m_heartbeat_response.acks) {
uint64_t notifier_id = it.first.gid;
- if (notifier_id == m_notifier_id) {
- continue;
- }
-
instance_ids.push_back(stringify(notifier_id));
}
if (!instance_ids.empty()) {
InstancesListener m_instances_listener;
mutable Mutex m_lock;
uint64_t m_notifier_id;
+ std::string m_instance_id;
LeaderLock *m_leader_lock;
Context *m_on_finish = nullptr;
Context *m_on_shut_down_finish = nullptr;