]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
msgr: Remove Endpoint::dispatch_queue and queue_message;
authorGreg Farnum <gregf@hq.newdream.net>
Wed, 9 Dec 2009 00:17:57 +0000 (16:17 -0800)
committerGreg Farnum <gregf@hq.newdream.net>
Thu, 10 Dec 2009 22:33:31 +0000 (14:33 -0800)
set up local pipe and necessary routing code.

Also, fix potential locking issue by pushing message onto
queue before pushing queue to round-robin

src/msg/SimpleMessenger.cc
src/msg/SimpleMessenger.h

index 7c357f9cc13162f79a04ad3a69fef9566c7833db..db04d868757d1c5f804b08e2ea1f7b1090fdb312 100644 (file)
@@ -1407,11 +1407,11 @@ void SimpleMessenger::Pipe::reader()
               << m->get_seq() << " " << m << " " << *m
               << dendl;
       
-      // deliver
+      //deliver
       if (rank->local_endpoint)
-       rank->local_endpoint->queue_message(m);
+       queue_received(m);
       else derr(0) << "reader got message " << *m
-                  << "which isn't local" << dendl;
+                  << "but there is no endpoint!" << dendl;
 
       lock.Lock();
     } 
@@ -2197,7 +2197,7 @@ void SimpleMessenger::submit_message(Message *m, const entity_inst_t& dest, bool
       if (dest_addr.get_erank() == 0 && local_endpoint) {
         // local
         dout(20) << "submit_message " << *m << " local" << dendl;
-       local_endpoint->queue_message(m);
+       local_endpoint->local_delivery(m, m->get_priority());
       } else {
         derr(0) << "submit_message " << *m << " " << dest_addr << " local but wrong erank? dropping." << dendl;
         assert(0);  // hmpf, this is probably mds->mon beacon from newsyn.
index 681f21dae9f6b10f4d7eb233bd124c5a21f33b89..ca4bcaa205930645348ce2f2cef98506d87ee12f 100644 (file)
@@ -204,15 +204,22 @@ private:
       if ( 1 == queue.size()) {
        if (!queue_items.count(priority)) { //create an item for that priority
          pair< int, xlist<Pipe *>::item >
-           pair_item(p, xlist<Pipe *>::item(this));
+           pair_item(priority, xlist<Pipe *>::item(this));
          queue_items.insert(pair_item);
        }
        rank->local_endpoint->queue_lock.Lock();
-       rank->local_endpoint->queued_pipes[p].push_back(&queue_items[p]);
+       rank->local_endpoint->
+         queued_pipes[priority].push_back(&queue_items[priority]);
        rank->local_endpoint->queue_lock.Unlock();
       }
       lock.Unlock();
     }
+    
+    void queue_received(Message *m) {
+      m->set_recv_stamp(g_clock.now());
+      assert (m->nref.test() == 0);
+      queue_received(m, m->get_priority() );
+    }
 
     __u32 get_out_seq() { return out_seq; }
 
@@ -283,12 +290,12 @@ private:
   // messenger interface
   class Endpoint : public Messenger {
     SimpleMessenger *rank;
+    Pipe *local_pipe;
     Mutex lock;
     Cond cond;
     Mutex queue_lock;
     map<int, xlist<Pipe *> > queued_pipes;
     map<int, xlist<Pipe *>::iterator> queued_pipe_iters;
-    map<int, list<Message*> > dispatch_queue;
     bool stop;
     int qlen;
     int my_rank;
@@ -307,42 +314,37 @@ private:
     friend class SimpleMessenger;
 
   public:
-    void queue_message(Message *m) {
-      // set recv stamp
-      m->set_recv_stamp(g_clock.now());
-
-      assert(m->nref.test() == 0);
-
-      lock.Lock();
-      qlen++;
-      dispatch_queue[m->get_priority()].push_back(m);
-      cond.Signal();
-      lock.Unlock();
-    }
-
     enum { D_CONNECT, D_BAD_REMOTE_RESET, D_BAD_RESET };
     list<Connection*> connect_q;
     list<Connection*> remote_reset_q;
     list<Connection*> reset_q;
 
+    void local_delivery(Message *m, int priority) {
+      local_pipe->queue_received(m, priority);
+    }
+
+    void local_delivery(Message *m) {
+      local_pipe->queue_received(m);
+    }
+
     void queue_connect(Connection *con) {
       lock.Lock();
       connect_q.push_back(con);
-      dispatch_queue[CEPH_MSG_PRIO_HIGHEST].push_back((Message*)D_CONNECT);
+      local_delivery((Message*)D_CONNECT, CEPH_MSG_PRIO_HIGHEST);
       cond.Signal();
       lock.Unlock();
     }
     void queue_remote_reset(Connection *con) {
       lock.Lock();
       remote_reset_q.push_back(con);
-      dispatch_queue[CEPH_MSG_PRIO_HIGHEST].push_back((Message*)D_BAD_REMOTE_RESET);
+      local_delivery((Message*)D_BAD_REMOTE_RESET, CEPH_MSG_PRIO_HIGHEST);
       cond.Signal();
       lock.Unlock();
     }
     void queue_reset(Connection *con) {
       lock.Lock();
       reset_q.push_back(con);
-      dispatch_queue[CEPH_MSG_PRIO_HIGHEST].push_back((Message*)D_BAD_RESET);
+      local_delivery((Message*)D_BAD_RESET, CEPH_MSG_PRIO_HIGHEST);
       cond.Signal();
       lock.Unlock();
     }
@@ -356,8 +358,10 @@ private:
       stop(false),
       qlen(0),
       my_rank(rn),
-      dispatch_thread(this) { }
-    ~Endpoint() { }
+      dispatch_thread(this) {
+      local_pipe = new Pipe(r, Pipe::STATE_OPEN);
+    }
+    ~Endpoint() { delete local_pipe; }
 
     void destroy() {
       // join dispatch thread