#undef dout_prefix
#define dout_prefix *_dout << "incomingqueue(" << this << " " << parent << ")."
-void IncomingQueue::queue(Message *m, int priority)
+void IncomingQueue::queue(Message *m, int priority, bool hold_dq_lock)
{
Mutex::Locker l(lock);
ldout(cct,20) << "queue " << m << " prio " << priority << dendl;
if (in_q.count(priority) == 0) {
// queue inq AND message under inq AND dispatch_queue locks.
- lock.Unlock();
- dq->lock.Lock();
- lock.Lock();
+ if (!hold_dq_lock) {
+ lock.Unlock();
+ dq->lock.Lock();
+ lock.Lock();
+ } else {
+ assert(dq->lock.is_locked());
+ }
if (halt) {
- dq->lock.Unlock();
+ if (!hold_dq_lock) {
+ dq->lock.Unlock();
+ } else {
+ assert(dq->lock.is_locked());
+ }
goto halt;
}
queue.push_back(m);
- dq->lock.Unlock();
+ if (!hold_dq_lock) {
+ dq->lock.Unlock();
+ } else {
+ assert(dq->lock.is_locked());
+ }
} else {
ldout(cct,20) << "queue " << m << " under existing queue" << dendl;
// just queue message under our lock.
}
+
/*******************
* DispatchQueue
*/
void DispatchQueue::local_delivery(Message *m, int priority)
{
- if ((unsigned long)m > 10)
- m->set_connection(msgr->local_connection->get());
+ m->set_connection(msgr->local_connection->get());
local_queue.queue(m, priority);
}
<< ", moved to end of list" << dendl;
qlist->push_back(inq->queue_items[priority]); // move to end of list
}
- lock.Unlock(); //done with the pipe queue for a while
+
+ Connection *con = NULL;
+ if ((long)m < DispatchQueue::D_NUM_CODES) {
+ assert(inq == &local_queue);
+ con = con_q.front();
+ con_q.pop_front();
+ }
+
+ lock.Unlock();
inq->in_qlen--;
qlen.dec();
- inq->lock.Unlock(); // done with the pipe's message queue now
-
+ inq->lock.Unlock();
if (dequeued)
inq->put();
- {
- if ((long)m == DispatchQueue::D_BAD_REMOTE_RESET) {
- lock.Lock();
- Connection *con = remote_reset_q.front();
- remote_reset_q.pop_front();
- lock.Unlock();
- msgr->ms_deliver_handle_remote_reset(con);
- con->put();
- } else if ((long)m == DispatchQueue::D_CONNECT) {
- lock.Lock();
- Connection *con = connect_q.front();
- connect_q.pop_front();
- lock.Unlock();
- msgr->ms_deliver_handle_connect(con);
- con->put();
- } else if ((long)m == DispatchQueue::D_ACCEPT) {
- lock.Lock();
- Connection *con = accept_q.front();
- accept_q.pop_front();
- lock.Unlock();
- msgr->ms_deliver_handle_accept(con);
- con->put();
- } else if ((long)m == DispatchQueue::D_BAD_RESET) {
- lock.Lock();
- Connection *con = reset_q.front();
- reset_q.pop_front();
- lock.Unlock();
- msgr->ms_deliver_handle_reset(con);
- con->put();
- } else {
- uint64_t msize = m->get_dispatch_throttle_size();
- m->set_dispatch_throttle_size(0); // clear it out, in case we requeue this message.
-
- ldout(cct,1) << "<== " << m->get_source_inst()
- << " " << m->get_seq()
- << " ==== " << *m
- << " ==== " << m->get_payload().length() << "+" << m->get_middle().length()
- << "+" << m->get_data().length()
- << " (" << m->get_footer().front_crc << " " << m->get_footer().middle_crc
- << " " << m->get_footer().data_crc << ")"
- << " " << m << " con " << m->get_connection()
- << dendl;
- msgr->ms_deliver_dispatch(m);
-
- msgr->dispatch_throttle_release(msize);
-
- ldout(cct,20) << "done calling dispatch on " << m << dendl;
- }
+ if ((long)m == DispatchQueue::D_BAD_REMOTE_RESET) {
+ msgr->ms_deliver_handle_remote_reset(con);
+ con->put();
+ } else if ((long)m == DispatchQueue::D_CONNECT) {
+ msgr->ms_deliver_handle_connect(con);
+ con->put();
+ } else if ((long)m == DispatchQueue::D_ACCEPT) {
+ msgr->ms_deliver_handle_accept(con);
+ con->put();
+ } else if ((long)m == DispatchQueue::D_BAD_RESET) {
+ msgr->ms_deliver_handle_reset(con);
+ con->put();
+ } else {
+ uint64_t msize = m->get_dispatch_throttle_size();
+ m->set_dispatch_throttle_size(0); // clear it out, in case we requeue this message.
+
+ ldout(cct,1) << "<== " << m->get_source_inst()
+ << " " << m->get_seq()
+ << " ==== " << *m
+ << " ==== " << m->get_payload().length() << "+" << m->get_middle().length()
+ << "+" << m->get_data().length()
+ << " (" << m->get_footer().front_crc << " " << m->get_footer().middle_crc
+ << " " << m->get_footer().data_crc << ")"
+ << " " << m << " con " << m->get_connection()
+ << dendl;
+ msgr->ms_deliver_dispatch(m);
+
+ msgr->dispatch_throttle_release(msize);
+
+ ldout(cct,20) << "done calling dispatch on " << m << dendl;
}
+
lock.Lock();
}
if (!stop)
map<int, xlist<IncomingQueue *>::item* > queue_items; // protected by pipe_lock AND q.lock
bool halt;
- void queue(Message *m, int priority);
+ void queue(Message *m, int priority, bool hold_dq_lock=false);
void discard_queue();
void restart_queue();
atomic_t qlen;
enum { D_CONNECT = 1, D_ACCEPT, D_BAD_REMOTE_RESET, D_BAD_RESET, D_NUM_CODES };
- list<Connection*> connect_q, accept_q;
- list<Connection*> remote_reset_q;
- list<Connection*> reset_q;
+ list<Connection*> con_q;
IncomingQueue local_queue;
lock.Unlock();
return;
}
- connect_q.push_back(con->get());
+ con_q.push_back(con->get());
+ local_queue.queue((Message*)D_CONNECT, CEPH_MSG_PRIO_HIGHEST, true);
lock.Unlock();
- local_delivery((Message*)D_CONNECT, CEPH_MSG_PRIO_HIGHEST);
}
void queue_accept(Connection *con) {
lock.Lock();
lock.Unlock();
return;
}
- accept_q.push_back(con->get());
+ con_q.push_back(con->get());
+ local_queue.queue((Message*)D_ACCEPT, CEPH_MSG_PRIO_HIGHEST, true);
lock.Unlock();
- local_delivery((Message*)D_ACCEPT, CEPH_MSG_PRIO_HIGHEST);
}
void queue_remote_reset(Connection *con) {
lock.Lock();
lock.Unlock();
return;
}
- remote_reset_q.push_back(con->get());
+ con_q.push_back(con->get());
+ local_queue.queue((Message*)D_BAD_REMOTE_RESET, CEPH_MSG_PRIO_HIGHEST, true);
lock.Unlock();
- local_delivery((Message*)D_BAD_REMOTE_RESET, CEPH_MSG_PRIO_HIGHEST);
}
void queue_reset(Connection *con) {
lock.Lock();
lock.Unlock();
return;
}
- reset_q.push_back(con->get());
+ con_q.push_back(con->get());
+ local_queue.queue((Message*)D_BAD_RESET, CEPH_MSG_PRIO_HIGHEST, true);
lock.Unlock();
- local_delivery((Message*)D_BAD_RESET, CEPH_MSG_PRIO_HIGHEST);
}
void start();