From 04ba435baf3261b0208d6bc82e6c2a8f9601480e Mon Sep 17 00:00:00 2001 From: Greg Farnum Date: Tue, 5 Jan 2010 16:48:17 -0800 Subject: [PATCH] msgr: SimpleMessenger takes responsibility for Messenger functions. --- src/msg/SimpleMessenger.cc | 113 +++++++++++++------------------ src/msg/SimpleMessenger.h | 135 ++++++++++++++----------------------- 2 files changed, 97 insertions(+), 151 deletions(-) diff --git a/src/msg/SimpleMessenger.cc b/src/msg/SimpleMessenger.cc index a1324e5966d0b..b95d9a72d4f2b 100644 --- a/src/msg/SimpleMessenger.cc +++ b/src/msg/SimpleMessenger.cc @@ -267,14 +267,14 @@ void SimpleMessenger::Accepter::stop() * end of the queue. If the queue is empty; it's removed. * The message is then delivered and the process starts again. */ -void SimpleMessenger::Endpoint::dispatch_entry() +void SimpleMessenger::dispatch_entry() { - DispatchQueue& q = rank->dispatch_queue; - q.lock.Lock(); - while (!q.stop) { - while (!q.queued_pipes.empty() && !q.stop) { + dispatch_queue.lock.Lock(); + while (!dispatch_queue.stop) { + while (!dispatch_queue.queued_pipes.empty() && !dispatch_queue.stop) { //get highest-priority pipe - map >::reverse_iterator high_iter = q.queued_pipes.rbegin(); + map >::reverse_iterator high_iter = + dispatch_queue.queued_pipes.rbegin(); int priority = high_iter->first; xlist& pipe_list = high_iter->second; @@ -288,38 +288,38 @@ void SimpleMessenger::Endpoint::dispatch_entry() if (m_queue.empty()) { pipe_list.pop_front(); // pipe is done if (pipe_list.empty()) - q.queued_pipes.erase(priority); + dispatch_queue.queued_pipes.erase(priority); } else { pipe_list.push_back(pipe->queue_items[priority]); // move to end of list } - q.lock.Unlock(); //done with the pipe queue for a while + dispatch_queue.lock.Unlock(); //done with the pipe queue for a while pipe->in_qlen--; - q.qlen_lock.lock(); - q.qlen--; - q.qlen_lock.unlock(); + dispatch_queue.qlen_lock.lock(); + dispatch_queue.qlen--; + dispatch_queue.qlen_lock.unlock(); pipe->pipe_lock.Unlock(); // done with the pipe's message queue now { if ((long)m == DispatchQueue::D_BAD_REMOTE_RESET) { - q.lock.Lock(); - Connection *con = q.remote_reset_q.front(); - q.remote_reset_q.pop_front(); - q.lock.Unlock(); + dispatch_queue.lock.Lock(); + Connection *con = dispatch_queue.remote_reset_q.front(); + dispatch_queue.remote_reset_q.pop_front(); + dispatch_queue.lock.Unlock(); ms_deliver_handle_remote_reset(con); con->put(); } else if ((long)m == DispatchQueue::D_CONNECT) { - q.lock.Lock(); - Connection *con = q.connect_q.front(); - q.connect_q.pop_front(); - q.lock.Unlock(); + dispatch_queue.lock.Lock(); + Connection *con = dispatch_queue.connect_q.front(); + dispatch_queue.connect_q.pop_front(); + dispatch_queue.lock.Unlock(); ms_deliver_handle_connect(con); con->put(); } else if ((long)m == DispatchQueue::D_BAD_RESET) { - q.lock.Lock(); - Connection *con = q.reset_q.front(); - q.reset_q.pop_front(); - q.lock.Unlock(); + dispatch_queue.lock.Lock(); + Connection *con = dispatch_queue.reset_q.front(); + dispatch_queue.reset_q.pop_front(); + dispatch_queue.lock.Unlock(); ms_deliver_handle_reset(con); con->put(); } else { @@ -336,20 +336,18 @@ void SimpleMessenger::Endpoint::dispatch_entry() dout(20) << "done calling dispatch on " << m << dendl; } } - q.lock.Lock(); + dispatch_queue.lock.Lock(); } - if (!q.stop) - q.cond.Wait(q.lock); //wait for something to get put on queue + if (!dispatch_queue.stop) + dispatch_queue.cond.Wait(dispatch_queue.lock); //wait for something to be put on queue } - q.lock.Unlock(); + dispatch_queue.lock.Unlock(); dout(15) << "dispatch: ending loop " << dendl; - // deregister - rank->unregister_entity(this); - put(); + put(); //this thread is shutting down, so one less reference } -void SimpleMessenger::Endpoint::ready() +void SimpleMessenger::ready() { dout(10) << "ready " << get_myaddr() << dendl; assert(!dispatch_thread.is_started()); @@ -358,43 +356,42 @@ void SimpleMessenger::Endpoint::ready() } -int SimpleMessenger::Endpoint::shutdown() +int SimpleMessenger::shutdown() { dout(10) << "shutdown " << get_myaddr() << dendl; - DispatchQueue& q = rank->dispatch_queue; // stop my dispatch thread if (dispatch_thread.am_self()) { dout(10) << "shutdown i am dispatch, setting stop flag" << dendl; - q.stop = true; + dispatch_queue.stop = true; } else { dout(10) << "shutdown i am not dispatch, setting stop flag and joining thread." << dendl; - q.lock.Lock(); - q.stop = true; - q.cond.Signal(); - q.lock.Unlock(); + dispatch_queue.lock.Lock(); + dispatch_queue.stop = true; + dispatch_queue.cond.Signal(); + dispatch_queue.lock.Unlock(); } return 0; } -void SimpleMessenger::Endpoint::suicide() +void SimpleMessenger::suicide() { dout(10) << "suicide " << get_myaddr() << dendl; shutdown(); // hmm, or exit(0)? } -void SimpleMessenger::Endpoint::prepare_dest(const entity_inst_t& inst) +void SimpleMessenger::prepare_dest(const entity_inst_t& inst) { - rank->lock.Lock(); + lock.Lock(); { - if (rank->rank_pipe.count(inst.addr) == 0) - rank->connect_rank(inst.addr, inst.name.type()); + if (rank_pipe.count(inst.addr) == 0) + connect_rank(inst.addr, inst.name.type()); } - rank->lock.Unlock(); + lock.Unlock(); } -int SimpleMessenger::Endpoint::send_message(Message *m, entity_inst_t dest) +int SimpleMessenger::send_message(Message *m, entity_inst_t dest) { // set envelope m->get_header().src = get_myinst(); @@ -408,12 +405,12 @@ int SimpleMessenger::Endpoint::send_message(Message *m, entity_inst_t dest) << " " << m << dendl; - rank->submit_message(m, dest); + submit_message(m, dest); return 0; } -int SimpleMessenger::Endpoint::forward_message(Message *m, entity_inst_t dest) +int SimpleMessenger::forward_message(Message *m, entity_inst_t dest) { // set envelope m->get_header().src = get_myinst(); @@ -426,14 +423,14 @@ int SimpleMessenger::Endpoint::forward_message(Message *m, entity_inst_t dest) << " " << m << dendl; - rank->submit_message(m, dest); + submit_message(m, dest); return 0; } -int SimpleMessenger::Endpoint::lazy_send_message(Message *m, entity_inst_t dest) +int SimpleMessenger::lazy_send_message(Message *m, entity_inst_t dest) { // set envelope m->get_header().src = get_myinst(); @@ -448,26 +445,12 @@ int SimpleMessenger::Endpoint::lazy_send_message(Message *m, entity_inst_t dest) << " " << m << dendl; - rank->submit_message(m, dest, true); - - return 0; -} + submit_message(m, dest, true); -int SimpleMessenger::Endpoint::send_keepalive(entity_inst_t dest) -{ - rank->send_keepalive(dest); return 0; } - - -void SimpleMessenger::Endpoint::mark_down(entity_addr_t a) -{ - rank->mark_down(a); -} - - -entity_addr_t SimpleMessenger::Endpoint::get_myaddr() +entity_addr_t SimpleMessenger::get_myaddr() { entity_addr_t a = rank->rank_addr; a.erank = 0; diff --git a/src/msg/SimpleMessenger.h b/src/msg/SimpleMessenger.h index 746e07a061ef5..710661517f449 100644 --- a/src/msg/SimpleMessenger.h +++ b/src/msg/SimpleMessenger.h @@ -45,6 +45,10 @@ using namespace __gnu_cxx; * cleaner to merge the class with SimpleMessenger itself. * 3) Pipe. Each network connection is handled through a pipe, which handles * the input and output of each message. + * + * This class should only be created on the heap, and it should be destroyed + * via a call to destroy(). Making it on the stack or otherwise calling + * the destructor will lead to badness. */ /* Rank - per-process @@ -114,7 +118,7 @@ private: int state; protected: - friend class Endpoint; + friend class SimpleMessenger; Connection *connection_state; utime_t backoff; // backoff time @@ -389,62 +393,39 @@ private: } } } dispatch_queue; - - // messenger interface class Endpoint : public Messenger { SimpleMessenger *rank; - - class DispatchThread : public Thread { - Endpoint *m; - public: - DispatchThread(Endpoint *_m) : m(_m) {} - void *entry() { - m->dispatch_entry(); - return 0; - } - } dispatch_thread; - void dispatch_entry(); - friend class SimpleMessenger; - public: - Endpoint(SimpleMessenger *r, entity_name_t name) : + Endpoint(SimpleMessenger *r, entity_name_t name) : Messenger(name), - rank(r), - dispatch_thread(this) { - } - ~Endpoint() { - } + rank(r) {} + ~Endpoint() {} void destroy() { - // join dispatch thread - if (dispatch_thread.is_started()) - dispatch_thread.join(); - + rank->destroy(); Messenger::destroy(); } - void ready(); - //bool is_stopped() { return stop; } + void ready() { rank->ready(); } - int get_dispatch_queue_len() { return rank->dispatch_queue.get_queue_len(); } + int get_dispatch_queue_len() { return rank->get_dispatch_queue_len(); } - entity_addr_t get_myaddr(); + entity_addr_t get_myaddr() { return rank->get_myaddr(); } - int shutdown(); - void suicide(); - void prepare_dest(const entity_inst_t& inst); - int send_message(Message *m, entity_inst_t dest); - int forward_message(Message *m, entity_inst_t dest); - int lazy_send_message(Message *m, entity_inst_t dest); - int send_keepalive(entity_inst_t dest); - - void mark_down(entity_addr_t a); - void mark_up(entity_name_t a, entity_addr_t& i); + int shutdown() { return rank->shutdown(); } + void suicide() { rank->suicide(); } + void prepare_dest(const entity_inst_t& inst) { rank->prepare_dest(inst); } + int send_message(Message *m, entity_inst_t dest) { + return rank->send_message(m, dest); } + int forward_message(Message *m, entity_inst_t dest) { + return rank->forward_message(m, dest); } + int lazy_send_message(Message *m, entity_inst_t dest) { + return rank->lazy_send_message(m, dest); } + int send_keepalive(entity_inst_t dest) { return rank->send_keepalive(dest);} }; - - + // SimpleMessenger stuff public: Mutex lock; @@ -491,69 +472,51 @@ private: return default_policy; } - //Messenger-required functions + /***** Messenger-required functions **********/ void destroy() { - if (!endpoint_stopped) - local_endpoint->destroy(); + if (dispatch_thread.is_started()) + dispatch_thread.join(); Messenger::destroy(); } - entity_addr_t get_myaddr() { - if (!endpoint_stopped) - return local_endpoint->get_myaddr(); - return entity_addr_t(); - } + entity_addr_t get_myaddr(); int get_dispatch_queue_len() { return dispatch_queue.get_queue_len(); } - void ready() { - if (!endpoint_stopped) - local_endpoint->ready(); - } - - int shutdown() { - if (!endpoint_stopped) - return local_endpoint->shutdown(); - return -1; - } - - void suicide() { - if (!endpoint_stopped) - local_endpoint->suicide(); - } + void ready(); + int shutdown(); + void suicide(); + void prepare_dest(const entity_inst_t& inst); + int send_message(Message *m, entity_inst_t dest); + int forward_message(Message *m, entity_inst_t dest); + int lazy_send_message(Message *m, entity_inst_t dest); + /***********************/ - void prepare_dest(const entity_inst_t& inst) { - if (!endpoint_stopped) - local_endpoint->prepare_dest(inst); - } +private: + class DispatchThread : public Thread { + SimpleMessenger *rank; + public: + DispatchThread(SimpleMessenger *_rank) : rank(_rank) {} + void *entry() { + rank->dispatch_entry(); + return 0; + } + } dispatch_thread; - int send_message(Message *m, entity_inst_t dest) { - if (!endpoint_stopped) - return local_endpoint->send_message(m, dest); - return -1; - } + void dispatch_entry(); - int forward_message(Message *m, entity_inst_t dest) { - if (!endpoint_stopped) - return local_endpoint->forward_message(m, dest); - return -1; - } + SimpleMessenger *rank; //hack to make dout macro work, will fix - int lazy_send_message(Message *m, entity_inst_t dest) { - if (!endpoint_stopped) - return local_endpoint->lazy_send_message(m, dest); - return -1; - } - public: SimpleMessenger() : Messenger(entity_name_t()), accepter(this), lock("SimpleMessenger::lock"), started(false), did_bind(false), need_addr(true), local_endpoint(NULL), endpoint_stopped(true), my_type(-1), - global_seq_lock("SimpleMessenger::global_seq_lock"), global_seq(0) { + global_seq_lock("SimpleMessenger::global_seq_lock"), global_seq(0), + dispatch_thread(this), rank(this) { // for local dmsg delivery dispatch_queue.local_pipe = new Pipe(this, Pipe::STATE_OPEN); } -- 2.39.5