From: Haomai Wang Date: Wed, 29 Jun 2016 09:14:16 +0000 (+0800) Subject: msg/async: make sure worker started before let msgr ready X-Git-Tag: ses5-milestone5~429^2~13 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=cbe5ef6385ff49ae1ef26357e11492b723c30d71;p=ceph.git msg/async: make sure worker started before let msgr ready When we create event thread, it need a little time to enter event loop(like calling set_owner), if caller is going to call create_file_event before event thread enter event loop, it will trigger assert. Signed-off-by: Haomai Wang --- diff --git a/src/msg/async/AsyncMessenger.cc b/src/msg/async/AsyncMessenger.cc index b81a0781a313..029942d6677b 100644 --- a/src/msg/async/AsyncMessenger.cc +++ b/src/msg/async/AsyncMessenger.cc @@ -331,6 +331,7 @@ class WorkerPool { }; friend class C_barrier; public: + std::atomic_uint pending; explicit WorkerPool(CephContext *c); WorkerPool(const WorkerPool &) = delete; WorkerPool& operator=(const WorkerPool &) = delete; @@ -359,6 +360,7 @@ void *Worker::entry() } center.set_owner(); + pool->pending--; while (!done) { ldout(cct, 20) << __func__ << " calling event process" << dendl; @@ -380,7 +382,7 @@ const string WorkerPool::name = "AsyncMessenger::WorkerPool"; WorkerPool::WorkerPool(CephContext *c): cct(c), started(false), barrier_lock("WorkerPool::WorkerPool::barrier_lock"), - barrier_count(0) + barrier_count(0), pending(0) { assert(cct->_conf->ms_async_op_threads > 0); // make sure user won't try to force some crazy number of worker threads @@ -419,10 +421,13 @@ void WorkerPool::start() { if (!started) { for (uint64_t i = 0; i < workers.size(); ++i) { + pending++; workers[i]->create("ms_async_worker"); } started = true; } + while (pending) + usleep(50); } Worker* WorkerPool::get_worker() @@ -461,6 +466,7 @@ Worker* WorkerPool::get_worker() ldout(cct, 20) << __func__ << " creating worker" << dendl; current_best = new Worker(cct, this, workers.size()); workers.push_back(current_best); + pending++; current_best->create("ms_async_worker"); } else { ldout(cct, 20) << __func__ << " picked " << current_best @@ -470,6 +476,8 @@ Worker* WorkerPool::get_worker() ++current_best->references; simple_spin_unlock(&pool_spin); + while (pending) + usleep(50); assert(current_best); return current_best; } @@ -529,6 +537,7 @@ void AsyncMessenger::ready() ldout(cct,10) << __func__ << " " << get_myaddr() << dendl; Mutex::Locker l(lock); + pool->start(); Worker *w = pool->get_worker(); processor.start(w); dispatch_queue.start(); @@ -603,7 +612,6 @@ int AsyncMessenger::start() my_inst.addr.nonce = nonce; _init_local_connection(); } - pool->start(); lock.Unlock(); return 0;