From: Sage Weil Date: Mon, 28 Jun 2010 21:15:59 +0000 (-0700) Subject: msgr: use dedicated reaper thread X-Git-Tag: v0.21~294 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=fcc39c8113fa56c6d8522758ea22aa01823a56a5;p=ceph.git msgr: use dedicated reaper thread We were calling the reaper from the wait() loop. The problem is that the OSD has two messengers, and only the first was in wait().. the second wait() was only called after the first terminated (i.e, when the OSD was shutting down). Instead, launch a separate reaper thread when we bind, and close it out on shutdown right after the accepter. --- diff --git a/src/msg/SimpleMessenger.cc b/src/msg/SimpleMessenger.cc index 2ba249526ead..937f1764f94f 100644 --- a/src/msg/SimpleMessenger.cc +++ b/src/msg/SimpleMessenger.cc @@ -1694,14 +1694,7 @@ void SimpleMessenger::Pipe::unlock_maybe_reap() pipe_lock.Unlock(); - // queue for reap - dout(10) << "unlock_maybe_reap queueing for reap" << dendl; - messenger->lock.Lock(); - { - messenger->pipe_reap_queue.push_back(this); - messenger->wait_cond.Signal(); - } - messenger->lock.Unlock(); + messenger->queue_reap(this); } else { pipe_lock.Unlock(); } @@ -2103,6 +2096,18 @@ int SimpleMessenger::Pipe::write_message(Message *m) #define dout_prefix _prefix(this) +void SimpleMessenger::reaper_entry() +{ + dout(10) << "reaper_entry start" << dendl; + lock.Lock(); + while (!reaper_stop) { + reaper(); + reaper_cond.Wait(lock); + } + lock.Unlock(); + dout(10) << "reaper_entry done" << dendl; +} + /* * note: assumes lock is held */ @@ -2129,9 +2134,20 @@ void SimpleMessenger::reaper() p->put(); dout(10) << "reaper deleted pipe " << p << dendl; } + dout(10) << "reaper done" << dendl; +} + +void SimpleMessenger::queue_reap(Pipe *pipe) +{ + dout(10) << "queue_reap " << pipe << dendl; + lock.Lock(); + pipe_reap_queue.push_back(pipe); + reaper_cond.Signal(); + lock.Unlock(); } + int SimpleMessenger::bind(int64_t force_nonce) { lock.Lock(); @@ -2247,8 +2263,12 @@ int SimpleMessenger::start(bool nodaemon) } // go! - if (did_bind) + if (did_bind) { accepter.start(); + + reaper_started = true; + reaper_thread.create(); + } return 0; } @@ -2441,23 +2461,15 @@ int SimpleMessenger::send_keepalive(const entity_inst_t& dest) - - void SimpleMessenger::wait() { lock.Lock(); - while (1) { - // reap dead pipes - reaper(); - - if (destination_stopped) { - dout(10) << "wait: everything stopped" << dendl; - break; // everything stopped. - } - - dout(10) << "wait: local_endpoint still active" << dendl; + while (!destination_stopped) { + dout(10) << "wait: still active" << dendl; wait_cond.Wait(lock); + dout(10) << "wait: woke up" << dendl; } + dout(10) << "wait: everything stopped" << dendl; lock.Unlock(); // done! clean up. @@ -2467,6 +2479,16 @@ void SimpleMessenger::wait() dout(20) << "wait: stopped accepter thread" << dendl; } + if (reaper_started) { + dout(20) << "wait: stopping reaper thread" << dendl; + lock.Lock(); + reaper_cond.Signal(); + reaper_stop = true; + lock.Unlock(); + reaper_thread.join(); + dout(20) << "wait: stopped reaper thread" << dendl; + } + // close+reap all pipes lock.Lock(); { @@ -2483,7 +2505,7 @@ void SimpleMessenger::wait() reaper(); dout(10) << "wait: waiting for pipes " << pipes << " to close" << dendl; while (!pipes.empty()) { - wait_cond.Wait(lock); + reaper_cond.Wait(lock); reaper(); } } diff --git a/src/msg/SimpleMessenger.h b/src/msg/SimpleMessenger.h index 968babf8ab86..023f16b6bc4b 100644 --- a/src/msg/SimpleMessenger.h +++ b/src/msg/SimpleMessenger.h @@ -452,7 +452,25 @@ private: void mark_down(const entity_addr_t& addr); + // reaper + class ReaperThread : public Thread { + SimpleMessenger *messenger; + public: + ReaperThread(SimpleMessenger *m) : messenger(m) {} + void *entry() { + messenger->get(); + messenger->reaper_entry(); + messenger->put(); + return 0; + } + } reaper_thread; + + bool reaper_started, reaper_stop; + Cond reaper_cond; + + void reaper_entry(); void reaper(); + void queue_reap(Pipe *pipe); Policy get_policy(int t) { if (policy_map.count(t)) @@ -512,6 +530,7 @@ public: message_throttler(g_conf.ms_waiting_message_bytes), need_addr(true), destination_stopped(true), my_type(-1), global_seq_lock("SimpleMessenger::global_seq_lock"), global_seq(0), + reaper_thread(this), reaper_started(false), reaper_stop(false), dispatch_thread(this), messenger(this) { // for local dmsg delivery dispatch_queue.local_pipe = new Pipe(this, Pipe::STATE_OPEN);