* end of the queue. If the queue is empty; it's removed.
* The message is then delivered and the process starts again.
*/
-void SimpleMessenger::Endpoint::dispatch_entry()
+void SimpleMessenger::dispatch_entry()
{
- DispatchQueue& q = rank->dispatch_queue;
- q.lock.Lock();
- while (!q.stop) {
- while (!q.queued_pipes.empty() && !q.stop) {
+ dispatch_queue.lock.Lock();
+ while (!dispatch_queue.stop) {
+ while (!dispatch_queue.queued_pipes.empty() && !dispatch_queue.stop) {
//get highest-priority pipe
- map<int, xlist<Pipe *> >::reverse_iterator high_iter = q.queued_pipes.rbegin();
+ map<int, xlist<Pipe *> >::reverse_iterator high_iter =
+ dispatch_queue.queued_pipes.rbegin();
int priority = high_iter->first;
xlist<Pipe *>& pipe_list = high_iter->second;
if (m_queue.empty()) {
pipe_list.pop_front(); // pipe is done
if (pipe_list.empty())
- q.queued_pipes.erase(priority);
+ dispatch_queue.queued_pipes.erase(priority);
} else {
pipe_list.push_back(pipe->queue_items[priority]); // move to end of list
}
- q.lock.Unlock(); //done with the pipe queue for a while
+ dispatch_queue.lock.Unlock(); //done with the pipe queue for a while
pipe->in_qlen--;
- q.qlen_lock.lock();
- q.qlen--;
- q.qlen_lock.unlock();
+ dispatch_queue.qlen_lock.lock();
+ dispatch_queue.qlen--;
+ dispatch_queue.qlen_lock.unlock();
pipe->pipe_lock.Unlock(); // done with the pipe's message queue now
{
if ((long)m == DispatchQueue::D_BAD_REMOTE_RESET) {
- q.lock.Lock();
- Connection *con = q.remote_reset_q.front();
- q.remote_reset_q.pop_front();
- q.lock.Unlock();
+ dispatch_queue.lock.Lock();
+ Connection *con = dispatch_queue.remote_reset_q.front();
+ dispatch_queue.remote_reset_q.pop_front();
+ dispatch_queue.lock.Unlock();
ms_deliver_handle_remote_reset(con);
con->put();
} else if ((long)m == DispatchQueue::D_CONNECT) {
- q.lock.Lock();
- Connection *con = q.connect_q.front();
- q.connect_q.pop_front();
- q.lock.Unlock();
+ dispatch_queue.lock.Lock();
+ Connection *con = dispatch_queue.connect_q.front();
+ dispatch_queue.connect_q.pop_front();
+ dispatch_queue.lock.Unlock();
ms_deliver_handle_connect(con);
con->put();
} else if ((long)m == DispatchQueue::D_BAD_RESET) {
- q.lock.Lock();
- Connection *con = q.reset_q.front();
- q.reset_q.pop_front();
- q.lock.Unlock();
+ dispatch_queue.lock.Lock();
+ Connection *con = dispatch_queue.reset_q.front();
+ dispatch_queue.reset_q.pop_front();
+ dispatch_queue.lock.Unlock();
ms_deliver_handle_reset(con);
con->put();
} else {
dout(20) << "done calling dispatch on " << m << dendl;
}
}
- q.lock.Lock();
+ dispatch_queue.lock.Lock();
}
- if (!q.stop)
- q.cond.Wait(q.lock); //wait for something to get put on queue
+ if (!dispatch_queue.stop)
+ dispatch_queue.cond.Wait(dispatch_queue.lock); //wait for something to be put on queue
}
- q.lock.Unlock();
+ dispatch_queue.lock.Unlock();
dout(15) << "dispatch: ending loop " << dendl;
- // deregister
- rank->unregister_entity(this);
- put();
+ put(); //this thread is shutting down, so one less reference
}
-void SimpleMessenger::Endpoint::ready()
+void SimpleMessenger::ready()
{
dout(10) << "ready " << get_myaddr() << dendl;
assert(!dispatch_thread.is_started());
}
-int SimpleMessenger::Endpoint::shutdown()
+int SimpleMessenger::shutdown()
{
dout(10) << "shutdown " << get_myaddr() << dendl;
- DispatchQueue& q = rank->dispatch_queue;
// stop my dispatch thread
if (dispatch_thread.am_self()) {
dout(10) << "shutdown i am dispatch, setting stop flag" << dendl;
- q.stop = true;
+ dispatch_queue.stop = true;
} else {
dout(10) << "shutdown i am not dispatch, setting stop flag and joining thread." << dendl;
- q.lock.Lock();
- q.stop = true;
- q.cond.Signal();
- q.lock.Unlock();
+ dispatch_queue.lock.Lock();
+ dispatch_queue.stop = true;
+ dispatch_queue.cond.Signal();
+ dispatch_queue.lock.Unlock();
}
return 0;
}
-void SimpleMessenger::Endpoint::suicide()
+void SimpleMessenger::suicide()
{
dout(10) << "suicide " << get_myaddr() << dendl;
shutdown();
// hmm, or exit(0)?
}
-void SimpleMessenger::Endpoint::prepare_dest(const entity_inst_t& inst)
+void SimpleMessenger::prepare_dest(const entity_inst_t& inst)
{
- rank->lock.Lock();
+ lock.Lock();
{
- if (rank->rank_pipe.count(inst.addr) == 0)
- rank->connect_rank(inst.addr, inst.name.type());
+ if (rank_pipe.count(inst.addr) == 0)
+ connect_rank(inst.addr, inst.name.type());
}
- rank->lock.Unlock();
+ lock.Unlock();
}
-int SimpleMessenger::Endpoint::send_message(Message *m, entity_inst_t dest)
+int SimpleMessenger::send_message(Message *m, entity_inst_t dest)
{
// set envelope
m->get_header().src = get_myinst();
<< " " << m
<< dendl;
- rank->submit_message(m, dest);
+ submit_message(m, dest);
return 0;
}
-int SimpleMessenger::Endpoint::forward_message(Message *m, entity_inst_t dest)
+int SimpleMessenger::forward_message(Message *m, entity_inst_t dest)
{
// set envelope
m->get_header().src = get_myinst();
<< " " << m
<< dendl;
- rank->submit_message(m, dest);
+ submit_message(m, dest);
return 0;
}
-int SimpleMessenger::Endpoint::lazy_send_message(Message *m, entity_inst_t dest)
+int SimpleMessenger::lazy_send_message(Message *m, entity_inst_t dest)
{
// set envelope
m->get_header().src = get_myinst();
<< " " << m
<< dendl;
- rank->submit_message(m, dest, true);
-
- return 0;
-}
+ submit_message(m, dest, true);
-int SimpleMessenger::Endpoint::send_keepalive(entity_inst_t dest)
-{
- rank->send_keepalive(dest);
return 0;
}
-
-
-void SimpleMessenger::Endpoint::mark_down(entity_addr_t a)
-{
- rank->mark_down(a);
-}
-
-
-entity_addr_t SimpleMessenger::Endpoint::get_myaddr()
+entity_addr_t SimpleMessenger::get_myaddr()
{
entity_addr_t a = rank->rank_addr;
a.erank = 0;
* cleaner to merge the class with SimpleMessenger itself.
* 3) Pipe. Each network connection is handled through a pipe, which handles
* the input and output of each message.
+ *
+ * This class should only be created on the heap, and it should be destroyed
+ * via a call to destroy(). Making it on the stack or otherwise calling
+ * the destructor will lead to badness.
*/
/* Rank - per-process
int state;
protected:
- friend class Endpoint;
+ friend class SimpleMessenger;
Connection *connection_state;
utime_t backoff; // backoff time
}
}
} dispatch_queue;
-
- // messenger interface
class Endpoint : public Messenger {
SimpleMessenger *rank;
-
- class DispatchThread : public Thread {
- Endpoint *m;
- public:
- DispatchThread(Endpoint *_m) : m(_m) {}
- void *entry() {
- m->dispatch_entry();
- return 0;
- }
- } dispatch_thread;
- void dispatch_entry();
-
friend class SimpleMessenger;
-
public:
- Endpoint(SimpleMessenger *r, entity_name_t name) :
+ Endpoint(SimpleMessenger *r, entity_name_t name) :
Messenger(name),
- rank(r),
- dispatch_thread(this) {
- }
- ~Endpoint() {
- }
+ rank(r) {}
+ ~Endpoint() {}
void destroy() {
- // join dispatch thread
- if (dispatch_thread.is_started())
- dispatch_thread.join();
-
+ rank->destroy();
Messenger::destroy();
}
- void ready();
- //bool is_stopped() { return stop; }
+ void ready() { rank->ready(); }
- int get_dispatch_queue_len() { return rank->dispatch_queue.get_queue_len(); }
+ int get_dispatch_queue_len() { return rank->get_dispatch_queue_len(); }
- entity_addr_t get_myaddr();
+ entity_addr_t get_myaddr() { return rank->get_myaddr(); }
- int shutdown();
- void suicide();
- void prepare_dest(const entity_inst_t& inst);
- int send_message(Message *m, entity_inst_t dest);
- int forward_message(Message *m, entity_inst_t dest);
- int lazy_send_message(Message *m, entity_inst_t dest);
- int send_keepalive(entity_inst_t dest);
-
- void mark_down(entity_addr_t a);
- void mark_up(entity_name_t a, entity_addr_t& i);
+ int shutdown() { return rank->shutdown(); }
+ void suicide() { rank->suicide(); }
+ void prepare_dest(const entity_inst_t& inst) { rank->prepare_dest(inst); }
+ int send_message(Message *m, entity_inst_t dest) {
+ return rank->send_message(m, dest); }
+ int forward_message(Message *m, entity_inst_t dest) {
+ return rank->forward_message(m, dest); }
+ int lazy_send_message(Message *m, entity_inst_t dest) {
+ return rank->lazy_send_message(m, dest); }
+ int send_keepalive(entity_inst_t dest) { return rank->send_keepalive(dest);}
};
-
-
+
// SimpleMessenger stuff
public:
Mutex lock;
return default_policy;
}
- //Messenger-required functions
+ /***** Messenger-required functions **********/
void destroy() {
- if (!endpoint_stopped)
- local_endpoint->destroy();
+ if (dispatch_thread.is_started())
+ dispatch_thread.join();
Messenger::destroy();
}
- entity_addr_t get_myaddr() {
- if (!endpoint_stopped)
- return local_endpoint->get_myaddr();
- return entity_addr_t();
- }
+ entity_addr_t get_myaddr();
int get_dispatch_queue_len() {
return dispatch_queue.get_queue_len();
}
- void ready() {
- if (!endpoint_stopped)
- local_endpoint->ready();
- }
-
- int shutdown() {
- if (!endpoint_stopped)
- return local_endpoint->shutdown();
- return -1;
- }
-
- void suicide() {
- if (!endpoint_stopped)
- local_endpoint->suicide();
- }
+ void ready();
+ int shutdown();
+ void suicide();
+ void prepare_dest(const entity_inst_t& inst);
+ int send_message(Message *m, entity_inst_t dest);
+ int forward_message(Message *m, entity_inst_t dest);
+ int lazy_send_message(Message *m, entity_inst_t dest);
+ /***********************/
- void prepare_dest(const entity_inst_t& inst) {
- if (!endpoint_stopped)
- local_endpoint->prepare_dest(inst);
- }
+private:
+ class DispatchThread : public Thread {
+ SimpleMessenger *rank;
+ public:
+ DispatchThread(SimpleMessenger *_rank) : rank(_rank) {}
+ void *entry() {
+ rank->dispatch_entry();
+ return 0;
+ }
+ } dispatch_thread;
- int send_message(Message *m, entity_inst_t dest) {
- if (!endpoint_stopped)
- return local_endpoint->send_message(m, dest);
- return -1;
- }
+ void dispatch_entry();
- int forward_message(Message *m, entity_inst_t dest) {
- if (!endpoint_stopped)
- return local_endpoint->forward_message(m, dest);
- return -1;
- }
+ SimpleMessenger *rank; //hack to make dout macro work, will fix
- int lazy_send_message(Message *m, entity_inst_t dest) {
- if (!endpoint_stopped)
- return local_endpoint->lazy_send_message(m, dest);
- return -1;
- }
-
public:
SimpleMessenger() :
Messenger(entity_name_t()),
accepter(this),
lock("SimpleMessenger::lock"), started(false), did_bind(false), need_addr(true),
local_endpoint(NULL), endpoint_stopped(true), my_type(-1),
- global_seq_lock("SimpleMessenger::global_seq_lock"), global_seq(0) {
+ global_seq_lock("SimpleMessenger::global_seq_lock"), global_seq(0),
+ dispatch_thread(this), rank(this) {
// for local dmsg delivery
dispatch_queue.local_pipe = new Pipe(this, Pipe::STATE_OPEN);
}