]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
AsyncMessenger: Make WorkerPool use CephContext's singleton 2987/head
authorHaomai Wang <haomaiwang@gmail.com>
Mon, 1 Dec 2014 17:23:21 +0000 (01:23 +0800)
committerHaomai Wang <haomaiwang@gmail.com>
Mon, 1 Dec 2014 17:23:21 +0000 (01:23 +0800)
Signed-off-by: Haomai Wang <haomaiwang@gmail.com>
src/msg/async/AsyncMessenger.cc
src/msg/async/AsyncMessenger.h

index 6f8e5f99d1ee40c97c4a80cbf122c28d6beacc4a..03afc0adeb269769ab98afa7492721c12fd680df 100644 (file)
@@ -310,61 +310,36 @@ void *Worker::entry()
 
 /*******************
  * WorkerPool
- *
- * Because CephContext is used by Worker and WorkPool should rely to the
- * lifecycle of CephContext. One client may use multi CephContext, if one
- * destroyed, we can't let worker associated to this CephContext alive.
- * So each WorkerPool is associated to one CephContext.
- *
- * Note: it may better that CephCOntext implement observer mode?
- */
-map<CephContext*, WorkerPool*> WorkerPool::pools;
-map<CephContext*, vector<Worker*> > WorkerPool::workers;
-Mutex WorkerPool::lock("WorkerPool::lock");
+ *******************/
+const string WorkerPool::name = "AsyncMessenger::WorkerPool";
 
-WorkerPool *WorkerPool::init(CephContext *cct) {
-  Mutex::Locker l(lock);
-  if (!pools.count(cct)) {
-    assert(!workers.count(cct));
-    for (int i = 0; i < cct->_conf->ms_async_op_threads; ++i) {
-      Worker *w = new Worker(cct);
-      workers[cct].push_back(w);
-    }
-    pools[cct] = new WorkerPool(cct);
+WorkerPool::WorkerPool(CephContext *c): cct(c), seq(0), started(false)
+{
+  for (int i = 0; i < cct->_conf->ms_async_op_threads; ++i) {
+    Worker *w = new Worker(cct);
+    workers.push_back(w);
   }
+}
 
-  WorkerPool *p = pools[cct];
-  p->ref++;
-  return p;
+WorkerPool::~WorkerPool()
+{
+  for (uint64_t i = 0; i < workers.size(); ++i) {
+    workers[i]->stop();
+    workers[i]->join();
+    delete workers[i];
+  }
 }
 
 void WorkerPool::start()
 {
-  Mutex::Locker l(lock);
   if (!started) {
     for (uint64_t i = 0; i < workers.size(); ++i) {
-      workers[cct][i]->create();
+      workers[i]->create();
     }
     started = true;
   }
 }
 
-void WorkerPool::deinit()
-{
-  Mutex::Locker l(lock);
-  ref--;
-  if (!ref && started) {
-    for (uint64_t i = 0; i < workers.size(); ++i) {
-      workers[cct][i]->stop();
-      workers[cct][i]->join();
-      delete workers[cct][i];
-    }
-    workers.erase(cct);
-    pools.erase(cct);
-    started = false;
-    delete this;
-  }
-}
 
 /*******************
  * AsyncMessenger
@@ -380,7 +355,7 @@ AsyncMessenger::AsyncMessenger(CephContext *cct, entity_name_t name,
     cluster_protocol(0), stopped(true)
 {
   ceph_spin_init(&global_seq_lock);
-  pool = WorkerPool::init(cct);
+  cct->lookup_or_create_singleton_object<WorkerPool>(pool, WorkerPool::name);
   local_connection = new AsyncConnection(cct, this, &pool->get_worker()->center);
   init_local_connection();
 }
@@ -392,7 +367,6 @@ AsyncMessenger::AsyncMessenger(CephContext *cct, entity_name_t name,
 AsyncMessenger::~AsyncMessenger()
 {
   assert(!did_bind); // either we didn't bind or we shut down the Processor
-  pool->deinit();
 }
 
 void AsyncMessenger::ready()
index 50eaf74bfb1f1b2a6a2d4893d6fd3b9bc1c53d33..aff5ed3d2b8d0a02a8b37fd99fb69be50c55d368 100644 (file)
@@ -75,28 +75,24 @@ class Worker : public Thread {
 };
 
 
-class WorkerPool {
-  WorkerPool(CephContext *c):
-    cct(c), seq(0), started(false), ref(0) {}
+class WorkerPool: CephContext::AssociatedSingletonObject {
   WorkerPool(const WorkerPool &);
   WorkerPool& operator=(const WorkerPool &);
   CephContext *cct;
   uint64_t seq;
+  vector<Worker*> workers;
   // Used to indicate whether thread started
   bool started;
-  // Used to control whether need to destroy thread
-  static map<CephContext*, vector<Worker*> > workers;
-  static map<CephContext*, WorkerPool*> pools;
-  static Mutex lock;
 
  public:
-  int ref;
-  static WorkerPool *init(CephContext *cct);
-  void deinit();
+  WorkerPool(CephContext *c);
+  virtual ~WorkerPool();
   void start();
   Worker *get_worker() {
-    return workers[cct][(seq++)%workers.size()];
+    return workers[(seq++)%workers.size()];
   }
+  // uniq name for CephContext to distinguish differnt object
+  static const string name;
 };
 
 /*