]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
msgr: use dedicated reaper thread
authorSage Weil <sage@newdream.net>
Mon, 28 Jun 2010 21:15:59 +0000 (14:15 -0700)
committerSage Weil <sage@newdream.net>
Tue, 29 Jun 2010 21:40:23 +0000 (14:40 -0700)
We were calling the reaper from the wait() loop.  The problem is that
the OSD has two messengers, and only the first was in wait().. the second
wait() was only called after the first terminated (i.e, when the OSD was
shutting down).

Instead, launch a separate reaper thread when we bind, and close it out
on shutdown right after the accepter.

src/msg/SimpleMessenger.cc
src/msg/SimpleMessenger.h

index 2ba249526ead6348b84870c3e743856c9dd288b7..937f1764f94fc19e839353bf5dae68ded19e4dfe 100644 (file)
@@ -1694,14 +1694,7 @@ void SimpleMessenger::Pipe::unlock_maybe_reap()
 
     pipe_lock.Unlock();
 
-    // queue for reap
-    dout(10) << "unlock_maybe_reap queueing for reap" << dendl;
-    messenger->lock.Lock();
-    {
-      messenger->pipe_reap_queue.push_back(this);
-      messenger->wait_cond.Signal();
-    }
-    messenger->lock.Unlock();
+    messenger->queue_reap(this);
   } else {
     pipe_lock.Unlock();
   }
@@ -2103,6 +2096,18 @@ int SimpleMessenger::Pipe::write_message(Message *m)
 #define dout_prefix _prefix(this)
 
 
+void SimpleMessenger::reaper_entry()
+{
+  dout(10) << "reaper_entry start" << dendl;
+  lock.Lock();
+  while (!reaper_stop) {
+    reaper();
+    reaper_cond.Wait(lock);
+  }
+  lock.Unlock();
+  dout(10) << "reaper_entry done" << dendl;
+}
+
 /*
  * note: assumes lock is held
  */
@@ -2129,9 +2134,20 @@ void SimpleMessenger::reaper()
     p->put();
     dout(10) << "reaper deleted pipe " << p << dendl;
   }
+  dout(10) << "reaper done" << dendl;
+}
+
+void SimpleMessenger::queue_reap(Pipe *pipe)
+{
+  dout(10) << "queue_reap " << pipe << dendl;
+  lock.Lock();
+  pipe_reap_queue.push_back(pipe);
+  reaper_cond.Signal();
+  lock.Unlock();
 }
 
 
+
 int SimpleMessenger::bind(int64_t force_nonce)
 {
   lock.Lock();
@@ -2247,8 +2263,12 @@ int SimpleMessenger::start(bool nodaemon)
   }
 
   // go!
-  if (did_bind)
+  if (did_bind) {
     accepter.start();
+
+    reaper_started = true;
+    reaper_thread.create();
+  }
   return 0;
 }
 
@@ -2441,23 +2461,15 @@ int SimpleMessenger::send_keepalive(const entity_inst_t& dest)
 
 
 
-
-
 void SimpleMessenger::wait()
 {
   lock.Lock();
-  while (1) {
-    // reap dead pipes
-    reaper();
-
-    if (destination_stopped) {
-      dout(10) << "wait: everything stopped" << dendl;
-      break;   // everything stopped.
-    }
-
-    dout(10) << "wait: local_endpoint still active" << dendl;
+  while (!destination_stopped) {
+    dout(10) << "wait: still active" << dendl;
     wait_cond.Wait(lock);
+    dout(10) << "wait: woke up" << dendl;
   }
+  dout(10) << "wait: everything stopped" << dendl;
   lock.Unlock();
   
   // done!  clean up.
@@ -2467,6 +2479,16 @@ void SimpleMessenger::wait()
     dout(20) << "wait: stopped accepter thread" << dendl;
   }
 
+  if (reaper_started) {
+    dout(20) << "wait: stopping reaper thread" << dendl;
+    lock.Lock();
+    reaper_cond.Signal();
+    reaper_stop = true;
+    lock.Unlock();
+    reaper_thread.join();
+    dout(20) << "wait: stopped reaper thread" << dendl;
+  }
+
   // close+reap all pipes
   lock.Lock();
   {
@@ -2483,7 +2505,7 @@ void SimpleMessenger::wait()
     reaper();
     dout(10) << "wait: waiting for pipes " << pipes << " to close" << dendl;
     while (!pipes.empty()) {
-      wait_cond.Wait(lock);
+      reaper_cond.Wait(lock);
       reaper();
     }
   }
index 968babf8ab86e9f3fc654e2c95d6deb6f6230ba0..023f16b6bc4ba45d253c697196175efc8d8a1cf3 100644 (file)
@@ -452,7 +452,25 @@ private:
 
   void mark_down(const entity_addr_t& addr);
 
+  // reaper
+  class ReaperThread : public Thread {
+    SimpleMessenger *messenger;
+  public:
+    ReaperThread(SimpleMessenger *m) : messenger(m) {}
+    void *entry() {
+      messenger->get();
+      messenger->reaper_entry();
+      messenger->put();
+      return 0;
+    }
+  } reaper_thread;
+
+  bool reaper_started, reaper_stop;
+  Cond reaper_cond;
+
+  void reaper_entry();
   void reaper();
+  void queue_reap(Pipe *pipe);
 
   Policy get_policy(int t) {
     if (policy_map.count(t))
@@ -512,6 +530,7 @@ public:
     message_throttler(g_conf.ms_waiting_message_bytes), need_addr(true),
     destination_stopped(true), my_type(-1),
     global_seq_lock("SimpleMessenger::global_seq_lock"), global_seq(0),
+    reaper_thread(this), reaper_started(false), reaper_stop(false), 
     dispatch_thread(this), messenger(this) {
     // for local dmsg delivery
     dispatch_queue.local_pipe = new Pipe(this, Pipe::STATE_OPEN);