]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
msg/AsyncMessenger: mv WorkerPool class to cc file 9880/head
authorMichal Jarzabek <stiopa@gmail.com>
Wed, 22 Jun 2016 21:59:59 +0000 (22:59 +0100)
committerMichal Jarzabek <stiopa@gmail.com>
Wed, 22 Jun 2016 22:02:56 +0000 (23:02 +0100)
Signed-off-by: Michal Jarzabek <stiopa@gmail.com>
src/msg/async/AsyncMessenger.cc
src/msg/async/AsyncMessenger.h

index 3feae6cb2c775b2f8d413e9fc112954f1a2adc31..c6dfba4ac55802583e8c9f991781422fbd7acbf2 100644 (file)
@@ -280,6 +280,47 @@ void Worker::stop()
   center.wakeup();
 }
 
+class WorkerPool {
+  WorkerPool(const WorkerPool &);
+  WorkerPool& operator=(const WorkerPool &);
+  CephContext *cct;
+  vector<Worker*> workers;
+  vector<int> coreids;
+  // Used to indicate whether thread started
+  bool started;
+  Mutex barrier_lock;
+  Cond barrier_cond;
+  atomic_t barrier_count;
+  simple_spinlock_t pool_spin = SIMPLE_SPINLOCK_INITIALIZER;
+
+  class C_barrier : public EventCallback {
+    WorkerPool *pool;
+    public:
+    explicit C_barrier(WorkerPool *p): pool(p) {}
+    void do_request(int id) {
+      Mutex::Locker l(pool->barrier_lock);
+      pool->barrier_count.dec();
+      pool->barrier_cond.Signal();
+      delete this;
+    }
+  };
+  friend class C_barrier;
+  public:
+  explicit WorkerPool(CephContext *c);
+  virtual ~WorkerPool();
+  void start();
+  Worker *get_worker();
+  void release_worker(EventCenter* c);
+  int get_cpuid(int id) {
+    if (coreids.empty())
+      return -1;
+    return coreids[id % coreids.size()];
+  }
+  void barrier();
+  // uniq name for CephContext to distinguish differnt object
+  static const string name;
+};
+
 void *Worker::entry()
 {
   ldout(cct, 10) << __func__ << " starting" << dendl;
@@ -773,6 +814,15 @@ void AsyncMessenger::mark_down(const entity_addr_t& addr)
   lock.Unlock();
 }
 
+Connection *AsyncMessenger::create_anon_connection() {
+  Mutex::Locker l(lock);
+  Worker *w = pool->get_worker();
+  return new AsyncConnection(cct,
+                            this,
+                            &dispatch_queue,
+                            &w->center, w->get_perf_counter());
+}
+
 int AsyncMessenger::get_proto_version(int peer_type, bool connect)
 {
   int my_type = my_inst.name.type();
@@ -844,3 +894,7 @@ int AsyncMessenger::reap_dead()
 
   return num;
 }
+
+void AsyncMessenger::release_worker(EventCenter* c) {
+  pool->release_worker(c);
+}
index 545b5317103925224f14bb0d9960800ee654ef04..2b8570cb6d1d72f124b23dfff90106ee4750af06 100644 (file)
@@ -132,47 +132,6 @@ class Processor {
   void accept();
 };
 
-class WorkerPool {
-  WorkerPool(const WorkerPool &);
-  WorkerPool& operator=(const WorkerPool &);
-  CephContext *cct;
-  vector<Worker*> workers;
-  vector<int> coreids;
-  // Used to indicate whether thread started
-  bool started;
-  Mutex barrier_lock;
-  Cond barrier_cond;
-  atomic_t barrier_count;
-  simple_spinlock_t pool_spin = SIMPLE_SPINLOCK_INITIALIZER;
-
-  class C_barrier : public EventCallback {
-    WorkerPool *pool;
-   public:
-    explicit C_barrier(WorkerPool *p): pool(p) {}
-    void do_request(int id) {
-      Mutex::Locker l(pool->barrier_lock);
-      pool->barrier_count.dec();
-      pool->barrier_cond.Signal();
-      delete this;
-    }
-  };
-  friend class C_barrier;
- public:
-  explicit WorkerPool(CephContext *c);
-  virtual ~WorkerPool();
-  void start();
-  Worker *get_worker();
-  void release_worker(EventCenter* c);
-  int get_cpuid(int id) {
-    if (coreids.empty())
-      return -1;
-    return coreids[id % coreids.size()];
-  }
-  void barrier();
-  // uniq name for CephContext to distinguish differnt object
-  static const string name;
-};
-
 /*
  * AsyncMessenger is represented for maintaining a set of asynchronous connections,
  * it may own a bind address and the accepted connections will be managed by
@@ -266,11 +225,7 @@ public:
    * @{
    */
 
-  Connection *create_anon_connection() {
-    Mutex::Locker l(lock);
-    Worker *w = pool->get_worker();
-    return new AsyncConnection(cct, this, &dispatch_queue, &w->center, w->get_perf_counter());
-  }
+  Connection *create_anon_connection();
 
   /**
    * @} // Inner classes
@@ -545,9 +500,7 @@ public:
    */
   int reap_dead();
   
-  void release_worker(EventCenter* c) {
-    pool->release_worker(c);
-  }
+  void release_worker(EventCenter* c);
 
   /**
    * @} // AsyncMessenger Internals