{
ldout(cct,10) << __func__ << " " << get_myaddr() << dendl;
+ stack->start();
Mutex::Locker l(lock);
for (auto &&p : processors)
p->start();
return -1;
return coreids[id % coreids.size()];
}
- virtual void spawn_workers(std::vector<std::function<void ()>> &funcs) override {
- for (unsigned i = threads.size(); i < funcs.size(); ++i)
- threads.emplace_back(std::thread(std::move(funcs[i])));
+ virtual void spawn_worker(unsigned i, std::function<void ()> &&func) override {
+ threads.resize(i+1);
+ threads[i] = std::move(std::thread(func));
}
- virtual void join_workers() override {
- for (auto &&t : threads)
- t.join();
- threads.clear();
+ virtual void join_worker(unsigned i) override {
+ assert(threads.size() > i && threads[i].joinable());
+ threads[i].join();
}
};
#undef dout_prefix
#define dout_prefix *_dout << "stack "
-void NetworkStack::add_thread(unsigned i)
+void NetworkStack::add_thread(unsigned i, std::function<void ()> &thread)
{
- assert(threads.size() <= i);
Worker *w = workers[i];
- threads.emplace_back(
+ thread = std::move(
[this, w]() {
const uint64_t InitEventNumber = 5000;
const uint64_t EventMaxWaitUs = 30000000;
NetworkStack::NetworkStack(CephContext *c, const string &t): type(t), started(false), cct(c)
{
- for (unsigned i = 0; i < cct->_conf->ms_async_max_op_threads; ++i) {
+ num_workers = cct->_conf->ms_async_op_threads;
+ for (unsigned i = 0; i < num_workers; ++i) {
Worker *w = create_worker(cct, type, i);
workers.push_back(w);
}
- num_workers = cct->_conf->ms_async_op_threads;
}
void NetworkStack::start()
pool_spin.unlock();
return ;
}
- for (unsigned i = 0; i < num_workers; ++i)
- add_thread(i);
- spawn_workers(threads);
+
+ if (started) {
+ return ;
+ }
+ for (unsigned i = 0; i < num_workers; ++i) {
+ if (workers[i]->is_init())
+ continue;
+ std::function<void ()> thread;
+ add_thread(i, thread);
+ spawn_worker(i, std::move(thread));
+ }
started = true;
pool_spin.unlock();
for (unsigned i = 0; i < num_workers; ++i) {
workers[i]->done = true;
workers[i]->center.wakeup();
+ join_worker(i);
}
- join_workers();
- threads.clear();
started = false;
}
init_cond.notify_all();
init_lock.unlock();
}
+ bool is_init() {
+ std::lock_guard<std::mutex> l(init_lock);
+ return init;
+ }
void wait_for_init() {
std::unique_lock<std::mutex> l(init_lock);
while (!init)
class NetworkStack {
std::string type;
- std::atomic_bool started;
unsigned num_workers = 0;
Spinlock pool_spin;
+ bool started = false;
- void add_thread(unsigned i);
+ void add_thread(unsigned i, std::function<void ()> &ts);
protected:
CephContext *cct;
vector<Worker*> workers;
- std::vector<std::function<void ()>> threads;
// Used to indicate whether thread started
explicit NetworkStack(CephContext *c, const string &t);
}
// direct is used in tests only
- virtual void spawn_workers(std::vector<std::function<void ()>> &) = 0;
- virtual void join_workers() = 0;
+ virtual void spawn_worker(unsigned i, std::function<void ()> &&) = 0;
+ virtual void join_worker(unsigned i) = 0;
private:
NetworkStack(const NetworkStack &);