throttler.finish_op("id2");
}
+TEST_F(TestMockImageSyncThrottler, Duplicate) {
+ MockImageSyncThrottler throttler(g_ceph_context);
+ throttler.set_max_concurrent_syncs(1);
+
+ C_SaferCond on_start1;
+ throttler.start_op("id1", &on_start1);
+ ASSERT_EQ(0, on_start1.wait());
+
+ C_SaferCond on_start2;
+ throttler.start_op("id1", &on_start2);
+ ASSERT_EQ(0, on_start2.wait());
+
+ C_SaferCond on_start3;
+ throttler.start_op("id2", &on_start3);
+ C_SaferCond on_start4;
+ throttler.start_op("id2", &on_start4);
+ ASSERT_EQ(-ENOENT, on_start3.wait());
+
+ throttler.finish_op("id1");
+ ASSERT_EQ(0, on_start4.wait());
+ throttler.finish_op("id2");
+}
+
+TEST_F(TestMockImageSyncThrottler, Duplicate2) {
+ MockImageSyncThrottler throttler(g_ceph_context);
+ throttler.set_max_concurrent_syncs(2);
+
+ C_SaferCond on_start1;
+ throttler.start_op("id1", &on_start1);
+ ASSERT_EQ(0, on_start1.wait());
+ C_SaferCond on_start2;
+ throttler.start_op("id2", &on_start2);
+ ASSERT_EQ(0, on_start2.wait());
+
+ C_SaferCond on_start3;
+ throttler.start_op("id3", &on_start3);
+ C_SaferCond on_start4;
+ throttler.start_op("id3", &on_start4); // dup
+ ASSERT_EQ(-ENOENT, on_start3.wait());
+
+ C_SaferCond on_start5;
+ throttler.start_op("id4", &on_start5);
+
+ throttler.finish_op("id1");
+ ASSERT_EQ(0, on_start4.wait());
+
+ throttler.finish_op("id2");
+ ASSERT_EQ(0, on_start5.wait());
+
+ C_SaferCond on_start6;
+ throttler.start_op("id5", &on_start6);
+
+ throttler.finish_op("id3");
+ ASSERT_EQ(0, on_start6.wait());
+
+ throttler.finish_op("id4");
+ throttler.finish_op("id5");
+}
+
TEST_F(TestMockImageSyncThrottler, Increase_Max_Concurrent_Syncs) {
MockImageSyncThrottler throttler(g_ceph_context);
throttler.set_max_concurrent_syncs(2);
void ImageSyncThrottler<I>::start_op(const std::string &id, Context *on_start) {
dout(20) << "id=" << id << dendl;
+ int r = 0;
{
Mutex::Locker locker(m_lock);
if (m_inflight_ops.count(id) > 0) {
dout(20) << "duplicate for already started op " << id << dendl;
+ } else if (m_queued_ops.count(id) > 0) {
+ dout(20) << "duplicate for already queued op " << id << dendl;
+ std::swap(m_queued_ops[id], on_start);
+ r = -ENOENT;
} else if (m_max_concurrent_syncs == 0 ||
m_inflight_ops.size() < m_max_concurrent_syncs) {
ceph_assert(m_queue.empty());
<< m_inflight_ops.size() << "/" << m_max_concurrent_syncs << "]"
<< dendl;
} else {
- m_queue.push_back(std::make_pair(id, on_start));
- on_start = nullptr;
+ m_queue.push_back(id);
+ std::swap(m_queued_ops[id], on_start);
dout(20) << "image sync for " << id << " has been queued" << dendl;
}
}
if (on_start != nullptr) {
- on_start->complete(0);
+ on_start->complete(r);
}
}
Context *on_start = nullptr;
{
Mutex::Locker locker(m_lock);
- for (auto it = m_queue.begin(); it != m_queue.end(); ++it) {
- if (it->first == id) {
- on_start = it->second;
- dout(20) << "canceled queued sync for " << id << dendl;
- m_queue.erase(it);
- break;
- }
+ auto it = m_queued_ops.find(id);
+ if (it != m_queued_ops.end()) {
+ dout(20) << "canceled queued sync for " << id << dendl;
+ m_queue.remove(id);
+ on_start = it->second;
+ m_queued_ops.erase(it);
}
}
m_inflight_ops.erase(id);
if (m_inflight_ops.size() < m_max_concurrent_syncs && !m_queue.empty()) {
- auto pair = m_queue.front();
- m_inflight_ops.insert(pair.first);
- dout(20) << "ready to start sync for " << pair.first << " ["
+ auto id = m_queue.front();
+ auto it = m_queued_ops.find(id);
+ ceph_assert(it != m_queued_ops.end());
+ m_inflight_ops.insert(id);
+ dout(20) << "ready to start sync for " << id << " ["
<< m_inflight_ops.size() << "/" << m_max_concurrent_syncs << "]"
<< dendl;
- on_start= pair.second;
+ on_start = it->second;
+ m_queued_ops.erase(it);
m_queue.pop_front();
}
}
void ImageSyncThrottler<I>::drain(int r) {
dout(20) << dendl;
- std::list<std::pair<std::string, Context *>> queue;
+ std::map<std::string, Context *> queued_ops;
{
Mutex::Locker locker(m_lock);
- std::swap(m_queue, queue);
+ std::swap(m_queued_ops, queued_ops);
+ m_queue.clear();
m_inflight_ops.clear();
}
- for (auto &pair : queue) {
- pair.second->complete(r);
+ for (auto &it : queued_ops) {
+ it.second->complete(r);
}
}
while ((m_max_concurrent_syncs == 0 ||
m_inflight_ops.size() < m_max_concurrent_syncs) &&
!m_queue.empty()) {
- auto pair = m_queue.front();
- m_inflight_ops.insert(pair.first);
- dout(20) << "ready to start sync for " << pair.first << " ["
+ auto id = m_queue.front();
+ m_inflight_ops.insert(id);
+ dout(20) << "ready to start sync for " << id << " ["
<< m_inflight_ops.size() << "/" << m_max_concurrent_syncs << "]"
<< dendl;
- ops.push_back(pair.second);
+ auto it = m_queued_ops.find(id);
+ ceph_assert(it != m_queued_ops.end());
+ ops.push_back(it->second);
+ m_queued_ops.erase(it);
m_queue.pop_front();
}
}