]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
msg/async/Stack: disable smart thread spawn now
authorHaomai Wang <haomai@xsky.com>
Tue, 12 Jul 2016 02:16:33 +0000 (10:16 +0800)
committerHaomai Wang <haomai@xsky.com>
Tue, 16 Aug 2016 15:19:23 +0000 (23:19 +0800)
New async msgr runtime need to spawn threads when binding, but ceph-osd will
call daemon() after binding port. So we need to respawn threads if forked.

Then thread spawn delay will increase complexity for this change and it's
really a simple strategy which help less, we disable auto spawn now.

Signed-off-by: Haomai Wang <haomai@xsky.com>
src/msg/async/AsyncMessenger.cc
src/msg/async/PosixStack.h
src/msg/async/Stack.cc
src/msg/async/Stack.h

index 95ec0ebd1409f30ce42958f94e8f551d3d576087..0964dad30334b08e751b7459df01f09dfa313ba1 100644 (file)
@@ -300,6 +300,7 @@ void AsyncMessenger::ready()
 {
   ldout(cct,10) << __func__ << " " << get_myaddr() << dendl;
 
+  stack->start();
   Mutex::Locker l(lock);
   for (auto &&p : processors)
     p->start();
index eeb7f318c8dd0bea2938c650ff99574005e76715..149db320d8df28c2707f45cc9ea8b52114b22196 100644 (file)
@@ -48,14 +48,13 @@ class PosixNetworkStack : public NetworkStack {
       return -1;
     return coreids[id % coreids.size()];
   }
-  virtual void spawn_workers(std::vector<std::function<void ()>> &funcs) override {
-    for (unsigned i = threads.size(); i < funcs.size(); ++i)
-      threads.emplace_back(std::thread(std::move(funcs[i])));
+  virtual void spawn_worker(unsigned i, std::function<void ()> &&func) override {
+    threads.resize(i+1);
+    threads[i] = std::move(std::thread(func));
   }
-  virtual void join_workers() override {
-    for (auto &&t : threads)
-      t.join();
-    threads.clear();
+  virtual void join_worker(unsigned i) override {
+    assert(threads.size() > i && threads[i].joinable());
+    threads[i].join();
   }
 };
 
index 6755d1e8af5b567141fd6697634e749d2935267c..52f17df0de603aad2e641732f0a9ddcb0dd4d27a 100644 (file)
 #undef dout_prefix
 #define dout_prefix *_dout << "stack "
 
-void NetworkStack::add_thread(unsigned i)
+void NetworkStack::add_thread(unsigned i, std::function<void ()> &thread)
 {
-  assert(threads.size() <= i);
   Worker *w = workers[i];
-  threads.emplace_back(
+  thread = std::move(
     [this, w]() {
       const uint64_t InitEventNumber = 5000;
       const uint64_t EventMaxWaitUs = 30000000;
@@ -69,11 +68,11 @@ Worker* NetworkStack::create_worker(CephContext *c, const string &type, unsigned
 
 NetworkStack::NetworkStack(CephContext *c, const string &t): type(t), started(false), cct(c)
 {
-  for (unsigned i = 0; i < cct->_conf->ms_async_max_op_threads; ++i) {
+  num_workers = cct->_conf->ms_async_op_threads;
+  for (unsigned i = 0; i < num_workers; ++i) {
     Worker *w = create_worker(cct, type, i);
     workers.push_back(w);
   }
-  num_workers = cct->_conf->ms_async_op_threads;
 }
 
 void NetworkStack::start()
@@ -83,9 +82,17 @@ void NetworkStack::start()
     pool_spin.unlock();
     return ;
   }
-  for (unsigned i = 0; i < num_workers; ++i)
-    add_thread(i);
-  spawn_workers(threads);
+
+  if (started) {
+    return ;
+  }
+  for (unsigned i = 0; i < num_workers; ++i) {
+    if (workers[i]->is_init())
+      continue;
+    std::function<void ()> thread;
+    add_thread(i, thread);
+    spawn_worker(i, std::move(thread));
+  }
   started = true;
   pool_spin.unlock();
 
@@ -125,9 +132,8 @@ void NetworkStack::stop()
   for (unsigned i = 0; i < num_workers; ++i) {
     workers[i]->done = true;
     workers[i]->center.wakeup();
+    join_worker(i);
   }
-  join_workers();
-  threads.clear();
   started = false;
 }
 
index 9996ed2841085e9e78894a8345f488d198134e02..5b75aa699dc28730d2d6a589fbb1921ff95af277 100644 (file)
@@ -254,6 +254,10 @@ class Worker {
     init_cond.notify_all();
     init_lock.unlock();
   }
+  bool is_init() {
+    std::lock_guard<std::mutex> l(init_lock);
+    return init;
+  }
   void wait_for_init() {
     std::unique_lock<std::mutex> l(init_lock);
     while (!init)
@@ -270,16 +274,15 @@ class Worker {
 
 class NetworkStack {
   std::string type;
-  std::atomic_bool started;
   unsigned num_workers = 0;
   Spinlock pool_spin;
+  bool started = false;
 
-  void add_thread(unsigned i);
+  void add_thread(unsigned i, std::function<void ()> &ts);
 
  protected:
   CephContext *cct;
   vector<Worker*> workers;
-  std::vector<std::function<void ()>> threads;
   // Used to indicate whether thread started
 
   explicit NetworkStack(CephContext *c, const string &t);
@@ -316,8 +319,8 @@ class NetworkStack {
   }
 
   // direct is used in tests only
-  virtual void spawn_workers(std::vector<std::function<void ()>> &) = 0;
-  virtual void join_workers() = 0;
+  virtual void spawn_worker(unsigned i, std::function<void ()> &&) = 0;
+  virtual void join_worker(unsigned i) = 0;
 
  private:
   NetworkStack(const NetworkStack &);