};
friend class C_barrier;
public:
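+ // number of worker threads that have been created but are not yet running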
+ std::atomic_uint pending;
explicit WorkerPool(CephContext *c);
WorkerPool(const WorkerPool &) = delete;
WorkerPool& operator=(const WorkerPool &) = delete;
}
center.set_owner();
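+ // tell the pool this worker thread is up and its event center has an owner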
+ pool->pending--;
while (!done) {
ldout(cct, 20) << __func__ << " calling event process" << dendl;
WorkerPool::WorkerPool(CephContext *c): cct(c), started(false),
barrier_lock("WorkerPool::WorkerPool::barrier_lock"),
- barrier_count(0)
+ barrier_count(0), pending(0)
{
assert(cct->_conf->ms_async_op_threads > 0);
// make sure user won't try to force some crazy number of worker threads
{
if (!started) {
for (uint64_t i = 0; i < workers.size(); ++i) {
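+ // count the thread we are about to create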
+ pending++;
workers[i]->create("ms_async_worker");
}
started = true;
}
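+ // wait until every created worker thread has come up; each worker
+ // decrements pending after it takes ownership of its event center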
+ while (pending)
+ usleep(50);
}
Worker* WorkerPool::get_worker()
ldout(cct, 20) << __func__ << " creating worker" << dendl;
current_best = new Worker(cct, this, workers.size());
workers.push_back(current_best);
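+ // count the new worker thread we are about to create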
+ pending++;
current_best->create("ms_async_worker");
} else {
ldout(cct, 20) << __func__ << " picked " << current_best
++current_best->references;
simple_spin_unlock(&pool_spin);
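+ // make sure any pending worker thread is running before handing it out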
+ while (pending)
+ usleep(50);
assert(current_best);
return current_best;
}
ldout(cct,10) << __func__ << " " << get_myaddr() << dendl;
Mutex::Locker l(lock);
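+ // start the worker threads here, before a worker is picked for the processor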
+ pool->start();
Worker *w = pool->get_worker();
processor.start(w);
dispatch_queue.start();
my_inst.addr.nonce = nonce;
_init_local_connection();
}
- pool->start();
lock.Unlock();
return 0;