]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
msg/async: make sure worker started before let msgr ready
authorHaomai Wang <haomai@xsky.com>
Wed, 29 Jun 2016 09:14:16 +0000 (17:14 +0800)
committerHaomai Wang <haomai@xsky.com>
Tue, 12 Jul 2016 15:48:33 +0000 (23:48 +0800)
When we create event thread, it need a little time to enter event loop(like
calling set_owner), if caller is going to call create_file_event before event
thread enter event loop, it will trigger assert.

Signed-off-by: Haomai Wang <haomai@xsky.com>
src/msg/async/AsyncMessenger.cc

index b81a0781a313a4b0763618b99c2f0c8b281c82e1..029942d6677bb9ce73bf0b037fca217ca3721d26 100644 (file)
@@ -331,6 +331,7 @@ class WorkerPool {
   };
   friend class C_barrier;
   public:
+  std::atomic_uint pending;
   explicit WorkerPool(CephContext *c);
   WorkerPool(const WorkerPool &) = delete;
   WorkerPool& operator=(const WorkerPool &) = delete;
@@ -359,6 +360,7 @@ void *Worker::entry()
   }
 
   center.set_owner();
+  pool->pending--;
   while (!done) {
     ldout(cct, 20) << __func__ << " calling event process" << dendl;
 
@@ -380,7 +382,7 @@ const string WorkerPool::name = "AsyncMessenger::WorkerPool";
 
 WorkerPool::WorkerPool(CephContext *c): cct(c), started(false),
                                         barrier_lock("WorkerPool::WorkerPool::barrier_lock"),
-                                        barrier_count(0)
+                                        barrier_count(0), pending(0)
 {
   assert(cct->_conf->ms_async_op_threads > 0);
   // make sure user won't try to force some crazy number of worker threads
@@ -419,10 +421,13 @@ void WorkerPool::start()
 {
   if (!started) {
     for (uint64_t i = 0; i < workers.size(); ++i) {
+      pending++;
       workers[i]->create("ms_async_worker");
     }
     started = true;
   }
+  while (pending)
+    usleep(50);
 }
 
 Worker* WorkerPool::get_worker()
@@ -461,6 +466,7 @@ Worker* WorkerPool::get_worker()
      ldout(cct, 20) << __func__ << " creating worker" << dendl;
      current_best = new Worker(cct, this, workers.size());
      workers.push_back(current_best);
+     pending++;
      current_best->create("ms_async_worker");
   } else {
     ldout(cct, 20) << __func__ << " picked " << current_best 
@@ -470,6 +476,8 @@ Worker* WorkerPool::get_worker()
   ++current_best->references;
   simple_spin_unlock(&pool_spin);
 
+  while (pending)
+    usleep(50);
   assert(current_best);
   return current_best;
 }
@@ -529,6 +537,7 @@ void AsyncMessenger::ready()
   ldout(cct,10) << __func__ << " " << get_myaddr() << dendl;
 
   Mutex::Locker l(lock);
+  pool->start();
   Worker *w = pool->get_worker();
   processor.start(w);
   dispatch_queue.start();
@@ -603,7 +612,6 @@ int AsyncMessenger::start()
     my_inst.addr.nonce = nonce;
     _init_local_connection();
   }
-  pool->start();
 
   lock.Unlock();
   return 0;