instances.init(&on_init);
ASSERT_EQ(0, on_init.wait());
+ ASSERT_LT(0U, m_listener.add.count);
+ instances.unblock_listener();
+
ASSERT_EQ(0, m_listener.add.ctx.wait());
ASSERT_EQ(std::set<std::string>({instance_id}), m_listener.add.ids);
instances.acked({instance_id1, instance_id2});
+ ASSERT_LT(0U, m_listener.add.count);
+ instances.unblock_listener();
+
ASSERT_EQ(0, m_listener.add.ctx.wait());
ASSERT_EQ(std::set<std::string>({instance_id1, instance_id2}),
m_listener.add.ids);
MOCK_METHOD1(init, void(Context *));
MOCK_METHOD1(shut_down, void(Context *));
MOCK_METHOD1(acked, void(const std::vector<std::string> &));
+ MOCK_METHOD0(unblock_listener, void());
};
Instances<librbd::MockTestImageCtx> *Instances<librbd::MockTestImageCtx>::s_instance = nullptr;
expect_is_leader(mock_managed_lock, false, false);
}
+ void expect_unblock_listener(MockInstances& mock_instances) {
+ EXPECT_CALL(mock_instances, unblock_listener());
+ }
+
MockThreads *m_mock_threads;
};
expect_try_acquire_lock(mock_managed_lock, 0);
expect_init(mock_instances, 0);
expect_acquire_notify(mock_managed_lock, listener, 0);
+ expect_unblock_listener(mock_instances);
expect_notify_heartbeat(mock_managed_lock, &on_heartbeat_finish);
ASSERT_EQ(0, leader_watcher.init());
expect_try_acquire_lock(mock_managed_lock, 0);
expect_init(mock_instances, 0);
expect_acquire_notify(mock_managed_lock, listener, 0);
+ expect_unblock_listener(mock_instances);
expect_notify_heartbeat(mock_managed_lock, &on_heartbeat_finish);
ASSERT_EQ(0, leader_watcher.init());
expect_try_acquire_lock(mock_managed_lock, 0);
expect_init(mock_instances, 0);
expect_acquire_notify(mock_managed_lock, listener, 0);
+ expect_unblock_listener(mock_instances);
expect_notify_heartbeat(mock_managed_lock, &on_heartbeat_finish);
ASSERT_EQ(0, leader_watcher.init());
expect_try_acquire_lock(mock_managed_lock, 0);
expect_init(mock_instances, 0);
expect_acquire_notify(mock_managed_lock, listener, 0);
+ expect_unblock_listener(mock_instances);
expect_notify_heartbeat(mock_managed_lock, &on_heartbeat_finish);
ASSERT_EQ(0, leader_watcher.init());
m_threads->work_queue->queue(ctx, 0);
}
+template <typename I>
+void Instances<I>::unblock_listener() {
+ dout(5) << dendl;
+
+ Mutex::Locker locker(m_lock);
+ assert(m_listener_blocked);
+ m_listener_blocked = false;
+
+ InstanceIds added_instance_ids;
+ for (auto& pair : m_instances) {
+ if (pair.second.state == INSTANCE_STATE_ADDING) {
+ added_instance_ids.push_back(pair.first);
+ }
+ }
+
+ if (!added_instance_ids.empty()) {
+ m_threads->work_queue->queue(
+ new C_NotifyInstancesAdded(this, added_instance_ids), 0);
+ }
+}
+
template <typename I>
void Instances<I>::acked(const InstanceIds& instance_ids) {
dout(20) << "instance_ids=" << instance_ids << dendl;
}
schedule_remove_task(time);
- if (!added_instance_ids.empty()) {
+ if (!m_listener_blocked && !added_instance_ids.empty()) {
m_threads->work_queue->queue(
new C_NotifyInstancesAdded(this, added_instance_ids), 0);
}
void init(Context *on_finish);
void shut_down(Context *on_finish);
+ void unblock_listener();
+
void acked(const InstanceIds& instance_ids);
void list(std::vector<std::string> *instance_ids);
Context *m_timer_task = nullptr;
+ bool m_listener_blocked = true;
+
void handle_acked(const InstanceIds& instance_ids);
void notify_instances_added(const InstanceIds& instance_ids);
void notify_instances_removed(const InstanceIds& instance_ids);