From: Greg Farnum Date: Wed, 9 Dec 2009 00:17:57 +0000 (-0800) Subject: msgr: Remove Endpoint::dispatch_queue and queue_message; X-Git-Tag: v0.19~269 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=ec0031b763774bb598739dbcd0ac61fd9038646c;p=ceph.git msgr: Remove Endpoint::dispatch_queue and queue_message; set up local pipe and necessary routing code. Also, fix potential locking issue by pushing message onto queue before pushing queue to round-robin --- diff --git a/src/msg/SimpleMessenger.cc b/src/msg/SimpleMessenger.cc index 7c357f9cc131..db04d868757d 100644 --- a/src/msg/SimpleMessenger.cc +++ b/src/msg/SimpleMessenger.cc @@ -1407,11 +1407,11 @@ void SimpleMessenger::Pipe::reader() << m->get_seq() << " " << m << " " << *m << dendl; - // deliver + //deliver if (rank->local_endpoint) - rank->local_endpoint->queue_message(m); + queue_received(m); else derr(0) << "reader got message " << *m - << "which isn't local" << dendl; + << "but there is no endpoint!" << dendl; lock.Lock(); } @@ -2197,7 +2197,7 @@ void SimpleMessenger::submit_message(Message *m, const entity_inst_t& dest, bool if (dest_addr.get_erank() == 0 && local_endpoint) { // local dout(20) << "submit_message " << *m << " local" << dendl; - local_endpoint->queue_message(m); + local_endpoint->local_delivery(m, m->get_priority()); } else { derr(0) << "submit_message " << *m << " " << dest_addr << " local but wrong erank? dropping." << dendl; assert(0); // hmpf, this is probably mds->mon beacon from newsyn. diff --git a/src/msg/SimpleMessenger.h b/src/msg/SimpleMessenger.h index 681f21dae9f6..ca4bcaa20593 100644 --- a/src/msg/SimpleMessenger.h +++ b/src/msg/SimpleMessenger.h @@ -204,15 +204,22 @@ private: if ( 1 == queue.size()) { if (!queue_items.count(priority)) { //create an item for that priority pair< int, xlist::item > - pair_item(p, xlist::item(this)); + pair_item(priority, xlist::item(this)); queue_items.insert(pair_item); } rank->local_endpoint->queue_lock.Lock(); - rank->local_endpoint->queued_pipes[p].push_back(&queue_items[p]); + rank->local_endpoint-> + queued_pipes[priority].push_back(&queue_items[priority]); rank->local_endpoint->queue_lock.Unlock(); } lock.Unlock(); } + + void queue_received(Message *m) { + m->set_recv_stamp(g_clock.now()); + assert (m->nref.test() == 0); + queue_received(m, m->get_priority() ); + } __u32 get_out_seq() { return out_seq; } @@ -283,12 +290,12 @@ private: // messenger interface class Endpoint : public Messenger { SimpleMessenger *rank; + Pipe *local_pipe; Mutex lock; Cond cond; Mutex queue_lock; map > queued_pipes; map::iterator> queued_pipe_iters; - map > dispatch_queue; bool stop; int qlen; int my_rank; @@ -307,42 +314,37 @@ private: friend class SimpleMessenger; public: - void queue_message(Message *m) { - // set recv stamp - m->set_recv_stamp(g_clock.now()); - - assert(m->nref.test() == 0); - - lock.Lock(); - qlen++; - dispatch_queue[m->get_priority()].push_back(m); - cond.Signal(); - lock.Unlock(); - } - enum { D_CONNECT, D_BAD_REMOTE_RESET, D_BAD_RESET }; list connect_q; list remote_reset_q; list reset_q; + void local_delivery(Message *m, int priority) { + local_pipe->queue_received(m, priority); + } + + void local_delivery(Message *m) { + local_pipe->queue_received(m); + } + void queue_connect(Connection *con) { lock.Lock(); connect_q.push_back(con); - dispatch_queue[CEPH_MSG_PRIO_HIGHEST].push_back((Message*)D_CONNECT); + local_delivery((Message*)D_CONNECT, CEPH_MSG_PRIO_HIGHEST); cond.Signal(); lock.Unlock(); } void queue_remote_reset(Connection *con) { lock.Lock(); remote_reset_q.push_back(con); - dispatch_queue[CEPH_MSG_PRIO_HIGHEST].push_back((Message*)D_BAD_REMOTE_RESET); + local_delivery((Message*)D_BAD_REMOTE_RESET, CEPH_MSG_PRIO_HIGHEST); cond.Signal(); lock.Unlock(); } void queue_reset(Connection *con) { lock.Lock(); reset_q.push_back(con); - dispatch_queue[CEPH_MSG_PRIO_HIGHEST].push_back((Message*)D_BAD_RESET); + local_delivery((Message*)D_BAD_RESET, CEPH_MSG_PRIO_HIGHEST); cond.Signal(); lock.Unlock(); } @@ -356,8 +358,10 @@ private: stop(false), qlen(0), my_rank(rn), - dispatch_thread(this) { } - ~Endpoint() { } + dispatch_thread(this) { + local_pipe = new Pipe(r, Pipe::STATE_OPEN); + } + ~Endpoint() { delete local_pipe; } void destroy() { // join dispatch thread