From 31833e8b488797ea956a7c877a22d74abb4df510 Mon Sep 17 00:00:00 2001 From: Haomai Wang Date: Tue, 12 Jul 2016 10:16:33 +0800 Subject: [PATCH] msg/async/Stack: disable smart thread spawn now New async msgr runtime need to spawn threads when binding, but ceph-osd will call daemon() after binding port. So we need to respawn threads if forked. Then thread spawn delay will increase complexity for this change and it's really a simple strategy which help less, we disable auto spawn now. Signed-off-by: Haomai Wang --- src/msg/async/AsyncMessenger.cc | 1 + src/msg/async/PosixStack.h | 13 ++++++------- src/msg/async/Stack.cc | 26 ++++++++++++++++---------- src/msg/async/Stack.h | 13 ++++++++----- 4 files changed, 31 insertions(+), 22 deletions(-) diff --git a/src/msg/async/AsyncMessenger.cc b/src/msg/async/AsyncMessenger.cc index 95ec0ebd1409f..0964dad30334b 100644 --- a/src/msg/async/AsyncMessenger.cc +++ b/src/msg/async/AsyncMessenger.cc @@ -300,6 +300,7 @@ void AsyncMessenger::ready() { ldout(cct,10) << __func__ << " " << get_myaddr() << dendl; + stack->start(); Mutex::Locker l(lock); for (auto &&p : processors) p->start(); diff --git a/src/msg/async/PosixStack.h b/src/msg/async/PosixStack.h index eeb7f318c8dd0..149db320d8df2 100644 --- a/src/msg/async/PosixStack.h +++ b/src/msg/async/PosixStack.h @@ -48,14 +48,13 @@ class PosixNetworkStack : public NetworkStack { return -1; return coreids[id % coreids.size()]; } - virtual void spawn_workers(std::vector> &funcs) override { - for (unsigned i = threads.size(); i < funcs.size(); ++i) - threads.emplace_back(std::thread(std::move(funcs[i]))); + virtual void spawn_worker(unsigned i, std::function &&func) override { + threads.resize(i+1); + threads[i] = std::move(std::thread(func)); } - virtual void join_workers() override { - for (auto &&t : threads) - t.join(); - threads.clear(); + virtual void join_worker(unsigned i) override { + assert(threads.size() > i && threads[i].joinable()); + threads[i].join(); } }; diff --git a/src/msg/async/Stack.cc b/src/msg/async/Stack.cc index 6755d1e8af5b5..52f17df0de603 100644 --- a/src/msg/async/Stack.cc +++ b/src/msg/async/Stack.cc @@ -25,11 +25,10 @@ #undef dout_prefix #define dout_prefix *_dout << "stack " -void NetworkStack::add_thread(unsigned i) +void NetworkStack::add_thread(unsigned i, std::function &thread) { - assert(threads.size() <= i); Worker *w = workers[i]; - threads.emplace_back( + thread = std::move( [this, w]() { const uint64_t InitEventNumber = 5000; const uint64_t EventMaxWaitUs = 30000000; @@ -69,11 +68,11 @@ Worker* NetworkStack::create_worker(CephContext *c, const string &type, unsigned NetworkStack::NetworkStack(CephContext *c, const string &t): type(t), started(false), cct(c) { - for (unsigned i = 0; i < cct->_conf->ms_async_max_op_threads; ++i) { + num_workers = cct->_conf->ms_async_op_threads; + for (unsigned i = 0; i < num_workers; ++i) { Worker *w = create_worker(cct, type, i); workers.push_back(w); } - num_workers = cct->_conf->ms_async_op_threads; } void NetworkStack::start() @@ -83,9 +82,17 @@ void NetworkStack::start() pool_spin.unlock(); return ; } - for (unsigned i = 0; i < num_workers; ++i) - add_thread(i); - spawn_workers(threads); + + if (started) { + return ; + } + for (unsigned i = 0; i < num_workers; ++i) { + if (workers[i]->is_init()) + continue; + std::function thread; + add_thread(i, thread); + spawn_worker(i, std::move(thread)); + } started = true; pool_spin.unlock(); @@ -125,9 +132,8 @@ void NetworkStack::stop() for (unsigned i = 0; i < num_workers; ++i) { workers[i]->done = true; workers[i]->center.wakeup(); + join_worker(i); } - join_workers(); - threads.clear(); started = false; } diff --git a/src/msg/async/Stack.h b/src/msg/async/Stack.h index 9996ed2841085..5b75aa699dc28 100644 --- a/src/msg/async/Stack.h +++ b/src/msg/async/Stack.h @@ -254,6 +254,10 @@ class Worker { init_cond.notify_all(); init_lock.unlock(); } + bool is_init() { + std::lock_guard l(init_lock); + return init; + } void wait_for_init() { std::unique_lock l(init_lock); while (!init) @@ -270,16 +274,15 @@ class Worker { class NetworkStack { std::string type; - std::atomic_bool started; unsigned num_workers = 0; Spinlock pool_spin; + bool started = false; - void add_thread(unsigned i); + void add_thread(unsigned i, std::function &ts); protected: CephContext *cct; vector workers; - std::vector> threads; // Used to indicate whether thread started explicit NetworkStack(CephContext *c, const string &t); @@ -316,8 +319,8 @@ class NetworkStack { } // direct is used in tests only - virtual void spawn_workers(std::vector> &) = 0; - virtual void join_workers() = 0; + virtual void spawn_worker(unsigned i, std::function &&) = 0; + virtual void join_worker(unsigned i) = 0; private: NetworkStack(const NetworkStack &); -- 2.39.5