void SimpleMessenger::Endpoint::dispatch_entry()
{
dout(0) << "entered SimpleMessenger::Endpoint::dispatch_entry" << dendl;
- map<int, xlist<Pipe *> >::reverse_iterator high_iter;
endpoint_lock.Lock();
while (!stop) {
dout(0) << "in outer !stop loop of SimpleMessenger::Endpoint::dispatch_entry" << dendl;
while (!queued_pipes.empty()) {
dout(0) << "SimpleMessenger::Endpoint::dispatch_entry delivering a message qlen " << qlen << dendl;
//get highest-priority pipe
- high_iter = queued_pipes.rbegin();
+ map<int, xlist<Pipe *> >::reverse_iterator high_iter =
+ queued_pipes.rbegin();
int priority = high_iter->first;
xlist<Pipe *>& pipe_list = high_iter->second;
+
Pipe *pipe = pipe_list.front();
dout(0) << "high priority: " << priority << " taking pipe " << pipe << dendl;
//move pipe to back of line -- or just take off if no more messages
}
endpoint_lock.Lock();
}
- cond.Wait(endpoint_lock); //wait for something to get put on queue
+ if (!stop)
+ cond.Wait(endpoint_lock); //wait for something to get put on queue
}
endpoint_lock.Unlock();
dout(15) << "dispatch: ending loop " << dendl;
void queue_connect(Connection *con) {
endpoint_lock.Lock();
connect_q.push_back(con);
- local_delivery((Message*)D_CONNECT, CEPH_MSG_PRIO_HIGHEST);
- cond.Signal();
endpoint_lock.Unlock();
+ local_delivery((Message*)D_CONNECT, CEPH_MSG_PRIO_HIGHEST);
}
void queue_remote_reset(Connection *con) {
endpoint_lock.Lock();
remote_reset_q.push_back(con);
- local_delivery((Message*)D_BAD_REMOTE_RESET, CEPH_MSG_PRIO_HIGHEST);
- cond.Signal();
endpoint_lock.Unlock();
+ local_delivery((Message*)D_BAD_REMOTE_RESET, CEPH_MSG_PRIO_HIGHEST);
}
void queue_reset(Connection *con) {
endpoint_lock.Lock();
+ endpoint_lock.Unlock();
reset_q.push_back(con);
local_delivery((Message*)D_BAD_RESET, CEPH_MSG_PRIO_HIGHEST);
- cond.Signal();
- endpoint_lock.Unlock();
}
public: