From: Michal Jarzabek Date: Wed, 22 Jun 2016 21:59:59 +0000 (+0100) Subject: msg/AsyncMessenger: mv WorkerPool class to cc file X-Git-Tag: v11.0.0~63^2 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=360765efe903c16628648c9e6055c1050075c7e0;p=ceph-ci.git msg/AsyncMessenger: mv WorkerPool class to cc file Signed-off-by: Michal Jarzabek --- diff --git a/src/msg/async/AsyncMessenger.cc b/src/msg/async/AsyncMessenger.cc index 3feae6cb2c7..c6dfba4ac55 100644 --- a/src/msg/async/AsyncMessenger.cc +++ b/src/msg/async/AsyncMessenger.cc @@ -280,6 +280,47 @@ void Worker::stop() center.wakeup(); } +class WorkerPool { + WorkerPool(const WorkerPool &); + WorkerPool& operator=(const WorkerPool &); + CephContext *cct; + vector workers; + vector coreids; + // Used to indicate whether thread started + bool started; + Mutex barrier_lock; + Cond barrier_cond; + atomic_t barrier_count; + simple_spinlock_t pool_spin = SIMPLE_SPINLOCK_INITIALIZER; + + class C_barrier : public EventCallback { + WorkerPool *pool; + public: + explicit C_barrier(WorkerPool *p): pool(p) {} + void do_request(int id) { + Mutex::Locker l(pool->barrier_lock); + pool->barrier_count.dec(); + pool->barrier_cond.Signal(); + delete this; + } + }; + friend class C_barrier; + public: + explicit WorkerPool(CephContext *c); + virtual ~WorkerPool(); + void start(); + Worker *get_worker(); + void release_worker(EventCenter* c); + int get_cpuid(int id) { + if (coreids.empty()) + return -1; + return coreids[id % coreids.size()]; + } + void barrier(); + // uniq name for CephContext to distinguish differnt object + static const string name; +}; + void *Worker::entry() { ldout(cct, 10) << __func__ << " starting" << dendl; @@ -773,6 +814,15 @@ void AsyncMessenger::mark_down(const entity_addr_t& addr) lock.Unlock(); } +Connection *AsyncMessenger::create_anon_connection() { + Mutex::Locker l(lock); + Worker *w = pool->get_worker(); + return new AsyncConnection(cct, + this, + &dispatch_queue, + &w->center, w->get_perf_counter()); +} + int AsyncMessenger::get_proto_version(int peer_type, bool connect) { int my_type = my_inst.name.type(); @@ -844,3 +894,7 @@ int AsyncMessenger::reap_dead() return num; } + +void AsyncMessenger::release_worker(EventCenter* c) { + pool->release_worker(c); +} diff --git a/src/msg/async/AsyncMessenger.h b/src/msg/async/AsyncMessenger.h index 545b5317103..2b8570cb6d1 100644 --- a/src/msg/async/AsyncMessenger.h +++ b/src/msg/async/AsyncMessenger.h @@ -132,47 +132,6 @@ class Processor { void accept(); }; -class WorkerPool { - WorkerPool(const WorkerPool &); - WorkerPool& operator=(const WorkerPool &); - CephContext *cct; - vector workers; - vector coreids; - // Used to indicate whether thread started - bool started; - Mutex barrier_lock; - Cond barrier_cond; - atomic_t barrier_count; - simple_spinlock_t pool_spin = SIMPLE_SPINLOCK_INITIALIZER; - - class C_barrier : public EventCallback { - WorkerPool *pool; - public: - explicit C_barrier(WorkerPool *p): pool(p) {} - void do_request(int id) { - Mutex::Locker l(pool->barrier_lock); - pool->barrier_count.dec(); - pool->barrier_cond.Signal(); - delete this; - } - }; - friend class C_barrier; - public: - explicit WorkerPool(CephContext *c); - virtual ~WorkerPool(); - void start(); - Worker *get_worker(); - void release_worker(EventCenter* c); - int get_cpuid(int id) { - if (coreids.empty()) - return -1; - return coreids[id % coreids.size()]; - } - void barrier(); - // uniq name for CephContext to distinguish differnt object - static const string name; -}; - /* * AsyncMessenger is represented for maintaining a set of asynchronous connections, * it may own a bind address and the accepted connections will be managed by @@ -266,11 +225,7 @@ public: * @{ */ - Connection *create_anon_connection() { - Mutex::Locker l(lock); - Worker *w = pool->get_worker(); - return new AsyncConnection(cct, this, &dispatch_queue, &w->center, w->get_perf_counter()); - } + Connection *create_anon_connection(); /** * @} // Inner classes @@ -545,9 +500,7 @@ public: */ int reap_dead(); - void release_worker(EventCenter* c) { - pool->release_worker(c); - } + void release_worker(EventCenter* c); /** * @} // AsyncMessenger Internals