pipe_lock.Unlock();
- // queue for reap
- dout(10) << "unlock_maybe_reap queueing for reap" << dendl;
- messenger->lock.Lock();
- {
- messenger->pipe_reap_queue.push_back(this);
- messenger->wait_cond.Signal();
- }
- messenger->lock.Unlock();
+ messenger->queue_reap(this);
} else {
pipe_lock.Unlock();
}
#define dout_prefix _prefix(this)
+void SimpleMessenger::reaper_entry()
+{
+ dout(10) << "reaper_entry start" << dendl;
+ lock.Lock();
+ while (!reaper_stop) {
+ reaper();
+ reaper_cond.Wait(lock);
+ }
+ lock.Unlock();
+ dout(10) << "reaper_entry done" << dendl;
+}
+
/*
* note: assumes lock is held
*/
p->put();
dout(10) << "reaper deleted pipe " << p << dendl;
}
+ dout(10) << "reaper done" << dendl;
+}
+
+void SimpleMessenger::queue_reap(Pipe *pipe)
+{
+ dout(10) << "queue_reap " << pipe << dendl;
+ lock.Lock();
+ pipe_reap_queue.push_back(pipe);
+ reaper_cond.Signal();
+ lock.Unlock();
}
+
int SimpleMessenger::bind(int64_t force_nonce)
{
lock.Lock();
}
// go!
- if (did_bind)
+ if (did_bind) {
accepter.start();
+
+ reaper_started = true;
+ reaper_thread.create();
+ }
return 0;
}
-
-
void SimpleMessenger::wait()
{
lock.Lock();
- while (1) {
- // reap dead pipes
- reaper();
-
- if (destination_stopped) {
- dout(10) << "wait: everything stopped" << dendl;
- break; // everything stopped.
- }
-
- dout(10) << "wait: local_endpoint still active" << dendl;
+ while (!destination_stopped) {
+ dout(10) << "wait: still active" << dendl;
wait_cond.Wait(lock);
+ dout(10) << "wait: woke up" << dendl;
}
+ dout(10) << "wait: everything stopped" << dendl;
lock.Unlock();
// done! clean up.
dout(20) << "wait: stopped accepter thread" << dendl;
}
+ if (reaper_started) {
+ dout(20) << "wait: stopping reaper thread" << dendl;
+ lock.Lock();
+ reaper_cond.Signal();
+ reaper_stop = true;
+ lock.Unlock();
+ reaper_thread.join();
+ dout(20) << "wait: stopped reaper thread" << dendl;
+ }
+
// close+reap all pipes
lock.Lock();
{
reaper();
dout(10) << "wait: waiting for pipes " << pipes << " to close" << dendl;
while (!pipes.empty()) {
- wait_cond.Wait(lock);
+ reaper_cond.Wait(lock);
reaper();
}
}
void mark_down(const entity_addr_t& addr);
+ // reaper
+ class ReaperThread : public Thread {
+ SimpleMessenger *messenger;
+ public:
+ ReaperThread(SimpleMessenger *m) : messenger(m) {}
+ void *entry() {
+ messenger->get();
+ messenger->reaper_entry();
+ messenger->put();
+ return 0;
+ }
+ } reaper_thread;
+
+ bool reaper_started, reaper_stop;
+ Cond reaper_cond;
+
+ void reaper_entry();
void reaper();
+ void queue_reap(Pipe *pipe);
Policy get_policy(int t) {
if (policy_map.count(t))
message_throttler(g_conf.ms_waiting_message_bytes), need_addr(true),
destination_stopped(true), my_type(-1),
global_seq_lock("SimpleMessenger::global_seq_lock"), global_seq(0),
+ reaper_thread(this), reaper_started(false), reaper_stop(false),
dispatch_thread(this), messenger(this) {
// for local dmsg delivery
dispatch_queue.local_pipe = new Pipe(this, Pipe::STATE_OPEN);