*******************/
const string WorkerPool::name = "AsyncMessenger::WorkerPool";
-WorkerPool::WorkerPool(CephContext *c): cct(c), seq(0), started(false)
+WorkerPool::WorkerPool(CephContext *c): cct(c), seq(0), started(false),
+ barrier_lock("WorkerPool::WorkerPool::barrier_lock"),
+ barrier_count(0)
{
assert(cct->_conf->ms_async_op_threads > 0);
for (int i = 0; i < cct->_conf->ms_async_op_threads; ++i) {
}
}
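+// Wait until every worker's event center has processed all events queued
+// before this call: a C_barrier event is dispatched to each worker and the
+// caller blocks until all of them have run.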
+void WorkerPool::barrier()
+{
+ ldout(cct, 10) << __func__ << " started." << dendl;
+ pthread_t cur = pthread_self();
+ for (vector<Worker*>::iterator it = workers.begin(); it != workers.end(); ++it) {
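+ // barrier() must not be called from a worker thread: that worker could
+ // never process its own barrier event while blocked in the wait below.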
+ assert(cur != (*it)->center.get_owner());
+ // count the barrier before dispatching it; otherwise the worker could run
+ // C_barrier and decrement before our increment, underflowing the counter
+ barrier_count.inc();
+ (*it)->center.dispatch_event_external(EventCallbackRef(new C_barrier(this)));
+ }
+ ldout(cct, 10) << __func__ << " wait for " << barrier_count.read() << " barrier" << dendl;
+ Mutex::Locker l(barrier_lock);
+ while (barrier_count.read())
+ barrier_cond.Wait(barrier_lock);
+
+ ldout(cct, 10) << __func__ << " end." << dendl;
+}
+
/*******************
* AsyncMessenger
int AsyncMessenger::shutdown()
{
ldout(cct,10) << __func__ << " " << get_myaddr() << dendl;
- mark_down_all();
// break ref cycles on the loopback connection
processor.stop();
+ mark_down_all();
local_connection->set_priv(NULL);
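+ // drain the worker event centers so that no event queued before this
+ // point can still touch a connection after shutdown() returns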
+ pool->barrier();
lock.Lock();
stop_cond.Signal();
lock.Unlock();
AsyncConnectionRef p = *q;
ldout(cct, 5) << __func__ << " accepting_conn " << p << dendl;
p->mark_down();
- ms_deliver_handle_reset(p.get());
}
accepting_conns.clear();
ldout(cct, 5) << __func__ << " mark down " << it->first << " " << p << dendl;
conns.erase(it);
p->mark_down();
- ms_deliver_handle_reset(p.get());
}
while (!deleted_conns.empty()) {
if (p) {
ldout(cct, 1) << __func__ << " " << addr << " -- " << p << dendl;
p->mark_down();
- ms_deliver_handle_reset(p.get());
} else {
ldout(cct, 1) << __func__ << " " << addr << " -- connection dne" << dendl;
}
vector<int> coreids;
// Used to indicate whether thread started
bool started;
-
+ Mutex barrier_lock;
+ Cond barrier_cond;
+ atomic_t barrier_count;
+
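+ // dispatched to each worker by barrier(); decrements barrier_count and
+ // wakes the waiting barrier() caller when the worker runs it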
+ class C_barrier : public EventCallback {
+ WorkerPool *pool;
+ public:
+ C_barrier(WorkerPool *p): pool(p) {}
+ void do_request(int id) {
+ Mutex::Locker l(pool->barrier_lock);
+ pool->barrier_count.dec();
+ pool->barrier_cond.Signal();
+ }
+ };
+ friend class C_barrier;
public:
WorkerPool(CephContext *c);
virtual ~WorkerPool();
return -1;
return coreids[id % coreids.size()];
}
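+ // block until all workers have processed events queued before this call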
+ void barrier();
// unique name for CephContext to distinguish different objects
static const string name;
};