* end of the queue. If the queue is empty; it's removed.
* The message is then delivered and the process starts again.
*/
-void SimpleMessenger::dispatch_entry()
+void SimpleMessenger::DispatchQueue::entry()
{
- dispatch_queue.lock.Lock();
- while (!dispatch_queue.stop) {
- while (!dispatch_queue.queued_pipes.empty() && !dispatch_queue.stop) {
+ lock.Lock();
+ while (!stop) {
+ while (!queued_pipes.empty() && !stop) {
//get highest-priority pipe
map<int, xlist<IncomingQueue *>* >::reverse_iterator high_iter =
- dispatch_queue.queued_pipes.rbegin();
+ queued_pipes.rbegin();
int priority = high_iter->first;
xlist<IncomingQueue *> *qlist = high_iter->second;
qlist->pop_front(); // pipe is done
if (qlist->empty()) {
delete qlist;
- dispatch_queue.queued_pipes.erase(priority);
+ queued_pipes.erase(priority);
}
inq->in_q.erase(priority);
ldout(cct,20) << "dispatch_entry inq " << inq << " pipe " << inq->pipe << " dequeued " << m
<< ", moved to end of list" << dendl;
qlist->push_back(inq->queue_items[priority]); // move to end of list
}
- dispatch_queue.lock.Unlock(); //done with the pipe queue for a while
+ lock.Unlock(); //done with the pipe queue for a while
inq->in_qlen--;
- dispatch_queue.qlen.dec();
+ qlen.dec();
inq->lock.Unlock(); // done with the pipe's message queue now
{
if ((long)m == DispatchQueue::D_BAD_REMOTE_RESET) {
- dispatch_queue.lock.Lock();
- Connection *con = dispatch_queue.remote_reset_q.front();
- dispatch_queue.remote_reset_q.pop_front();
- dispatch_queue.lock.Unlock();
- ms_deliver_handle_remote_reset(con);
+ lock.Lock();
+ Connection *con = remote_reset_q.front();
+ remote_reset_q.pop_front();
+ lock.Unlock();
+ msgr->ms_deliver_handle_remote_reset(con);
con->put();
} else if ((long)m == DispatchQueue::D_CONNECT) {
- dispatch_queue.lock.Lock();
- Connection *con = dispatch_queue.connect_q.front();
- dispatch_queue.connect_q.pop_front();
- dispatch_queue.lock.Unlock();
- ms_deliver_handle_connect(con);
+ lock.Lock();
+ Connection *con = connect_q.front();
+ connect_q.pop_front();
+ lock.Unlock();
+ msgr->ms_deliver_handle_connect(con);
con->put();
} else if ((long)m == DispatchQueue::D_BAD_RESET) {
- dispatch_queue.lock.Lock();
- Connection *con = dispatch_queue.reset_q.front();
- dispatch_queue.reset_q.pop_front();
- dispatch_queue.lock.Unlock();
- ms_deliver_handle_reset(con);
+ lock.Lock();
+ Connection *con = reset_q.front();
+ reset_q.pop_front();
+ lock.Unlock();
+ msgr->ms_deliver_handle_reset(con);
con->put();
} else {
uint64_t msize = m->get_dispatch_throttle_size();
<< " " << m->get_footer().data_crc << ")"
<< " " << m << " con " << m->get_connection()
<< dendl;
- ms_deliver_dispatch(m);
+ msgr->ms_deliver_dispatch(m);
- dispatch_throttle_release(msize);
+ msgr->dispatch_throttle_release(msize);
ldout(cct,20) << "done calling dispatch on " << m << dendl;
}
}
- dispatch_queue.lock.Lock();
+ lock.Lock();
}
- if (!dispatch_queue.stop)
- dispatch_queue.cond.Wait(dispatch_queue.lock); //wait for something to be put on queue
+ if (!stop)
+ cond.Wait(lock); //wait for something to be put on queue
}
- dispatch_queue.lock.Unlock();
+ lock.Unlock();
+}
+
+void SimpleMessenger::dispatch_entry()
+{
+ dispatch_queue.entry();
//tell everything else it's time to stop
lock.Lock();
string mname, uint64_t _nonce) :
Messenger(cct, name),
accepter(this),
+ dispatch_queue(cct, this),
reaper_thread(this),
dispatch_thread(this),
my_type(name.type()),
* See SimpleMessenger::dispatch_entry for details.
*/
struct DispatchQueue {
+ CephContext *cct;
+ SimpleMessenger *msgr;
Mutex lock;
Cond cond;
bool stop;
local_delivery((Message*)D_BAD_RESET, CEPH_MSG_PRIO_HIGHEST);
}
- DispatchQueue() :
- lock("SimpleMessenger::DispatchQeueu::lock"),
- stop(false),
- qlen(0),
- local_pipe(NULL)
+ void entry();
+
+ DispatchQueue(CephContext *cct, SimpleMessenger *msgr)
+ : cct(cct), msgr(msgr),
+ lock("SimpleMessenger::DispatchQeueu::lock"),
+ stop(false),
+ qlen(0),
+ local_pipe(NULL)
{}
~DispatchQueue() {
for (map< int, xlist<IncomingQueue *>* >::iterator i = queued_pipes.begin();