/*******************
* WorkerPool
- *
- * Because CephContext is used by Worker and WorkPool should rely to the
- * lifecycle of CephContext. One client may use multi CephContext, if one
- * destroyed, we can't let worker associated to this CephContext alive.
- * So each WorkerPool is associated to one CephContext.
- *
- * Note: it may better that CephCOntext implement observer mode?
- */
-map<CephContext*, WorkerPool*> WorkerPool::pools;
-map<CephContext*, vector<Worker*> > WorkerPool::workers;
-Mutex WorkerPool::lock("WorkerPool::lock");
+ *******************/
+const string WorkerPool::name = "AsyncMessenger::WorkerPool";
-WorkerPool *WorkerPool::init(CephContext *cct) {
- Mutex::Locker l(lock);
- if (!pools.count(cct)) {
- assert(!workers.count(cct));
- for (int i = 0; i < cct->_conf->ms_async_op_threads; ++i) {
- Worker *w = new Worker(cct);
- workers[cct].push_back(w);
- }
- pools[cct] = new WorkerPool(cct);
+WorkerPool::WorkerPool(CephContext *c): cct(c), seq(0), started(false)
+{
+ for (int i = 0; i < cct->_conf->ms_async_op_threads; ++i) {
+ Worker *w = new Worker(cct);
+ workers.push_back(w);
}
+}
- WorkerPool *p = pools[cct];
- p->ref++;
- return p;
+WorkerPool::~WorkerPool()
+{
+ for (uint64_t i = 0; i < workers.size(); ++i) {
+ workers[i]->stop();
+ workers[i]->join();
+ delete workers[i];
+ }
}
void WorkerPool::start()
{
- Mutex::Locker l(lock);
if (!started) {
for (uint64_t i = 0; i < workers.size(); ++i) {
- workers[cct][i]->create();
+ workers[i]->create();
}
started = true;
}
}
-void WorkerPool::deinit()
-{
- Mutex::Locker l(lock);
- ref--;
- if (!ref && started) {
- for (uint64_t i = 0; i < workers.size(); ++i) {
- workers[cct][i]->stop();
- workers[cct][i]->join();
- delete workers[cct][i];
- }
- workers.erase(cct);
- pools.erase(cct);
- started = false;
- delete this;
- }
-}
/*******************
* AsyncMessenger
cluster_protocol(0), stopped(true)
{
ceph_spin_init(&global_seq_lock);
- pool = WorkerPool::init(cct);
+ cct->lookup_or_create_singleton_object<WorkerPool>(pool, WorkerPool::name);
local_connection = new AsyncConnection(cct, this, &pool->get_worker()->center);
init_local_connection();
}
AsyncMessenger::~AsyncMessenger()
{
assert(!did_bind); // either we didn't bind or we shut down the Processor
- pool->deinit();
}
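
[Note on lifetime, not part of the patch: with the pool obtained through lookup_or_create_singleton_object(), the messenger destructor no longer needs the old deinit() call. The sketch below shows the assumed ownership model; only the lookup call itself appears in this change, and the registry behaviour described in the comments is an assumption about CephContext, not code from this patch.]

    // Assumed behaviour of the CephContext singleton registry (illustrative only):
    //  - the first lookup for a given name constructs the object: new WorkerPool(cct)
    //  - later lookups with the same name return the same pointer
    //  - when the CephContext itself is destroyed, it deletes every registered
    //    AssociatedSingletonObject, which runs ~WorkerPool() above and thereby
    //    stops, joins and deletes the worker threads
    WorkerPool *pool = NULL;
    cct->lookup_or_create_singleton_object<WorkerPool>(pool, WorkerPool::name);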
void AsyncMessenger::ready()
};
-class WorkerPool {
- WorkerPool(CephContext *c):
- cct(c), seq(0), started(false), ref(0) {}
+class WorkerPool: CephContext::AssociatedSingletonObject {
WorkerPool(const WorkerPool &);
WorkerPool& operator=(const WorkerPool &);
CephContext *cct;
uint64_t seq;
+ vector<Worker*> workers;
// Used to indicate whether thread started
bool started;
- // Used to control whether need to destroy thread
- static map<CephContext*, vector<Worker*> > workers;
- static map<CephContext*, WorkerPool*> pools;
- static Mutex lock;
public:
- int ref;
- static WorkerPool *init(CephContext *cct);
- void deinit();
+ WorkerPool(CephContext *c);
+ virtual ~WorkerPool();
void start();
Worker *get_worker() {
- return workers[cct][(seq++)%workers.size()];
+ return workers[(seq++)%workers.size()];
}
+  // unique name used by CephContext to distinguish different singleton objects
+ static const string name;
};
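
[Usage sketch, assuming two AsyncMessenger instances built on the same CephContext; variable names are hypothetical. Because the lookup is keyed by WorkerPool::name, both messengers resolve to the same pool, start() creates the threads only once, and get_worker() hands out workers round-robin.]

    WorkerPool *p1 = NULL, *p2 = NULL;
    cct->lookup_or_create_singleton_object<WorkerPool>(p1, WorkerPool::name);
    cct->lookup_or_create_singleton_object<WorkerPool>(p2, WorkerPool::name);
    assert(p1 == p2);              // one pool per CephContext
    p1->start();                   // idempotent: no-op on the second call
    Worker *w = p1->get_worker();  // (seq++ % workers.size()) round-robin dispatch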
/*