center.wakeup();
}
+class WorkerPool {
+ WorkerPool(const WorkerPool &);
+ WorkerPool& operator=(const WorkerPool &);
+ CephContext *cct;
+ vector<Worker*> workers;
+ vector<int> coreids;
+ // Used to indicate whether thread started
+ bool started;
+ Mutex barrier_lock;
+ Cond barrier_cond;
+ atomic_t barrier_count;
+ simple_spinlock_t pool_spin = SIMPLE_SPINLOCK_INITIALIZER;
+
+ class C_barrier : public EventCallback {
+ WorkerPool *pool;
+ public:
+ explicit C_barrier(WorkerPool *p): pool(p) {}
+ void do_request(int id) {
+ Mutex::Locker l(pool->barrier_lock);
+ pool->barrier_count.dec();
+ pool->barrier_cond.Signal();
+ delete this;
+ }
+ };
+ friend class C_barrier;
+ public:
+ explicit WorkerPool(CephContext *c);
+ virtual ~WorkerPool();
+ void start();
+ Worker *get_worker();
+ void release_worker(EventCenter* c);
+ int get_cpuid(int id) {
+ if (coreids.empty())
+ return -1;
+ return coreids[id % coreids.size()];
+ }
+ void barrier();
+ // uniq name for CephContext to distinguish differnt object
+ static const string name;
+};
+
void *Worker::entry()
{
ldout(cct, 10) << __func__ << " starting" << dendl;
lock.Unlock();
}
+Connection *AsyncMessenger::create_anon_connection() {
+ Mutex::Locker l(lock);
+ Worker *w = pool->get_worker();
+ return new AsyncConnection(cct,
+ this,
+ &dispatch_queue,
+ &w->center, w->get_perf_counter());
+}
+
int AsyncMessenger::get_proto_version(int peer_type, bool connect)
{
int my_type = my_inst.name.type();
return num;
}
+
+void AsyncMessenger::release_worker(EventCenter* c) {
+ pool->release_worker(c);
+}
void accept();
};
-class WorkerPool {
- WorkerPool(const WorkerPool &);
- WorkerPool& operator=(const WorkerPool &);
- CephContext *cct;
- vector<Worker*> workers;
- vector<int> coreids;
- // Used to indicate whether thread started
- bool started;
- Mutex barrier_lock;
- Cond barrier_cond;
- atomic_t barrier_count;
- simple_spinlock_t pool_spin = SIMPLE_SPINLOCK_INITIALIZER;
-
- class C_barrier : public EventCallback {
- WorkerPool *pool;
- public:
- explicit C_barrier(WorkerPool *p): pool(p) {}
- void do_request(int id) {
- Mutex::Locker l(pool->barrier_lock);
- pool->barrier_count.dec();
- pool->barrier_cond.Signal();
- delete this;
- }
- };
- friend class C_barrier;
- public:
- explicit WorkerPool(CephContext *c);
- virtual ~WorkerPool();
- void start();
- Worker *get_worker();
- void release_worker(EventCenter* c);
- int get_cpuid(int id) {
- if (coreids.empty())
- return -1;
- return coreids[id % coreids.size()];
- }
- void barrier();
- // uniq name for CephContext to distinguish differnt object
- static const string name;
-};
-
/*
* AsyncMessenger is represented for maintaining a set of asynchronous connections,
* it may own a bind address and the accepted connections will be managed by
* @{
*/
- Connection *create_anon_connection() {
- Mutex::Locker l(lock);
- Worker *w = pool->get_worker();
- return new AsyncConnection(cct, this, &dispatch_queue, &w->center, w->get_perf_counter());
- }
+ Connection *create_anon_connection();
/**
* @} // Inner classes
*/
int reap_dead();
- void release_worker(EventCenter* c) {
- pool->release_worker(c);
- }
+ void release_worker(EventCenter* c);
/**
* @} // AsyncMessenger Internals