]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
msgr: SimpleMessenger takes responsibility for Messenger functions.
authorGreg Farnum <gregf@hq.newdream.net>
Wed, 6 Jan 2010 00:48:17 +0000 (16:48 -0800)
committerGreg Farnum <gregf@hq.newdream.net>
Thu, 7 Jan 2010 01:18:48 +0000 (17:18 -0800)
src/msg/SimpleMessenger.cc
src/msg/SimpleMessenger.h

index a1324e5966d0b8953bd1c927c435cbf727777f50..b95d9a72d4f2b422cd69ef870da06bbbc2ef244e 100644 (file)
@@ -267,14 +267,14 @@ void SimpleMessenger::Accepter::stop()
  * end of the queue. If the queue is empty; it's removed.
  * The message is then delivered and the process starts again.
  */
-void SimpleMessenger::Endpoint::dispatch_entry()
+void SimpleMessenger::dispatch_entry()
 {
-  DispatchQueue& q = rank->dispatch_queue;
-  q.lock.Lock();
-  while (!q.stop) {
-    while (!q.queued_pipes.empty() && !q.stop) {
+  dispatch_queue.lock.Lock();
+  while (!dispatch_queue.stop) {
+    while (!dispatch_queue.queued_pipes.empty() && !dispatch_queue.stop) {
       //get highest-priority pipe
-      map<int, xlist<Pipe *> >::reverse_iterator high_iter = q.queued_pipes.rbegin();
+      map<int, xlist<Pipe *> >::reverse_iterator high_iter =
+       dispatch_queue.queued_pipes.rbegin();
       int priority = high_iter->first;
       xlist<Pipe *>& pipe_list = high_iter->second;
 
@@ -288,38 +288,38 @@ void SimpleMessenger::Endpoint::dispatch_entry()
       if (m_queue.empty()) {
        pipe_list.pop_front();  // pipe is done
        if (pipe_list.empty())
-         q.queued_pipes.erase(priority);
+         dispatch_queue.queued_pipes.erase(priority);
       } else {
        pipe_list.push_back(pipe->queue_items[priority]);  // move to end of list
       }
-      q.lock.Unlock(); //done with the pipe queue for a while
+      dispatch_queue.lock.Unlock(); //done with the pipe queue for a while
 
       pipe->in_qlen--;
-      q.qlen_lock.lock();
-      q.qlen--;
-      q.qlen_lock.unlock();
+      dispatch_queue.qlen_lock.lock();
+      dispatch_queue.qlen--;
+      dispatch_queue.qlen_lock.unlock();
 
       pipe->pipe_lock.Unlock(); // done with the pipe's message queue now
       {
        if ((long)m == DispatchQueue::D_BAD_REMOTE_RESET) {
-         q.lock.Lock();
-         Connection *con = q.remote_reset_q.front();
-         q.remote_reset_q.pop_front();
-         q.lock.Unlock();
+         dispatch_queue.lock.Lock();
+         Connection *con = dispatch_queue.remote_reset_q.front();
+         dispatch_queue.remote_reset_q.pop_front();
+         dispatch_queue.lock.Unlock();
          ms_deliver_handle_remote_reset(con);
          con->put();
        } else if ((long)m == DispatchQueue::D_CONNECT) {
-         q.lock.Lock();
-         Connection *con = q.connect_q.front();
-         q.connect_q.pop_front();
-         q.lock.Unlock();
+         dispatch_queue.lock.Lock();
+         Connection *con = dispatch_queue.connect_q.front();
+         dispatch_queue.connect_q.pop_front();
+         dispatch_queue.lock.Unlock();
          ms_deliver_handle_connect(con);
          con->put();
        } else if ((long)m == DispatchQueue::D_BAD_RESET) {
-         q.lock.Lock();
-         Connection *con = q.reset_q.front();
-         q.reset_q.pop_front();
-         q.lock.Unlock();
+         dispatch_queue.lock.Lock();
+         Connection *con = dispatch_queue.reset_q.front();
+         dispatch_queue.reset_q.pop_front();
+         dispatch_queue.lock.Unlock();
          ms_deliver_handle_reset(con);
          con->put();
        } else {
@@ -336,20 +336,18 @@ void SimpleMessenger::Endpoint::dispatch_entry()
          dout(20) << "done calling dispatch on " << m << dendl;
        }
       }
-      q.lock.Lock();
+      dispatch_queue.lock.Lock();
     }
-    if (!q.stop)
-      q.cond.Wait(q.lock); //wait for something to get put on queue
+    if (!dispatch_queue.stop)
+      dispatch_queue.cond.Wait(dispatch_queue.lock); //wait for something to be put on queue
   }
-  q.lock.Unlock();
+  dispatch_queue.lock.Unlock();
   dout(15) << "dispatch: ending loop " << dendl;
   
-  // deregister
-  rank->unregister_entity(this);
-  put();
+  put(); //this thread is shutting down, so one less reference
 }
 
-void SimpleMessenger::Endpoint::ready()
+void SimpleMessenger::ready()
 {
   dout(10) << "ready " << get_myaddr() << dendl;
   assert(!dispatch_thread.is_started());
@@ -358,43 +356,42 @@ void SimpleMessenger::Endpoint::ready()
 }
 
 
-int SimpleMessenger::Endpoint::shutdown()
+int SimpleMessenger::shutdown()
 {
   dout(10) << "shutdown " << get_myaddr() << dendl;
-  DispatchQueue& q = rank->dispatch_queue;
 
   // stop my dispatch thread
   if (dispatch_thread.am_self()) {
     dout(10) << "shutdown i am dispatch, setting stop flag" << dendl;
-    q.stop = true;
+    dispatch_queue.stop = true;
   } else {
     dout(10) << "shutdown i am not dispatch, setting stop flag and joining thread." << dendl;
-    q.lock.Lock();
-    q.stop = true;
-    q.cond.Signal();
-    q.lock.Unlock();
+    dispatch_queue.lock.Lock();
+    dispatch_queue.stop = true;
+    dispatch_queue.cond.Signal();
+    dispatch_queue.lock.Unlock();
   }
   return 0;
 }
 
-void SimpleMessenger::Endpoint::suicide()
+void SimpleMessenger::suicide()
 {
   dout(10) << "suicide " << get_myaddr() << dendl;
   shutdown();
   // hmm, or exit(0)?
 }
 
-void SimpleMessenger::Endpoint::prepare_dest(const entity_inst_t& inst)
+void SimpleMessenger::prepare_dest(const entity_inst_t& inst)
 {
-  rank->lock.Lock();
+  lock.Lock();
   {
-    if (rank->rank_pipe.count(inst.addr) == 0)
-      rank->connect_rank(inst.addr, inst.name.type());
+    if (rank_pipe.count(inst.addr) == 0)
+      connect_rank(inst.addr, inst.name.type());
   }
-  rank->lock.Unlock();
+  lock.Unlock();
 }
 
-int SimpleMessenger::Endpoint::send_message(Message *m, entity_inst_t dest)
+int SimpleMessenger::send_message(Message *m, entity_inst_t dest)
 {
   // set envelope
   m->get_header().src = get_myinst();
@@ -408,12 +405,12 @@ int SimpleMessenger::Endpoint::send_message(Message *m, entity_inst_t dest)
          << " " << m 
          << dendl;
 
-  rank->submit_message(m, dest);
+  submit_message(m, dest);
 
   return 0;
 }
 
-int SimpleMessenger::Endpoint::forward_message(Message *m, entity_inst_t dest)
+int SimpleMessenger::forward_message(Message *m, entity_inst_t dest)
 {
   // set envelope
   m->get_header().src = get_myinst();
@@ -426,14 +423,14 @@ int SimpleMessenger::Endpoint::forward_message(Message *m, entity_inst_t dest)
          << " " << m 
           << dendl;
 
-  rank->submit_message(m, dest);
+  submit_message(m, dest);
 
   return 0;
 }
 
 
 
-int SimpleMessenger::Endpoint::lazy_send_message(Message *m, entity_inst_t dest)
+int SimpleMessenger::lazy_send_message(Message *m, entity_inst_t dest)
 {
   // set envelope
   m->get_header().src = get_myinst();
@@ -448,26 +445,12 @@ int SimpleMessenger::Endpoint::lazy_send_message(Message *m, entity_inst_t dest)
          << " " << m 
           << dendl;
 
-  rank->submit_message(m, dest, true);
-
-  return 0;
-}
+  submit_message(m, dest, true);
 
-int SimpleMessenger::Endpoint::send_keepalive(entity_inst_t dest)
-{
-  rank->send_keepalive(dest);
   return 0;
 }
 
-
-
-void SimpleMessenger::Endpoint::mark_down(entity_addr_t a)
-{
-  rank->mark_down(a);
-}
-
-
-entity_addr_t SimpleMessenger::Endpoint::get_myaddr()
+entity_addr_t SimpleMessenger::get_myaddr()
 {
   entity_addr_t a = rank->rank_addr;
   a.erank = 0;
index 746e07a061ef51ea910cb2cd21d7ce058fbe5447..710661517f449a64f8520f2cefbbd426c93f75cd 100644 (file)
@@ -45,6 +45,10 @@ using namespace __gnu_cxx;
  *    cleaner to merge the class with SimpleMessenger itself.
  * 3) Pipe. Each network connection is handled through a pipe, which handles
  *    the input and output of each message.
+ *
+ * This class should only be created on the heap, and it should be destroyed
+ * via a call to destroy(). Making it on the stack or otherwise calling
+ * the destructor will lead to badness.
  */
 
 /* Rank - per-process
@@ -114,7 +118,7 @@ private:
     int state;
 
   protected:
-    friend class Endpoint;
+    friend class SimpleMessenger;
     Connection *connection_state;
 
     utime_t backoff;         // backoff time
@@ -389,62 +393,39 @@ private:
       }
     }
   } dispatch_queue;
-  
 
-  // messenger interface
   class Endpoint : public Messenger {
     SimpleMessenger *rank;
-
-    class DispatchThread : public Thread {
-      Endpoint *m;
-    public:
-      DispatchThread(Endpoint *_m) : m(_m) {}
-      void *entry() {
-        m->dispatch_entry();
-        return 0;
-      }
-    } dispatch_thread;
-    void dispatch_entry();
-
     friend class SimpleMessenger;
-
   public:
-    Endpoint(SimpleMessenger *r, entity_name_t name) : 
+    Endpoint(SimpleMessenger *r, entity_name_t name) :
       Messenger(name),
-      rank(r),
-      dispatch_thread(this) {
-    }
-    ~Endpoint() {
-    }
+      rank(r) {}
+    ~Endpoint() {}
 
     void destroy() {
-      // join dispatch thread
-      if (dispatch_thread.is_started())
-       dispatch_thread.join();
-
+      rank->destroy();
       Messenger::destroy();
     }
 
-    void ready();
-    //bool is_stopped() { return stop; }
+    void ready() { rank->ready(); }
 
-    int get_dispatch_queue_len() { return rank->dispatch_queue.get_queue_len(); }
+    int get_dispatch_queue_len() { return rank->get_dispatch_queue_len(); }
 
-    entity_addr_t get_myaddr();
+    entity_addr_t get_myaddr() { return rank->get_myaddr(); }
 
-    int shutdown();
-    void suicide();
-    void prepare_dest(const entity_inst_t& inst);
-    int send_message(Message *m, entity_inst_t dest);
-    int forward_message(Message *m, entity_inst_t dest);
-    int lazy_send_message(Message *m, entity_inst_t dest);
-    int send_keepalive(entity_inst_t dest);
-
-    void mark_down(entity_addr_t a);
-    void mark_up(entity_name_t a, entity_addr_t& i);
+    int shutdown() { return rank->shutdown(); }
+    void suicide() { rank->suicide(); }
+    void prepare_dest(const entity_inst_t& inst) { rank->prepare_dest(inst); }
+    int send_message(Message *m, entity_inst_t dest) {
+      return rank->send_message(m, dest); }
+    int forward_message(Message *m, entity_inst_t dest) {
+      return rank->forward_message(m, dest); }
+    int lazy_send_message(Message *m, entity_inst_t dest) {
+      return rank->lazy_send_message(m, dest); }
+    int send_keepalive(entity_inst_t dest) { return rank->send_keepalive(dest);}
   };
-
-
+  
   // SimpleMessenger stuff
  public:
   Mutex lock;
@@ -491,69 +472,51 @@ private:
       return default_policy;
   }
 
-  //Messenger-required functions
+  /***** Messenger-required functions  **********/
   void destroy() {
-    if (!endpoint_stopped)
-      local_endpoint->destroy();
+    if (dispatch_thread.is_started())
+      dispatch_thread.join();
     Messenger::destroy();
   }
 
-  entity_addr_t get_myaddr() {
-    if (!endpoint_stopped)
-      return local_endpoint->get_myaddr();
-    return entity_addr_t();
-  }
+  entity_addr_t get_myaddr();
 
   int get_dispatch_queue_len() {
     return dispatch_queue.get_queue_len();
   }
 
-  void ready() {
-    if (!endpoint_stopped)
-      local_endpoint->ready();
-  }
-
-  int shutdown() {
-    if (!endpoint_stopped)
-      return local_endpoint->shutdown();
-    return -1;
-  }
-
-  void suicide() {
-    if (!endpoint_stopped)
-      local_endpoint->suicide();
-  }
+  void ready();
+  int shutdown();
+  void suicide();
+  void prepare_dest(const entity_inst_t& inst);
+  int send_message(Message *m, entity_inst_t dest);
+  int forward_message(Message *m, entity_inst_t dest);
+  int lazy_send_message(Message *m, entity_inst_t dest);
+  /***********************/
 
-  void prepare_dest(const entity_inst_t& inst) {
-    if (!endpoint_stopped)
-      local_endpoint->prepare_dest(inst);
-  }
+private:
+  class DispatchThread : public Thread {
+    SimpleMessenger *rank;
+  public:
+    DispatchThread(SimpleMessenger *_rank) : rank(_rank) {}
+    void *entry() {
+      rank->dispatch_entry();
+      return 0;
+    }
+  } dispatch_thread;
 
-  int send_message(Message *m, entity_inst_t dest) {
-    if (!endpoint_stopped)
-      return local_endpoint->send_message(m, dest);
-    return -1;
-  }
+  void dispatch_entry();
 
-  int forward_message(Message *m, entity_inst_t dest) {
-    if (!endpoint_stopped)
-      return local_endpoint->forward_message(m, dest);
-    return -1;
-  }
+  SimpleMessenger *rank; //hack to make dout macro work, will fix
 
-  int lazy_send_message(Message *m, entity_inst_t dest) {
-    if (!endpoint_stopped)
-      return local_endpoint->lazy_send_message(m, dest);
-    return -1;
-  }
-  
 public:
   SimpleMessenger() :
     Messenger(entity_name_t()),
     accepter(this),
     lock("SimpleMessenger::lock"), started(false), did_bind(false), need_addr(true),
     local_endpoint(NULL), endpoint_stopped(true), my_type(-1),
-    global_seq_lock("SimpleMessenger::global_seq_lock"), global_seq(0) {
+    global_seq_lock("SimpleMessenger::global_seq_lock"), global_seq(0),
+    dispatch_thread(this), rank(this) {
     // for local dmsg delivery
     dispatch_queue.local_pipe = new Pipe(this, Pipe::STATE_OPEN);
   }