void Worker::add_pending(PendingIO::ptr io) {
boost::mutex::scoped_lock lock(m_pending_ios_mutex);
- m_pending_ios.push_back(io);
+ assertf(m_pending_ios.count(io->id()) == 0, "id = %d", io->id());
+ m_pending_ios[io->id()] = io;
}
void Worker::run() {
while (!m_pending_ios.empty()) {
if (!first_time) {
dout(THREAD_LEVEL) << "Worker thread trying to stop, still waiting for " << m_pending_ios.size() << " pending IOs to complete:" << dendl;
- BOOST_FOREACH(PendingIO::ptr p, m_pending_ios) {
- dout(THREAD_LEVEL) << "> " << p->id() << dendl;
+ pair<action_id_t, PendingIO::ptr> p;
+ BOOST_FOREACH(p, m_pending_ios) {
+ dout(THREAD_LEVEL) << "> " << p.first << dendl;
}
}
m_pending_ios_empty.timed_wait(lock, boost::posix_time::seconds(1));
void Worker::remove_pending(PendingIO::ptr io) {
m_replayer.set_action_complete(io->id());
boost::mutex::scoped_lock lock(m_pending_ios_mutex);
- for (vector<PendingIO::ptr>::iterator itr = m_pending_ios.begin(); itr != m_pending_ios.end(); itr++) {
- if (*itr == io) {
- m_pending_ios.erase(itr);
- break;
- }
- }
+ size_t num_erased = m_pending_ios.erase(io->id());
+ assertf(num_erased == 1, "id = %d", io->id());
if (m_pending_ios.empty()) {
m_pending_ios_empty.notify_all();
}
Replayer &m_replayer;
BoundedBuffer<Action::ptr> m_buffer;
boost::shared_ptr<boost::thread> m_thread;
- std::vector<PendingIO::ptr> m_pending_ios;
+ std::map<action_id_t, PendingIO::ptr> m_pending_ios;
boost::mutex m_pending_ios_mutex;
boost::condition m_pending_ios_empty;
bool m_done;